Repository: beam Updated Branches: refs/heads/master 294f6339e -> 25b9c35a9
[BEAM-2997] Encapsulate enums within a message so that C++/Python have meaningful namespaces when importing. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6fddd4ed Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6fddd4ed Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6fddd4ed Branch: refs/heads/master Commit: 6fddd4ed951c6e5618add72f49211bedcf046edc Parents: da531b7 Author: Luke Cwik <lc...@google.com> Authored: Wed Sep 27 13:33:16 2017 -0700 Committer: Luke Cwik <lc...@google.com> Committed: Thu Sep 28 11:40:43 2017 -0700 ---------------------------------------------------------------------- .../construction/DisplayDataTranslation.java | 2 +- .../construction/PCollectionTranslation.java | 8 +- .../core/construction/ParDoTranslation.java | 12 +- .../core/construction/ReadTranslation.java | 8 +- .../core/construction/TriggerTranslation.java | 8 +- .../WindowingStrategyTranslation.java | 34 ++-- .../core/construction/ReadTranslationTest.java | 4 +- .../fn-api/src/main/proto/beam_fn_api.proto | 42 ++-- .../src/main/proto/beam_job_api.proto | 35 ++-- .../src/main/proto/beam_runner_api.proto | 200 ++++++++++--------- .../fn/harness/logging/BeamFnLoggingClient.java | 16 +- .../logging/BeamFnLoggingClientTest.java | 4 +- sdks/python/apache_beam/io/iobase.py | 4 +- sdks/python/apache_beam/pvalue.py | 2 +- .../python_rpc_direct_runner.py | 2 +- .../experimental/python_rpc_direct/server.py | 2 +- .../portability/universal_local_runner.py | 6 +- .../apache_beam/runners/worker/log_handler.py | 10 +- .../runners/worker/log_handler_test.py | 3 +- sdks/python/apache_beam/transforms/core.py | 6 +- sdks/python/apache_beam/transforms/trigger.py | 4 +- sdks/python/apache_beam/transforms/window.py | 6 +- 22 files changed, 219 insertions(+), 199 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/6fddd4ed/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/DisplayDataTranslation.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/DisplayDataTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/DisplayDataTranslation.java index ff7f9f2..5186caf 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/DisplayDataTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/DisplayDataTranslation.java @@ -32,7 +32,7 @@ public class DisplayDataTranslation { RunnerApi.DisplayData.Item.newBuilder() .setId(RunnerApi.DisplayData.Identifier.newBuilder().setKey("stubImplementation")) .setLabel("Stub implementation") - .setType(RunnerApi.DisplayData.Type.BOOLEAN) + .setType(RunnerApi.DisplayData.Type.Enum.BOOLEAN) .setValue(Any.pack(BoolValue.newBuilder().setValue(true).build()))) .build(); } http://git-wip-us.apache.org/repos/asf/beam/blob/6fddd4ed/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java index c256e4c..84b3386 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java @@ -62,19 +62,19 @@ public class PCollectionTranslation { return fromProto(pCollection.getIsBounded()); } - static RunnerApi.IsBounded toProto(IsBounded bounded) { + static RunnerApi.IsBounded.Enum toProto(IsBounded bounded) { switch (bounded) { case BOUNDED: - return RunnerApi.IsBounded.BOUNDED; + return RunnerApi.IsBounded.Enum.BOUNDED; case UNBOUNDED: - return RunnerApi.IsBounded.UNBOUNDED; + return RunnerApi.IsBounded.Enum.UNBOUNDED; default: throw new IllegalArgumentException( String.format("Unknown %s %s", IsBounded.class.getSimpleName(), bounded)); } } - static IsBounded fromProto(RunnerApi.IsBounded isBounded) { + static IsBounded fromProto(RunnerApi.IsBounded.Enum isBounded) { switch (isBounded) { case BOUNDED: return IsBounded.BOUNDED; http://git-wip-us.apache.org/repos/asf/beam/blob/6fddd4ed/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java index 037ffe3..714c59d 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java @@ -430,14 +430,14 @@ public class ParDoTranslation { return RunnerApi.TimerSpec.newBuilder().setTimeDomain(toProto(timer.getTimeDomain())).build(); } - private static RunnerApi.TimeDomain toProto(TimeDomain timeDomain) { + private static RunnerApi.TimeDomain.Enum toProto(TimeDomain timeDomain) { switch(timeDomain) { case EVENT_TIME: - return RunnerApi.TimeDomain.EVENT_TIME; + return RunnerApi.TimeDomain.Enum.EVENT_TIME; case PROCESSING_TIME: - return RunnerApi.TimeDomain.PROCESSING_TIME; + return RunnerApi.TimeDomain.Enum.PROCESSING_TIME; case SYNCHRONIZED_PROCESSING_TIME: - return RunnerApi.TimeDomain.SYNCHRONIZED_PROCESSING_TIME; + return RunnerApi.TimeDomain.Enum.SYNCHRONIZED_PROCESSING_TIME; default: throw new IllegalArgumentException("Unknown time domain"); } @@ -486,13 +486,13 @@ public class ParDoTranslation { new Cases.WithDefault<Optional<RunnerApi.Parameter>>() { @Override public Optional<RunnerApi.Parameter> dispatch(WindowParameter p) { - return Optional.of(RunnerApi.Parameter.newBuilder().setType(Type.WINDOW).build()); + return Optional.of(RunnerApi.Parameter.newBuilder().setType(Type.Enum.WINDOW).build()); } @Override public Optional<RunnerApi.Parameter> dispatch(RestrictionTrackerParameter p) { return Optional.of( - RunnerApi.Parameter.newBuilder().setType(Type.RESTRICTION_TRACKER).build()); + RunnerApi.Parameter.newBuilder().setType(Type.Enum.RESTRICTION_TRACKER).build()); } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/6fddd4ed/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java index 06d1074..4cc31e8 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java @@ -53,14 +53,14 @@ public class ReadTranslation { public static ReadPayload toProto(Read.Bounded<?> read) { return ReadPayload.newBuilder() - .setIsBounded(IsBounded.BOUNDED) + .setIsBounded(IsBounded.Enum.BOUNDED) .setSource(toProto(read.getSource())) .build(); } public static ReadPayload toProto(Read.Unbounded<?> read) { return ReadPayload.newBuilder() - .setIsBounded(IsBounded.UNBOUNDED) + .setIsBounded(IsBounded.Enum.UNBOUNDED) .setSource(toProto(read.getSource())) .build(); } @@ -88,7 +88,7 @@ public class ReadTranslation { public static BoundedSource<?> boundedSourceFromProto(ReadPayload payload) throws InvalidProtocolBufferException { - checkArgument(payload.getIsBounded().equals(IsBounded.BOUNDED)); + checkArgument(payload.getIsBounded().equals(IsBounded.Enum.BOUNDED)); return (BoundedSource<?>) SerializableUtils.deserializeFromByteArray( payload .getSource() @@ -135,7 +135,7 @@ public class ReadTranslation { public static UnboundedSource<?, ?> unboundedSourceFromProto(ReadPayload payload) throws InvalidProtocolBufferException { - checkArgument(payload.getIsBounded().equals(IsBounded.UNBOUNDED)); + checkArgument(payload.getIsBounded().equals(IsBounded.Enum.UNBOUNDED)); return (UnboundedSource<?, ?>) SerializableUtils.deserializeFromByteArray( payload .getSource() http://git-wip-us.apache.org/repos/asf/beam/blob/6fddd4ed/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TriggerTranslation.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TriggerTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TriggerTranslation.java index 777b165..b23f686 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TriggerTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TriggerTranslation.java @@ -123,14 +123,14 @@ public class TriggerTranslation implements Serializable { .build(); } - private RunnerApi.TimeDomain convertTimeDomain(TimeDomain timeDomain) { + private RunnerApi.TimeDomain.Enum convertTimeDomain(TimeDomain timeDomain) { switch (timeDomain) { case EVENT_TIME: - return RunnerApi.TimeDomain.EVENT_TIME; + return RunnerApi.TimeDomain.Enum.EVENT_TIME; case PROCESSING_TIME: - return RunnerApi.TimeDomain.PROCESSING_TIME; + return RunnerApi.TimeDomain.Enum.PROCESSING_TIME; case SYNCHRONIZED_PROCESSING_TIME: - return RunnerApi.TimeDomain.SYNCHRONIZED_PROCESSING_TIME; + return RunnerApi.TimeDomain.Enum.SYNCHRONIZED_PROCESSING_TIME; default: throw new IllegalArgumentException(String.format("Unknown time domain: %s", timeDomain)); } http://git-wip-us.apache.org/repos/asf/beam/blob/6fddd4ed/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java index 7e02da8..1b4786c 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java @@ -51,7 +51,7 @@ import org.joda.time.Duration; /** Utilities for working with {@link WindowingStrategy WindowingStrategies}. */ public class WindowingStrategyTranslation implements Serializable { - public static AccumulationMode fromProto(RunnerApi.AccumulationMode proto) { + public static AccumulationMode fromProto(RunnerApi.AccumulationMode.Enum proto) { switch (proto) { case DISCARDING: return AccumulationMode.DISCARDING_FIRED_PANES; @@ -71,12 +71,12 @@ public class WindowingStrategyTranslation implements Serializable { } } - public static RunnerApi.AccumulationMode toProto(AccumulationMode accumulationMode) { + public static RunnerApi.AccumulationMode.Enum toProto(AccumulationMode accumulationMode) { switch (accumulationMode) { case DISCARDING_FIRED_PANES: - return RunnerApi.AccumulationMode.DISCARDING; + return RunnerApi.AccumulationMode.Enum.DISCARDING; case ACCUMULATING_FIRED_PANES: - return RunnerApi.AccumulationMode.ACCUMULATING; + return RunnerApi.AccumulationMode.Enum.ACCUMULATING; default: throw new IllegalArgumentException( String.format( @@ -87,12 +87,12 @@ public class WindowingStrategyTranslation implements Serializable { } } - public static RunnerApi.ClosingBehavior toProto(ClosingBehavior closingBehavior) { + public static RunnerApi.ClosingBehavior.Enum toProto(ClosingBehavior closingBehavior) { switch (closingBehavior) { case FIRE_ALWAYS: - return RunnerApi.ClosingBehavior.EMIT_ALWAYS; + return RunnerApi.ClosingBehavior.Enum.EMIT_ALWAYS; case FIRE_IF_NON_EMPTY: - return RunnerApi.ClosingBehavior.EMIT_IF_NONEMPTY; + return RunnerApi.ClosingBehavior.Enum.EMIT_IF_NONEMPTY; default: throw new IllegalArgumentException( String.format( @@ -103,7 +103,7 @@ public class WindowingStrategyTranslation implements Serializable { } } - public static ClosingBehavior fromProto(RunnerApi.ClosingBehavior proto) { + public static ClosingBehavior fromProto(RunnerApi.ClosingBehavior.Enum proto) { switch (proto) { case EMIT_ALWAYS: return ClosingBehavior.FIRE_ALWAYS; @@ -123,12 +123,12 @@ public class WindowingStrategyTranslation implements Serializable { } } - public static RunnerApi.OnTimeBehavior toProto(OnTimeBehavior onTimeBehavior) { + public static RunnerApi.OnTimeBehavior.Enum toProto(OnTimeBehavior onTimeBehavior) { switch (onTimeBehavior) { case FIRE_ALWAYS: - return RunnerApi.OnTimeBehavior.FIRE_ALWAYS; + return RunnerApi.OnTimeBehavior.Enum.FIRE_ALWAYS; case FIRE_IF_NON_EMPTY: - return RunnerApi.OnTimeBehavior.FIRE_IF_NONEMPTY; + return RunnerApi.OnTimeBehavior.Enum.FIRE_IF_NONEMPTY; default: throw new IllegalArgumentException( String.format( @@ -139,7 +139,7 @@ public class WindowingStrategyTranslation implements Serializable { } } - public static OnTimeBehavior fromProto(RunnerApi.OnTimeBehavior proto) { + public static OnTimeBehavior fromProto(RunnerApi.OnTimeBehavior.Enum proto) { switch (proto) { case FIRE_ALWAYS: return OnTimeBehavior.FIRE_ALWAYS; @@ -159,14 +159,14 @@ public class WindowingStrategyTranslation implements Serializable { } } - public static RunnerApi.OutputTime toProto(TimestampCombiner timestampCombiner) { + public static RunnerApi.OutputTime.Enum toProto(TimestampCombiner timestampCombiner) { switch(timestampCombiner) { case EARLIEST: - return OutputTime.EARLIEST_IN_PANE; + return OutputTime.Enum.EARLIEST_IN_PANE; case END_OF_WINDOW: - return OutputTime.END_OF_WINDOW; + return OutputTime.Enum.END_OF_WINDOW; case LATEST: - return OutputTime.LATEST_IN_PANE; + return OutputTime.Enum.LATEST_IN_PANE; default: throw new IllegalArgumentException( String.format( @@ -176,7 +176,7 @@ public class WindowingStrategyTranslation implements Serializable { } } - public static TimestampCombiner timestampCombinerFromProto(RunnerApi.OutputTime proto) { + public static TimestampCombiner timestampCombinerFromProto(RunnerApi.OutputTime.Enum proto) { switch (proto) { case EARLIEST_IN_PANE: return TimestampCombiner.EARLIEST; http://git-wip-us.apache.org/repos/asf/beam/blob/6fddd4ed/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ReadTranslationTest.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ReadTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ReadTranslationTest.java index 3eee78c..22c79b3 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ReadTranslationTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ReadTranslationTest.java @@ -75,7 +75,7 @@ public class ReadTranslationTest { BoundedSource<?> boundedSource = (BoundedSource<?>) this.source; Read.Bounded<?> boundedRead = Read.from(boundedSource); ReadPayload payload = ReadTranslation.toProto(boundedRead); - assertThat(payload.getIsBounded(), equalTo(RunnerApi.IsBounded.BOUNDED)); + assertThat(payload.getIsBounded(), equalTo(RunnerApi.IsBounded.Enum.BOUNDED)); BoundedSource<?> deserializedSource = ReadTranslation.boundedSourceFromProto(payload); assertThat(deserializedSource, Matchers.<Source<?>>equalTo(source)); } @@ -86,7 +86,7 @@ public class ReadTranslationTest { UnboundedSource<?, ?> unboundedSource = (UnboundedSource<?, ?>) this.source; Read.Unbounded<?> unboundedRead = Read.from(unboundedSource); ReadPayload payload = ReadTranslation.toProto(unboundedRead); - assertThat(payload.getIsBounded(), equalTo(RunnerApi.IsBounded.UNBOUNDED)); + assertThat(payload.getIsBounded(), equalTo(RunnerApi.IsBounded.Enum.UNBOUNDED)); UnboundedSource<?, ?> deserializedSource = ReadTranslation.unboundedSourceFromProto(payload); assertThat(deserializedSource, Matchers.<Source<?>>equalTo(source)); } http://git-wip-us.apache.org/repos/asf/beam/blob/6fddd4ed/sdks/common/fn-api/src/main/proto/beam_fn_api.proto ---------------------------------------------------------------------- diff --git a/sdks/common/fn-api/src/main/proto/beam_fn_api.proto b/sdks/common/fn-api/src/main/proto/beam_fn_api.proto index f2bbd3c..9d4c5f6 100644 --- a/sdks/common/fn-api/src/main/proto/beam_fn_api.proto +++ b/sdks/common/fn-api/src/main/proto/beam_fn_api.proto @@ -630,29 +630,31 @@ message LogEntry { // common set of "good enough" severity levels so that logging front ends // can provide filtering and searching across log types. Users of the API are // free not to use all severity levels in their log messages. - enum Severity { - SEVERITY_UNSPECIFIED = 0; - // Trace level information, also the default log level unless - // another severity is specified. - TRACE = 1; - // Debugging information. - DEBUG = 2; - // Normal events. - INFO = 3; - // Normal but significant events, such as start up, shut down, or - // configuration. - NOTICE = 4; - // Warning events might cause problems. - WARN = 5; - // Error events are likely to cause problems. - ERROR = 6; - // Critical events cause severe problems or brief outages and may - // indicate that a person must take action. - CRITICAL = 7; + message Severity { + enum Enum { + UNSPECIFIED = 0; + // Trace level information, also the default log level unless + // another severity is specified. + TRACE = 1; + // Debugging information. + DEBUG = 2; + // Normal events. + INFO = 3; + // Normal but significant events, such as start up, shut down, or + // configuration. + NOTICE = 4; + // Warning events might cause problems. + WARN = 5; + // Error events are likely to cause problems. + ERROR = 6; + // Critical events cause severe problems or brief outages and may + // indicate that a person must take action. + CRITICAL = 7; + } } // (Required) The severity of the log statement. - Severity severity = 1; + Severity.Enum severity = 1; // (Required) The time at which this log statement occurred. google.protobuf.Timestamp timestamp = 2; http://git-wip-us.apache.org/repos/asf/beam/blob/6fddd4ed/sdks/common/runner-api/src/main/proto/beam_job_api.proto ---------------------------------------------------------------------- diff --git a/sdks/common/runner-api/src/main/proto/beam_job_api.proto b/sdks/common/runner-api/src/main/proto/beam_job_api.proto index 9d826ff..d76e907 100644 --- a/sdks/common/runner-api/src/main/proto/beam_job_api.proto +++ b/sdks/common/runner-api/src/main/proto/beam_job_api.proto @@ -91,7 +91,7 @@ message RunJobResponse { } -// Cancel is a synchronus request that returns a jobState back +// Cancel is a synchronus request that returns a job state back // Throws error GRPC_STATUS_UNAVAILABLE if server is down // Throws error NOT_FOUND if the jobId is not found message CancelJobRequest { @@ -101,11 +101,11 @@ message CancelJobRequest { // Valid responses include any terminal state or CANCELLING message CancelJobResponse { - JobState.JobStateType state = 1; // (required) + JobState.Enum state = 1; // (required) } -// GetState is a synchronus request that returns a jobState back +// GetState is a synchronus request that returns a job state back // Throws error GRPC_STATUS_UNAVAILABLE if server is down // Throws error NOT_FOUND if the jobId is not found message GetJobStateRequest { @@ -114,7 +114,7 @@ message GetJobStateRequest { } message GetJobStateResponse { - JobState.JobStateType state = 1; // (required) + JobState.Enum state = 1; // (required) } @@ -150,20 +150,19 @@ message JobMessagesResponse { } } +// Enumeration of all JobStates message JobState { - // Enumeration of all JobStates - enum JobStateType { - JOB_STATE_TYPE_UNSPECIFIED = 0; - UNKNOWN = 1; - STOPPED = 2; - RUNNING = 3; - DONE = 4; - FAILED = 5; - CANCELLED = 6; - UPDATED = 7; - DRAINING = 8; - DRAINED = 9; - STARTING = 10; - CANCELLING = 11; + enum Enum { + UNSPECIFIED = 0; + STOPPED = 1; + RUNNING = 2; + DONE = 3; + FAILED = 4; + CANCELLED = 5; + UPDATED = 6; + DRAINING = 7; + DRAINED = 8; + STARTING = 9; + CANCELLING = 10; } } http://git-wip-us.apache.org/repos/asf/beam/blob/6fddd4ed/sdks/common/runner-api/src/main/proto/beam_runner_api.proto ---------------------------------------------------------------------- diff --git a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto index 3b68993..9ba5577 100644 --- a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto +++ b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto @@ -191,7 +191,7 @@ message PCollection { string coder_id = 2; // (Required) Whether this PCollection is bounded or unbounded - IsBounded is_bounded = 3; + IsBounded.Enum is_bounded = 3; // (Required) The id of the windowing strategy for this PCollection. string windowing_strategy_id = 4; @@ -242,13 +242,15 @@ message ParDoPayload { // TODO: the evolution of the Fn API will influence what needs explicit // representation here message Parameter { - Type type = 1; - - enum Type { - TYPE_UNSPECIFIED = 0; - WINDOW = 1; - PIPELINE_OPTIONS = 2; - RESTRICTION_TRACKER = 3; + Type.Enum type = 1; + + message Type { + enum Enum { + UNSPECIFIED = 0; + WINDOW = 1; + PIPELINE_OPTIONS = 2; + RESTRICTION_TRACKER = 3; + } } } @@ -285,13 +287,15 @@ message SetStateSpec { } message TimerSpec { - TimeDomain time_domain = 1; + TimeDomain.Enum time_domain = 1; } -enum IsBounded { - IS_BOUNDED_UNSPECIFIED = 0; - UNBOUNDED = 1; - BOUNDED = 2; +message IsBounded { + enum Enum { + UNSPECIFIED = 0; + UNBOUNDED = 1; + BOUNDED = 2; + } } // The payload for the primitive Read transform. @@ -301,7 +305,7 @@ message ReadPayload { SdkFunctionSpec source = 1; // (Required) Whether the source is bounded or unbounded - IsBounded is_bounded = 2; + IsBounded.Enum is_bounded = 2; // TODO: full audit of fields required by runners as opposed to SDK harness } @@ -412,7 +416,7 @@ message WindowingStrategy { // (Required) Whether or not the window fn is merging. // // This knowledge is required for many optimizations. - MergeStatus merge_status = 2; + MergeStatus.Enum merge_status = 2; // (Required) The coder for the windows of this PCollection. string window_coder_id = 3; @@ -424,7 +428,7 @@ message WindowingStrategy { // replacement for prior panes or whether they are deltas to be combined // with other panes (the combine should correspond to whatever the upstream // grouping transform is). - AccumulationMode accumulation_mode = 5; + AccumulationMode.Enum accumulation_mode = 5; // (Required) The OutputTime specifies, for a grouping transform, how to // compute the aggregate timestamp. The window_fn will first possibly shift @@ -434,17 +438,17 @@ message WindowingStrategy { // This is actually only for input to grouping transforms, but since they // may be introduced in runner-specific ways, it is carried along with the // windowing strategy. - OutputTime output_time = 6; + OutputTime.Enum output_time = 6; // (Required) Indicate when output should be omitted upon window expiration. - ClosingBehavior closing_behavior = 7; + ClosingBehavior.Enum closing_behavior = 7; // (Required) The duration, in milliseconds, beyond the end of a window at // which the window becomes droppable. int64 allowed_lateness = 8; // (Required) Indicate whether empty on-time panes should be omitted. - OnTimeBehavior OnTimeBehavior = 9; + OnTimeBehavior.Enum OnTimeBehavior = 9; // (Required) Whether or not the window fn assigns inputs to exactly one window // @@ -455,97 +459,109 @@ message WindowingStrategy { // Whether or not a PCollection's WindowFn is non-merging, merging, or // merging-but-already-merged, in which case a subsequent GroupByKey is almost // always going to do something the user does not want -enum MergeStatus { - MERGE_STATUS_UNSPECIFIED = 0; - - // The WindowFn does not require merging. - // Examples: global window, FixedWindows, SlidingWindows - NON_MERGING = 1; - - // The WindowFn is merging and the PCollection has not had merging - // performed. - // Example: Sessions prior to a GroupByKey - NEEDS_MERGE = 2; - - // The WindowFn is merging and the PCollection has had merging occur - // already. - // Example: Sessions after a GroupByKey - ALREADY_MERGED = 3; +message MergeStatus { + enum Enum { + UNSPECIFIED = 0; + + // The WindowFn does not require merging. + // Examples: global window, FixedWindows, SlidingWindows + NON_MERGING = 1; + + // The WindowFn is merging and the PCollection has not had merging + // performed. + // Example: Sessions prior to a GroupByKey + NEEDS_MERGE = 2; + + // The WindowFn is merging and the PCollection has had merging occur + // already. + // Example: Sessions after a GroupByKey + ALREADY_MERGED = 3; + } } // Whether or not subsequent outputs of aggregations should be entire // replacement values or just the aggregation of inputs received since // the prior output. -enum AccumulationMode { - ACCUMULATION_MODE_UNSPECIFIED = 0; +message AccumulationMode { + enum Enum { + UNSPECIFIED = 0; - // The aggregation is discarded when it is output - DISCARDING = 1; + // The aggregation is discarded when it is output + DISCARDING = 1; - // The aggregation is accumulated across outputs - ACCUMULATING = 2; + // The aggregation is accumulated across outputs + ACCUMULATING = 2; + } } // Controls whether or not an aggregating transform should output data // when a window expires. -enum ClosingBehavior { - CLOSING_BEHVAIOR_UNSPECIFIED = 0; +message ClosingBehavior { + enum Enum { + UNSPECIFIED = 0; - // Emit output when a window expires, whether or not there has been - // any new data since the last output. - EMIT_ALWAYS = 1; + // Emit output when a window expires, whether or not there has been + // any new data since the last output. + EMIT_ALWAYS = 1; - // Only emit output when new data has arrives since the last output - EMIT_IF_NONEMPTY = 2; + // Only emit output when new data has arrives since the last output + EMIT_IF_NONEMPTY = 2; + } } // Controls whether or not an aggregating transform should output data // when an on-time pane is empty. -enum OnTimeBehavior { - ON_TIME_BEHAVIOR_UNSPECIFIED = 0; +message OnTimeBehavior { + enum Enum { + UNSPECIFIED = 0; - // Always fire the on-time pane. Even if there is no new data since - // the previous firing, an element will be produced. - FIRE_ALWAYS = 1; + // Always fire the on-time pane. Even if there is no new data since + // the previous firing, an element will be produced. + FIRE_ALWAYS = 1; - // Only fire the on-time pane if there is new data since the previous firing. - FIRE_IF_NONEMPTY = 2; + // Only fire the on-time pane if there is new data since the previous firing. + FIRE_IF_NONEMPTY = 2; + } } // When a number of windowed, timestamped inputs are aggregated, the timestamp // for the resulting output. -enum OutputTime { - OUTPUT_TIME_UNSPECIFIED = 0; +message OutputTime { + enum Enum { + UNSPECIFIED = 0; - // The output has the timestamp of the end of the window. - END_OF_WINDOW = 1; + // The output has the timestamp of the end of the window. + END_OF_WINDOW = 1; - // The output has the latest timestamp of the input elements since - // the last output. - LATEST_IN_PANE = 2; + // The output has the latest timestamp of the input elements since + // the last output. + LATEST_IN_PANE = 2; - // The output has the earliest timestamp of the input elements since - // the last output. - EARLIEST_IN_PANE = 3; + // The output has the earliest timestamp of the input elements since + // the last output. + EARLIEST_IN_PANE = 3; + } } // The different time domains in the Beam model. -enum TimeDomain { - TIME_DOMAIN_UNSPECIFIED = 0; - - // Event time is time from the perspective of the data - EVENT_TIME = 1; - - // Processing time is time from the perspective of the - // execution of your pipeline - PROCESSING_TIME = 2; - - // Synchronized processing time is the minimum of the - // processing time of all pending elements. - // - // The "processing time" of an element refers to - // the local processing time at which it was emitted - SYNCHRONIZED_PROCESSING_TIME = 3; +message TimeDomain { + enum Enum { + UNSPECIFIED = 0; + + // Event time is time from the perspective of the data + EVENT_TIME = 1; + + // Processing time is time from the perspective of the + // execution of your pipeline + PROCESSING_TIME = 2; + + // Synchronized processing time is the minimum of the + // processing time of all pending elements. + // + // The "processing time" of an element refers to + // the local processing time at which it was emitted + SYNCHRONIZED_PROCESSING_TIME = 3; + } } // A small DSL for expressing when to emit new aggregations @@ -799,7 +815,7 @@ message DisplayData { Identifier id = 1; // (Required) - Type type = 2; + Type.Enum type = 2; // (Required) google.protobuf.Any value = 3; @@ -814,14 +830,16 @@ message DisplayData { string link_url = 6; } - enum Type { - TYPE_UNSPECIFIED = 0; - STRING = 1; - INTEGER = 2; - FLOAT = 3; - BOOLEAN = 4; - TIMESTAMP = 5; - DURATION = 6; - JAVA_CLASS = 7; + message Type { + enum Enum { + UNSPECIFIED = 0; + STRING = 1; + INTEGER = 2; + FLOAT = 3; + BOOLEAN = 4; + TIMESTAMP = 5; + DURATION = 6; + JAVA_CLASS = 7; + } } } http://git-wip-us.apache.org/repos/asf/beam/blob/6fddd4ed/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java index d56ee6d..c9f5d80 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java @@ -57,13 +57,13 @@ import org.apache.beam.sdk.options.PipelineOptions; */ public class BeamFnLoggingClient implements AutoCloseable { private static final String ROOT_LOGGER_NAME = ""; - private static final ImmutableMap<Level, BeamFnApi.LogEntry.Severity> LOG_LEVEL_MAP = - ImmutableMap.<Level, BeamFnApi.LogEntry.Severity>builder() - .put(Level.SEVERE, BeamFnApi.LogEntry.Severity.ERROR) - .put(Level.WARNING, BeamFnApi.LogEntry.Severity.WARN) - .put(Level.INFO, BeamFnApi.LogEntry.Severity.INFO) - .put(Level.FINE, BeamFnApi.LogEntry.Severity.DEBUG) - .put(Level.FINEST, BeamFnApi.LogEntry.Severity.TRACE) + private static final ImmutableMap<Level, BeamFnApi.LogEntry.Severity.Enum> LOG_LEVEL_MAP = + ImmutableMap.<Level, BeamFnApi.LogEntry.Severity.Enum>builder() + .put(Level.SEVERE, BeamFnApi.LogEntry.Severity.Enum.ERROR) + .put(Level.WARNING, BeamFnApi.LogEntry.Severity.Enum.WARN) + .put(Level.INFO, BeamFnApi.LogEntry.Severity.Enum.INFO) + .put(Level.FINE, BeamFnApi.LogEntry.Severity.Enum.DEBUG) + .put(Level.FINEST, BeamFnApi.LogEntry.Severity.Enum.TRACE) .build(); private static final ImmutableMap<DataflowWorkerLoggingOptions.Level, Level> LEVEL_CONFIGURATION = @@ -190,7 +190,7 @@ public class BeamFnLoggingClient implements AutoCloseable { @Override public void publish(LogRecord record) { - BeamFnApi.LogEntry.Severity severity = LOG_LEVEL_MAP.get(record.getLevel()); + BeamFnApi.LogEntry.Severity.Enum severity = LOG_LEVEL_MAP.get(record.getLevel()); if (severity == null) { return; } http://git-wip-us.apache.org/repos/asf/beam/blob/6fddd4ed/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClientTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClientTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClientTest.java index bb6a501..c2c26e7 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClientTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClientTest.java @@ -74,7 +74,7 @@ public class BeamFnLoggingClientTest { private static final BeamFnApi.LogEntry TEST_ENTRY = BeamFnApi.LogEntry.newBuilder() - .setSeverity(BeamFnApi.LogEntry.Severity.DEBUG) + .setSeverity(BeamFnApi.LogEntry.Severity.Enum.DEBUG) .setMessage("Message") .setThread("12345") .setTimestamp(Timestamp.newBuilder().setSeconds(1234567).setNanos(890000000).build()) @@ -82,7 +82,7 @@ public class BeamFnLoggingClientTest { .build(); private static final BeamFnApi.LogEntry TEST_ENTRY_WITH_EXCEPTION = BeamFnApi.LogEntry.newBuilder() - .setSeverity(BeamFnApi.LogEntry.Severity.WARN) + .setSeverity(BeamFnApi.LogEntry.Severity.Enum.WARN) .setMessage("MessageWithException") .setTrace(getStackTraceAsString(TEST_RECORD_WITH_EXCEPTION.getThrown())) .setThread("12345") http://git-wip-us.apache.org/repos/asf/beam/blob/6fddd4ed/sdks/python/apache_beam/io/iobase.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py index 043666d..1f2a8bf 100644 --- a/sdks/python/apache_beam/io/iobase.py +++ b/sdks/python/apache_beam/io/iobase.py @@ -834,9 +834,9 @@ class Read(ptransform.PTransform): return (urns.READ_TRANSFORM, beam_runner_api_pb2.ReadPayload( source=self.source.to_runner_api(context), - is_bounded=beam_runner_api_pb2.BOUNDED + is_bounded=beam_runner_api_pb2.IsBounded.BOUNDED if self.source.is_bounded() - else beam_runner_api_pb2.UNBOUNDED)) + else beam_runner_api_pb2.IsBounded.UNBOUNDED)) @staticmethod def from_runner_api_parameter(parameter, context): http://git-wip-us.apache.org/repos/asf/beam/blob/6fddd4ed/sdks/python/apache_beam/pvalue.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/pvalue.py b/sdks/python/apache_beam/pvalue.py index 53a6121..d2d3653 100644 --- a/sdks/python/apache_beam/pvalue.py +++ b/sdks/python/apache_beam/pvalue.py @@ -133,7 +133,7 @@ class PCollection(PValue): unique_name='%d%s.%s' % ( len(self.producer.full_label), self.producer.full_label, self.tag), coder_id=pickler.dumps(self.element_type), - is_bounded=beam_runner_api_pb2.BOUNDED, + is_bounded=beam_runner_api_pb2.IsBounded.BOUNDED, windowing_strategy_id=context.windowing_strategies.get_id( self.windowing)) http://git-wip-us.apache.org/repos/asf/beam/blob/6fddd4ed/sdks/python/apache_beam/runners/experimental/python_rpc_direct/python_rpc_direct_runner.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/experimental/python_rpc_direct/python_rpc_direct_runner.py b/sdks/python/apache_beam/runners/experimental/python_rpc_direct/python_rpc_direct_runner.py index 85e3f75..84bed42 100644 --- a/sdks/python/apache_beam/runners/experimental/python_rpc_direct/python_rpc_direct_runner.py +++ b/sdks/python/apache_beam/runners/experimental/python_rpc_direct/python_rpc_direct_runner.py @@ -96,7 +96,7 @@ class PythonRPCDirectPipelineResult(PipelineResult): if message.HasField('stateResponse'): logging.info( 'Current state of job: %s', - beam_job_api_pb2.JobState.JobStateType.Name( + beam_job_api_pb2.JobState.Enum.Name( message.stateResponse.state)) else: logging.info('Message %s', message.messageResponse) http://git-wip-us.apache.org/repos/asf/beam/blob/6fddd4ed/sdks/python/apache_beam/runners/experimental/python_rpc_direct/server.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/experimental/python_rpc_direct/server.py b/sdks/python/apache_beam/runners/experimental/python_rpc_direct/server.py index 1d07e71..4986dc4 100644 --- a/sdks/python/apache_beam/runners/experimental/python_rpc_direct/server.py +++ b/sdks/python/apache_beam/runners/experimental/python_rpc_direct/server.py @@ -72,7 +72,7 @@ class JobService(beam_job_api_pb2_grpc.JobServiceServicer): @staticmethod def _map_state_to_jobState(state): if state == PipelineState.UNKNOWN: - return beam_job_api_pb2.JobState.UNKNOWN + return beam_job_api_pb2.JobState.UNSPECIFIED elif state == PipelineState.STOPPED: return beam_job_api_pb2.JobState.STOPPED elif state == PipelineState.RUNNING: http://git-wip-us.apache.org/repos/asf/beam/blob/6fddd4ed/sdks/python/apache_beam/runners/portability/universal_local_runner.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/portability/universal_local_runner.py b/sdks/python/apache_beam/runners/portability/universal_local_runner.py index 844b3a8..bc62823 100644 --- a/sdks/python/apache_beam/runners/portability/universal_local_runner.py +++ b/sdks/python/apache_beam/runners/portability/universal_local_runner.py @@ -148,7 +148,7 @@ class UniversalLocalRunner(runner.PipelineRunner): class PipelineResult(runner.PipelineResult): def __init__(self, job_service, job_id): - super(PipelineResult, self).__init__(beam_job_api_pb2.JobState.UNKNOWN) + super(PipelineResult, self).__init__(beam_job_api_pb2.JobState.UNSPECIFIED) self._job_service = job_service self._job_id = job_id self._messages = [] @@ -167,11 +167,11 @@ class PipelineResult(runner.PipelineResult): def _runner_api_state_to_pipeline_state(runner_api_state): return getattr( runner.PipelineState, - beam_job_api_pb2.JobState.JobStateType.Name(runner_api_state)) + beam_job_api_pb2.JobState.Enum.Name(runner_api_state)) @staticmethod def _pipeline_state_to_runner_api_state(pipeline_state): - return beam_job_api_pb2.JobState.JobStateType.Value(pipeline_state) + return beam_job_api_pb2.JobState.Enum.Value(pipeline_state) def wait_until_finish(self): def read_messages(): http://git-wip-us.apache.org/repos/asf/beam/blob/6fddd4ed/sdks/python/apache_beam/runners/worker/log_handler.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/worker/log_handler.py b/sdks/python/apache_beam/runners/worker/log_handler.py index 20bd49f..f878943 100644 --- a/sdks/python/apache_beam/runners/worker/log_handler.py +++ b/sdks/python/apache_beam/runners/worker/log_handler.py @@ -38,11 +38,11 @@ class FnApiLogRecordHandler(logging.Handler): # Mapping from logging levels to LogEntry levels. LOG_LEVEL_MAP = { - logging.FATAL: beam_fn_api_pb2.LogEntry.CRITICAL, - logging.ERROR: beam_fn_api_pb2.LogEntry.ERROR, - logging.WARNING: beam_fn_api_pb2.LogEntry.WARN, - logging.INFO: beam_fn_api_pb2.LogEntry.INFO, - logging.DEBUG: beam_fn_api_pb2.LogEntry.DEBUG + logging.FATAL: beam_fn_api_pb2.LogEntry.Severity.CRITICAL, + logging.ERROR: beam_fn_api_pb2.LogEntry.Severity.ERROR, + logging.WARNING: beam_fn_api_pb2.LogEntry.Severity.WARN, + logging.INFO: beam_fn_api_pb2.LogEntry.Severity.INFO, + logging.DEBUG: beam_fn_api_pb2.LogEntry.Severity.DEBUG } def __init__(self, log_service_descriptor): http://git-wip-us.apache.org/repos/asf/beam/blob/6fddd4ed/sdks/python/apache_beam/runners/worker/log_handler_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/worker/log_handler_test.py b/sdks/python/apache_beam/runners/worker/log_handler_test.py index 7edf667..e4323d2 100644 --- a/sdks/python/apache_beam/runners/worker/log_handler_test.py +++ b/sdks/python/apache_beam/runners/worker/log_handler_test.py @@ -73,7 +73,8 @@ class FnApiLogRecordHandlerTest(unittest.TestCase): num_received_log_entries = 0 for outer in self.test_logging_service.log_records_received: for log_entry in outer.log_entries: - self.assertEqual(beam_fn_api_pb2.LogEntry.INFO, log_entry.severity) + self.assertEqual(beam_fn_api_pb2.LogEntry.Severity.INFO, + log_entry.severity) self.assertEqual('%s: %s' % (msg, num_received_log_entries), log_entry.message) self.assertEqual(u'log_handler_test._verify_fn_log_handler', http://git-wip-us.apache.org/repos/asf/beam/blob/6fddd4ed/sdks/python/apache_beam/transforms/core.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index 0a82de2..5d92fe9 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -1385,16 +1385,16 @@ class Windowing(object): return beam_runner_api_pb2.WindowingStrategy( window_fn=self.windowfn.to_runner_api(context), # TODO(robertwb): Prohibit implicit multi-level merging. - merge_status=(beam_runner_api_pb2.NEEDS_MERGE + merge_status=(beam_runner_api_pb2.MergeStatus.NEEDS_MERGE if self.windowfn.is_merging() - else beam_runner_api_pb2.NON_MERGING), + else beam_runner_api_pb2.MergeStatus.NON_MERGING), window_coder_id=context.coders.get_id( self.windowfn.get_window_coder()), trigger=self.triggerfn.to_runner_api(context), accumulation_mode=self.accumulation_mode, output_time=self.timestamp_combiner, # TODO(robertwb): Support EMIT_IF_NONEMPTY - closing_behavior=beam_runner_api_pb2.EMIT_ALWAYS, + closing_behavior=beam_runner_api_pb2.ClosingBehavior.EMIT_ALWAYS, allowed_lateness=0) @staticmethod http://git-wip-us.apache.org/repos/asf/beam/blob/6fddd4ed/sdks/python/apache_beam/transforms/trigger.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/trigger.py b/sdks/python/apache_beam/transforms/trigger.py index 3583e62..bd99401 100644 --- a/sdks/python/apache_beam/transforms/trigger.py +++ b/sdks/python/apache_beam/transforms/trigger.py @@ -58,8 +58,8 @@ __all__ = [ class AccumulationMode(object): """Controls what to do with data when a trigger fires multiple times. """ - DISCARDING = beam_runner_api_pb2.DISCARDING - ACCUMULATING = beam_runner_api_pb2.ACCUMULATING + DISCARDING = beam_runner_api_pb2.AccumulationMode.DISCARDING + ACCUMULATING = beam_runner_api_pb2.AccumulationMode.ACCUMULATING # TODO(robertwb): Provide retractions of previous outputs. # RETRACTING = 3 http://git-wip-us.apache.org/repos/asf/beam/blob/6fddd4ed/sdks/python/apache_beam/transforms/window.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/window.py b/sdks/python/apache_beam/transforms/window.py index a025019..8c8bf33 100644 --- a/sdks/python/apache_beam/transforms/window.py +++ b/sdks/python/apache_beam/transforms/window.py @@ -86,9 +86,9 @@ __all__ = [ class TimestampCombiner(object): """Determines how output timestamps of grouping operations are assigned.""" - OUTPUT_AT_EOW = beam_runner_api_pb2.END_OF_WINDOW - OUTPUT_AT_EARLIEST = beam_runner_api_pb2.EARLIEST_IN_PANE - OUTPUT_AT_LATEST = beam_runner_api_pb2.LATEST_IN_PANE + OUTPUT_AT_EOW = beam_runner_api_pb2.OutputTime.END_OF_WINDOW + OUTPUT_AT_EARLIEST = beam_runner_api_pb2.OutputTime.EARLIEST_IN_PANE + OUTPUT_AT_LATEST = beam_runner_api_pb2.OutputTime.LATEST_IN_PANE # TODO(robertwb): Add this to the runner API or remove it. OUTPUT_AT_EARLIEST_TRANSFORMED = 'OUTPUT_AT_EARLIEST_TRANSFORMED'