Repository: beam
Updated Branches:
  refs/heads/master 294f6339e -> 25b9c35a9


[BEAM-2997] Encapsulate enums within a message so that C++/Python have 
meaningful namespaces when importing.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6fddd4ed
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6fddd4ed
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6fddd4ed

Branch: refs/heads/master
Commit: 6fddd4ed951c6e5618add72f49211bedcf046edc
Parents: da531b7
Author: Luke Cwik <lc...@google.com>
Authored: Wed Sep 27 13:33:16 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Thu Sep 28 11:40:43 2017 -0700

----------------------------------------------------------------------
 .../construction/DisplayDataTranslation.java    |   2 +-
 .../construction/PCollectionTranslation.java    |   8 +-
 .../core/construction/ParDoTranslation.java     |  12 +-
 .../core/construction/ReadTranslation.java      |   8 +-
 .../core/construction/TriggerTranslation.java   |   8 +-
 .../WindowingStrategyTranslation.java           |  34 ++--
 .../core/construction/ReadTranslationTest.java  |   4 +-
 .../fn-api/src/main/proto/beam_fn_api.proto     |  42 ++--
 .../src/main/proto/beam_job_api.proto           |  35 ++--
 .../src/main/proto/beam_runner_api.proto        | 200 ++++++++++---------
 .../fn/harness/logging/BeamFnLoggingClient.java |  16 +-
 .../logging/BeamFnLoggingClientTest.java        |   4 +-
 sdks/python/apache_beam/io/iobase.py            |   4 +-
 sdks/python/apache_beam/pvalue.py               |   2 +-
 .../python_rpc_direct_runner.py                 |   2 +-
 .../experimental/python_rpc_direct/server.py    |   2 +-
 .../portability/universal_local_runner.py       |   6 +-
 .../apache_beam/runners/worker/log_handler.py   |  10 +-
 .../runners/worker/log_handler_test.py          |   3 +-
 sdks/python/apache_beam/transforms/core.py      |   6 +-
 sdks/python/apache_beam/transforms/trigger.py   |   4 +-
 sdks/python/apache_beam/transforms/window.py    |   6 +-
 22 files changed, 219 insertions(+), 199 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/6fddd4ed/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/DisplayDataTranslation.java
----------------------------------------------------------------------
diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/DisplayDataTranslation.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/DisplayDataTranslation.java
index ff7f9f2..5186caf 100644
--- 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/DisplayDataTranslation.java
+++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/DisplayDataTranslation.java
@@ -32,7 +32,7 @@ public class DisplayDataTranslation {
             RunnerApi.DisplayData.Item.newBuilder()
                 
.setId(RunnerApi.DisplayData.Identifier.newBuilder().setKey("stubImplementation"))
                 .setLabel("Stub implementation")
-                .setType(RunnerApi.DisplayData.Type.BOOLEAN)
+                .setType(RunnerApi.DisplayData.Type.Enum.BOOLEAN)
                 
.setValue(Any.pack(BoolValue.newBuilder().setValue(true).build())))
         .build();
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/6fddd4ed/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java
----------------------------------------------------------------------
diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java
index c256e4c..84b3386 100644
--- 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java
+++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java
@@ -62,19 +62,19 @@ public class PCollectionTranslation {
     return fromProto(pCollection.getIsBounded());
   }
 
-  static RunnerApi.IsBounded toProto(IsBounded bounded) {
+  static RunnerApi.IsBounded.Enum toProto(IsBounded bounded) {
     switch (bounded) {
       case BOUNDED:
-        return RunnerApi.IsBounded.BOUNDED;
+        return RunnerApi.IsBounded.Enum.BOUNDED;
       case UNBOUNDED:
-        return RunnerApi.IsBounded.UNBOUNDED;
+        return RunnerApi.IsBounded.Enum.UNBOUNDED;
       default:
         throw new IllegalArgumentException(
             String.format("Unknown %s %s", IsBounded.class.getSimpleName(), 
bounded));
     }
   }
 
-  static IsBounded fromProto(RunnerApi.IsBounded isBounded) {
+  static IsBounded fromProto(RunnerApi.IsBounded.Enum isBounded) {
     switch (isBounded) {
       case BOUNDED:
         return IsBounded.BOUNDED;

http://git-wip-us.apache.org/repos/asf/beam/blob/6fddd4ed/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
----------------------------------------------------------------------
diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
index 037ffe3..714c59d 100644
--- 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
+++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
@@ -430,14 +430,14 @@ public class ParDoTranslation {
     return 
RunnerApi.TimerSpec.newBuilder().setTimeDomain(toProto(timer.getTimeDomain())).build();
   }
 
-  private static RunnerApi.TimeDomain toProto(TimeDomain timeDomain) {
+  private static RunnerApi.TimeDomain.Enum toProto(TimeDomain timeDomain) {
     switch(timeDomain) {
       case EVENT_TIME:
-        return RunnerApi.TimeDomain.EVENT_TIME;
+        return RunnerApi.TimeDomain.Enum.EVENT_TIME;
       case PROCESSING_TIME:
-        return RunnerApi.TimeDomain.PROCESSING_TIME;
+        return RunnerApi.TimeDomain.Enum.PROCESSING_TIME;
       case SYNCHRONIZED_PROCESSING_TIME:
-        return RunnerApi.TimeDomain.SYNCHRONIZED_PROCESSING_TIME;
+        return RunnerApi.TimeDomain.Enum.SYNCHRONIZED_PROCESSING_TIME;
       default:
         throw new IllegalArgumentException("Unknown time domain");
     }
@@ -486,13 +486,13 @@ public class ParDoTranslation {
         new Cases.WithDefault<Optional<RunnerApi.Parameter>>() {
           @Override
           public Optional<RunnerApi.Parameter> dispatch(WindowParameter p) {
-            return 
Optional.of(RunnerApi.Parameter.newBuilder().setType(Type.WINDOW).build());
+            return 
Optional.of(RunnerApi.Parameter.newBuilder().setType(Type.Enum.WINDOW).build());
           }
 
           @Override
           public Optional<RunnerApi.Parameter> 
dispatch(RestrictionTrackerParameter p) {
             return Optional.of(
-                
RunnerApi.Parameter.newBuilder().setType(Type.RESTRICTION_TRACKER).build());
+                
RunnerApi.Parameter.newBuilder().setType(Type.Enum.RESTRICTION_TRACKER).build());
           }
 
           @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/6fddd4ed/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java
----------------------------------------------------------------------
diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java
index 06d1074..4cc31e8 100644
--- 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java
+++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java
@@ -53,14 +53,14 @@ public class ReadTranslation {
 
   public static ReadPayload toProto(Read.Bounded<?> read) {
     return ReadPayload.newBuilder()
-        .setIsBounded(IsBounded.BOUNDED)
+        .setIsBounded(IsBounded.Enum.BOUNDED)
         .setSource(toProto(read.getSource()))
         .build();
   }
 
   public static ReadPayload toProto(Read.Unbounded<?> read) {
     return ReadPayload.newBuilder()
-        .setIsBounded(IsBounded.UNBOUNDED)
+        .setIsBounded(IsBounded.Enum.UNBOUNDED)
         .setSource(toProto(read.getSource()))
         .build();
   }
@@ -88,7 +88,7 @@ public class ReadTranslation {
 
   public static BoundedSource<?> boundedSourceFromProto(ReadPayload payload)
       throws InvalidProtocolBufferException {
-    checkArgument(payload.getIsBounded().equals(IsBounded.BOUNDED));
+    checkArgument(payload.getIsBounded().equals(IsBounded.Enum.BOUNDED));
     return (BoundedSource<?>) SerializableUtils.deserializeFromByteArray(
         payload
             .getSource()
@@ -135,7 +135,7 @@ public class ReadTranslation {
 
   public static UnboundedSource<?, ?> unboundedSourceFromProto(ReadPayload 
payload)
       throws InvalidProtocolBufferException {
-    checkArgument(payload.getIsBounded().equals(IsBounded.UNBOUNDED));
+    checkArgument(payload.getIsBounded().equals(IsBounded.Enum.UNBOUNDED));
     return (UnboundedSource<?, ?>) SerializableUtils.deserializeFromByteArray(
         payload
             .getSource()

http://git-wip-us.apache.org/repos/asf/beam/blob/6fddd4ed/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TriggerTranslation.java
----------------------------------------------------------------------
diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TriggerTranslation.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TriggerTranslation.java
index 777b165..b23f686 100644
--- 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TriggerTranslation.java
+++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TriggerTranslation.java
@@ -123,14 +123,14 @@ public class TriggerTranslation implements Serializable {
           .build();
     }
 
-    private RunnerApi.TimeDomain convertTimeDomain(TimeDomain timeDomain) {
+    private RunnerApi.TimeDomain.Enum convertTimeDomain(TimeDomain timeDomain) 
{
       switch (timeDomain) {
         case EVENT_TIME:
-          return RunnerApi.TimeDomain.EVENT_TIME;
+          return RunnerApi.TimeDomain.Enum.EVENT_TIME;
         case PROCESSING_TIME:
-          return RunnerApi.TimeDomain.PROCESSING_TIME;
+          return RunnerApi.TimeDomain.Enum.PROCESSING_TIME;
         case SYNCHRONIZED_PROCESSING_TIME:
-          return RunnerApi.TimeDomain.SYNCHRONIZED_PROCESSING_TIME;
+          return RunnerApi.TimeDomain.Enum.SYNCHRONIZED_PROCESSING_TIME;
         default:
           throw new IllegalArgumentException(String.format("Unknown time 
domain: %s", timeDomain));
       }

http://git-wip-us.apache.org/repos/asf/beam/blob/6fddd4ed/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java
----------------------------------------------------------------------
diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java
index 7e02da8..1b4786c 100644
--- 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java
+++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java
@@ -51,7 +51,7 @@ import org.joda.time.Duration;
 /** Utilities for working with {@link WindowingStrategy WindowingStrategies}. 
*/
 public class WindowingStrategyTranslation implements Serializable {
 
-  public static AccumulationMode fromProto(RunnerApi.AccumulationMode proto) {
+  public static AccumulationMode fromProto(RunnerApi.AccumulationMode.Enum 
proto) {
     switch (proto) {
       case DISCARDING:
         return AccumulationMode.DISCARDING_FIRED_PANES;
@@ -71,12 +71,12 @@ public class WindowingStrategyTranslation implements 
Serializable {
     }
   }
 
-  public static RunnerApi.AccumulationMode toProto(AccumulationMode 
accumulationMode) {
+  public static RunnerApi.AccumulationMode.Enum toProto(AccumulationMode 
accumulationMode) {
     switch (accumulationMode) {
       case DISCARDING_FIRED_PANES:
-        return RunnerApi.AccumulationMode.DISCARDING;
+        return RunnerApi.AccumulationMode.Enum.DISCARDING;
       case ACCUMULATING_FIRED_PANES:
-        return RunnerApi.AccumulationMode.ACCUMULATING;
+        return RunnerApi.AccumulationMode.Enum.ACCUMULATING;
       default:
         throw new IllegalArgumentException(
             String.format(
@@ -87,12 +87,12 @@ public class WindowingStrategyTranslation implements 
Serializable {
     }
   }
 
-  public static RunnerApi.ClosingBehavior toProto(ClosingBehavior 
closingBehavior) {
+  public static RunnerApi.ClosingBehavior.Enum toProto(ClosingBehavior 
closingBehavior) {
     switch (closingBehavior) {
       case FIRE_ALWAYS:
-        return RunnerApi.ClosingBehavior.EMIT_ALWAYS;
+        return RunnerApi.ClosingBehavior.Enum.EMIT_ALWAYS;
       case FIRE_IF_NON_EMPTY:
-        return RunnerApi.ClosingBehavior.EMIT_IF_NONEMPTY;
+        return RunnerApi.ClosingBehavior.Enum.EMIT_IF_NONEMPTY;
       default:
         throw new IllegalArgumentException(
             String.format(
@@ -103,7 +103,7 @@ public class WindowingStrategyTranslation implements 
Serializable {
     }
   }
 
-  public static ClosingBehavior fromProto(RunnerApi.ClosingBehavior proto) {
+  public static ClosingBehavior fromProto(RunnerApi.ClosingBehavior.Enum 
proto) {
     switch (proto) {
       case EMIT_ALWAYS:
         return ClosingBehavior.FIRE_ALWAYS;
@@ -123,12 +123,12 @@ public class WindowingStrategyTranslation implements 
Serializable {
     }
   }
 
-  public static RunnerApi.OnTimeBehavior toProto(OnTimeBehavior 
onTimeBehavior) {
+  public static RunnerApi.OnTimeBehavior.Enum toProto(OnTimeBehavior 
onTimeBehavior) {
     switch (onTimeBehavior) {
       case FIRE_ALWAYS:
-        return RunnerApi.OnTimeBehavior.FIRE_ALWAYS;
+        return RunnerApi.OnTimeBehavior.Enum.FIRE_ALWAYS;
       case FIRE_IF_NON_EMPTY:
-        return RunnerApi.OnTimeBehavior.FIRE_IF_NONEMPTY;
+        return RunnerApi.OnTimeBehavior.Enum.FIRE_IF_NONEMPTY;
       default:
         throw new IllegalArgumentException(
             String.format(
@@ -139,7 +139,7 @@ public class WindowingStrategyTranslation implements 
Serializable {
     }
   }
 
-  public static OnTimeBehavior fromProto(RunnerApi.OnTimeBehavior proto) {
+  public static OnTimeBehavior fromProto(RunnerApi.OnTimeBehavior.Enum proto) {
     switch (proto) {
       case FIRE_ALWAYS:
         return OnTimeBehavior.FIRE_ALWAYS;
@@ -159,14 +159,14 @@ public class WindowingStrategyTranslation implements 
Serializable {
     }
   }
 
-  public static RunnerApi.OutputTime toProto(TimestampCombiner 
timestampCombiner) {
+  public static RunnerApi.OutputTime.Enum toProto(TimestampCombiner 
timestampCombiner) {
     switch(timestampCombiner) {
       case EARLIEST:
-        return OutputTime.EARLIEST_IN_PANE;
+        return OutputTime.Enum.EARLIEST_IN_PANE;
       case END_OF_WINDOW:
-        return OutputTime.END_OF_WINDOW;
+        return OutputTime.Enum.END_OF_WINDOW;
       case LATEST:
-        return OutputTime.LATEST_IN_PANE;
+        return OutputTime.Enum.LATEST_IN_PANE;
       default:
         throw new IllegalArgumentException(
             String.format(
@@ -176,7 +176,7 @@ public class WindowingStrategyTranslation implements 
Serializable {
     }
   }
 
-  public static TimestampCombiner 
timestampCombinerFromProto(RunnerApi.OutputTime proto) {
+  public static TimestampCombiner 
timestampCombinerFromProto(RunnerApi.OutputTime.Enum proto) {
     switch (proto) {
       case EARLIEST_IN_PANE:
         return TimestampCombiner.EARLIEST;

http://git-wip-us.apache.org/repos/asf/beam/blob/6fddd4ed/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ReadTranslationTest.java
----------------------------------------------------------------------
diff --git 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ReadTranslationTest.java
 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ReadTranslationTest.java
index 3eee78c..22c79b3 100644
--- 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ReadTranslationTest.java
+++ 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ReadTranslationTest.java
@@ -75,7 +75,7 @@ public class ReadTranslationTest {
     BoundedSource<?> boundedSource = (BoundedSource<?>) this.source;
     Read.Bounded<?> boundedRead = Read.from(boundedSource);
     ReadPayload payload = ReadTranslation.toProto(boundedRead);
-    assertThat(payload.getIsBounded(), equalTo(RunnerApi.IsBounded.BOUNDED));
+    assertThat(payload.getIsBounded(), 
equalTo(RunnerApi.IsBounded.Enum.BOUNDED));
     BoundedSource<?> deserializedSource = 
ReadTranslation.boundedSourceFromProto(payload);
     assertThat(deserializedSource, Matchers.<Source<?>>equalTo(source));
   }
@@ -86,7 +86,7 @@ public class ReadTranslationTest {
     UnboundedSource<?, ?> unboundedSource = (UnboundedSource<?, ?>) 
this.source;
     Read.Unbounded<?> unboundedRead = Read.from(unboundedSource);
     ReadPayload payload = ReadTranslation.toProto(unboundedRead);
-    assertThat(payload.getIsBounded(), equalTo(RunnerApi.IsBounded.UNBOUNDED));
+    assertThat(payload.getIsBounded(), 
equalTo(RunnerApi.IsBounded.Enum.UNBOUNDED));
     UnboundedSource<?, ?> deserializedSource = 
ReadTranslation.unboundedSourceFromProto(payload);
     assertThat(deserializedSource, Matchers.<Source<?>>equalTo(source));
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/6fddd4ed/sdks/common/fn-api/src/main/proto/beam_fn_api.proto
----------------------------------------------------------------------
diff --git a/sdks/common/fn-api/src/main/proto/beam_fn_api.proto 
b/sdks/common/fn-api/src/main/proto/beam_fn_api.proto
index f2bbd3c..9d4c5f6 100644
--- a/sdks/common/fn-api/src/main/proto/beam_fn_api.proto
+++ b/sdks/common/fn-api/src/main/proto/beam_fn_api.proto
@@ -630,29 +630,31 @@ message LogEntry {
   // common set of "good enough" severity levels so that logging front ends
   // can provide filtering and searching across log types. Users of the API are
   // free not to use all severity levels in their log messages.
-  enum Severity {
-    SEVERITY_UNSPECIFIED = 0;
-    // Trace level information, also the default log level unless
-    // another severity is specified.
-    TRACE = 1;
-    // Debugging information.
-    DEBUG = 2;
-    // Normal events.
-    INFO = 3;
-    // Normal but significant events, such as start up, shut down, or
-    // configuration.
-    NOTICE = 4;
-    // Warning events might cause problems.
-    WARN = 5;
-    // Error events are likely to cause problems.
-    ERROR = 6;
-    // Critical events cause severe problems or brief outages and may
-    // indicate that a person must take action.
-    CRITICAL = 7;
+  message Severity {
+    enum Enum {
+      UNSPECIFIED = 0;
+      // Trace level information, also the default log level unless
+      // another severity is specified.
+      TRACE = 1;
+      // Debugging information.
+      DEBUG = 2;
+      // Normal events.
+      INFO = 3;
+      // Normal but significant events, such as start up, shut down, or
+      // configuration.
+      NOTICE = 4;
+      // Warning events might cause problems.
+      WARN = 5;
+      // Error events are likely to cause problems.
+      ERROR = 6;
+      // Critical events cause severe problems or brief outages and may
+      // indicate that a person must take action.
+      CRITICAL = 7;
+    }
   }
 
   // (Required) The severity of the log statement.
-  Severity severity = 1;
+  Severity.Enum severity = 1;
 
   // (Required) The time at which this log statement occurred.
   google.protobuf.Timestamp timestamp = 2;

http://git-wip-us.apache.org/repos/asf/beam/blob/6fddd4ed/sdks/common/runner-api/src/main/proto/beam_job_api.proto
----------------------------------------------------------------------
diff --git a/sdks/common/runner-api/src/main/proto/beam_job_api.proto 
b/sdks/common/runner-api/src/main/proto/beam_job_api.proto
index 9d826ff..d76e907 100644
--- a/sdks/common/runner-api/src/main/proto/beam_job_api.proto
+++ b/sdks/common/runner-api/src/main/proto/beam_job_api.proto
@@ -91,7 +91,7 @@ message RunJobResponse {
 }
 
 
-// Cancel is a synchronus request that returns a jobState back
+// Cancel is a synchronous request that returns a job state back
 // Throws error GRPC_STATUS_UNAVAILABLE if server is down
 // Throws error NOT_FOUND if the jobId is not found
 message CancelJobRequest {
@@ -101,11 +101,11 @@ message CancelJobRequest {
 
 // Valid responses include any terminal state or CANCELLING
 message CancelJobResponse {
-  JobState.JobStateType state = 1; // (required)
+  JobState.Enum state = 1; // (required)
 }
 
 
-// GetState is a synchronus request that returns a jobState back
+// GetState is a synchronous request that returns a job state back
 // Throws error GRPC_STATUS_UNAVAILABLE if server is down
 // Throws error NOT_FOUND if the jobId is not found
 message GetJobStateRequest {
@@ -114,7 +114,7 @@ message GetJobStateRequest {
 }
 
 message GetJobStateResponse {
-  JobState.JobStateType state = 1; // (required)
+  JobState.Enum state = 1; // (required)
 }
 
 
@@ -150,20 +150,19 @@ message JobMessagesResponse {
   }
 }
 
+// Enumeration of all JobStates
 message JobState {
-  // Enumeration of all JobStates
-  enum JobStateType {
-    JOB_STATE_TYPE_UNSPECIFIED = 0;
-    UNKNOWN = 1;
-    STOPPED = 2;
-    RUNNING = 3;
-    DONE = 4;
-    FAILED = 5;
-    CANCELLED = 6;
-    UPDATED = 7;
-    DRAINING = 8;
-    DRAINED = 9;
-    STARTING = 10;
-    CANCELLING = 11;
+  enum Enum {
+    UNSPECIFIED = 0;
+    STOPPED = 1;
+    RUNNING = 2;
+    DONE = 3;
+    FAILED = 4;
+    CANCELLED = 5;
+    UPDATED = 6;
+    DRAINING = 7;
+    DRAINED = 8;
+    STARTING = 9;
+    CANCELLING = 10;
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/6fddd4ed/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
----------------------------------------------------------------------
diff --git a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto 
b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
index 3b68993..9ba5577 100644
--- a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
+++ b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
@@ -191,7 +191,7 @@ message PCollection {
   string coder_id = 2;
 
   // (Required) Whether this PCollection is bounded or unbounded
-  IsBounded is_bounded = 3;
+  IsBounded.Enum is_bounded = 3;
 
   // (Required) The id of the windowing strategy for this PCollection.
   string windowing_strategy_id = 4;
@@ -242,13 +242,15 @@ message ParDoPayload {
 // TODO: the evolution of the Fn API will influence what needs explicit
 // representation here
 message Parameter {
-  Type type = 1;
-
-  enum Type {
-    TYPE_UNSPECIFIED = 0;
-    WINDOW = 1;
-    PIPELINE_OPTIONS = 2;
-    RESTRICTION_TRACKER = 3;
+  Type.Enum type = 1;
+
+  message Type {
+    enum Enum {
+      UNSPECIFIED = 0;
+      WINDOW = 1;
+      PIPELINE_OPTIONS = 2;
+      RESTRICTION_TRACKER = 3;
+    }
   }
 }
 
@@ -285,13 +287,15 @@ message SetStateSpec {
 }
 
 message TimerSpec {
-  TimeDomain time_domain = 1;
+  TimeDomain.Enum time_domain = 1;
 }
 
-enum IsBounded {
-  IS_BOUNDED_UNSPECIFIED = 0;
-  UNBOUNDED = 1;
-  BOUNDED = 2;
+message IsBounded {
+  enum Enum {
+    UNSPECIFIED = 0;
+    UNBOUNDED = 1;
+    BOUNDED = 2;
+  }
 }
 
 // The payload for the primitive Read transform.
@@ -301,7 +305,7 @@ message ReadPayload {
   SdkFunctionSpec source = 1;
 
   // (Required) Whether the source is bounded or unbounded
-  IsBounded is_bounded = 2;
+  IsBounded.Enum is_bounded = 2;
 
   // TODO: full audit of fields required by runners as opposed to SDK harness
 }
@@ -412,7 +416,7 @@ message WindowingStrategy {
   // (Required) Whether or not the window fn is merging.
   //
   // This knowledge is required for many optimizations.
-  MergeStatus merge_status = 2;
+  MergeStatus.Enum merge_status = 2;
 
   // (Required) The coder for the windows of this PCollection.
   string window_coder_id = 3;
@@ -424,7 +428,7 @@ message WindowingStrategy {
   // replacement for prior panes or whether they are deltas to be combined
   // with other panes (the combine should correspond to whatever the upstream
   // grouping transform is).
-  AccumulationMode accumulation_mode = 5;
+  AccumulationMode.Enum accumulation_mode = 5;
 
   // (Required) The OutputTime specifies, for a grouping transform, how to
   // compute the aggregate timestamp. The window_fn will first possibly shift
@@ -434,17 +438,17 @@ message WindowingStrategy {
   // This is actually only for input to grouping transforms, but since they
   // may be introduced in runner-specific ways, it is carried along with the
   // windowing strategy.
-  OutputTime output_time = 6;
+  OutputTime.Enum output_time = 6;
 
   // (Required) Indicate when output should be omitted upon window expiration.
-  ClosingBehavior closing_behavior = 7;
+  ClosingBehavior.Enum closing_behavior = 7;
 
   // (Required) The duration, in milliseconds, beyond the end of a window at
   // which the window becomes droppable.
   int64 allowed_lateness = 8;
 
   // (Required) Indicate whether empty on-time panes should be omitted.
-  OnTimeBehavior OnTimeBehavior = 9;
+  OnTimeBehavior.Enum OnTimeBehavior = 9;
 
   // (Required) Whether or not the window fn assigns inputs to exactly one 
window
   //
@@ -455,97 +459,109 @@ message WindowingStrategy {
 // Whether or not a PCollection's WindowFn is non-merging, merging, or
 // merging-but-already-merged, in which case a subsequent GroupByKey is almost
 // always going to do something the user does not want
-enum MergeStatus {
-  MERGE_STATUS_UNSPECIFIED = 0;
-
-  // The WindowFn does not require merging.
-  // Examples: global window, FixedWindows, SlidingWindows
-  NON_MERGING = 1;
-
-  // The WindowFn is merging and the PCollection has not had merging
-  // performed.
-  // Example: Sessions prior to a GroupByKey
-  NEEDS_MERGE = 2;
-
-  // The WindowFn is merging and the PCollection has had merging occur
-  // already.
-  // Example: Sessions after a GroupByKey
-  ALREADY_MERGED = 3;
+message MergeStatus {
+  enum Enum {
+    UNSPECIFIED = 0;
+
+    // The WindowFn does not require merging.
+    // Examples: global window, FixedWindows, SlidingWindows
+    NON_MERGING = 1;
+
+    // The WindowFn is merging and the PCollection has not had merging
+    // performed.
+    // Example: Sessions prior to a GroupByKey
+    NEEDS_MERGE = 2;
+
+    // The WindowFn is merging and the PCollection has had merging occur
+    // already.
+    // Example: Sessions after a GroupByKey
+    ALREADY_MERGED = 3;
+  }
 }
 
 // Whether or not subsequent outputs of aggregations should be entire
 // replacement values or just the aggregation of inputs received since
 // the prior output.
-enum AccumulationMode {
-  ACCUMULATION_MODE_UNSPECIFIED = 0;
+message AccumulationMode {
+  enum Enum {
+    UNSPECIFIED = 0;
 
-  // The aggregation is discarded when it is output
-  DISCARDING = 1;
+    // The aggregation is discarded when it is output
+    DISCARDING = 1;
 
-  // The aggregation is accumulated across outputs
-  ACCUMULATING = 2;
+    // The aggregation is accumulated across outputs
+    ACCUMULATING = 2;
+  }
 }
 
 // Controls whether or not an aggregating transform should output data
 // when a window expires.
-enum ClosingBehavior {
-  CLOSING_BEHVAIOR_UNSPECIFIED = 0;
+message ClosingBehavior {
+  enum Enum {
+    UNSPECIFIED = 0;
 
-  // Emit output when a window expires, whether or not there has been
-  // any new data since the last output.
-  EMIT_ALWAYS = 1;
+    // Emit output when a window expires, whether or not there has been
+    // any new data since the last output.
+    EMIT_ALWAYS = 1;
 
-  // Only emit output when new data has arrives since the last output
-  EMIT_IF_NONEMPTY = 2;
+    // Only emit output when new data has arrived since the last output
+    EMIT_IF_NONEMPTY = 2;
+  }
 }
 
 // Controls whether or not an aggregating transform should output data
 // when an on-time pane is empty.
-enum OnTimeBehavior {
-  ON_TIME_BEHAVIOR_UNSPECIFIED = 0;
+message OnTimeBehavior {
+  enum Enum {
+    UNSPECIFIED = 0;
 
-  // Always fire the on-time pane. Even if there is no new data since
-  // the previous firing, an element will be produced.
-  FIRE_ALWAYS = 1;
+    // Always fire the on-time pane. Even if there is no new data since
+    // the previous firing, an element will be produced.
+    FIRE_ALWAYS = 1;
 
-  // Only fire the on-time pane if there is new data since the previous firing.
-  FIRE_IF_NONEMPTY = 2;
+    // Only fire the on-time pane if there is new data since the previous 
firing.
+    FIRE_IF_NONEMPTY = 2;
+  }
 }
 
 // When a number of windowed, timestamped inputs are aggregated, the timestamp
 // for the resulting output.
-enum OutputTime {
-  OUTPUT_TIME_UNSPECIFIED = 0;
+message OutputTime {
+  enum Enum {
+    UNSPECIFIED = 0;
 
-  // The output has the timestamp of the end of the window.
-  END_OF_WINDOW = 1;
+    // The output has the timestamp of the end of the window.
+    END_OF_WINDOW = 1;
 
-  // The output has the latest timestamp of the input elements since
-  // the last output.
-  LATEST_IN_PANE = 2;
+    // The output has the latest timestamp of the input elements since
+    // the last output.
+    LATEST_IN_PANE = 2;
 
-  // The output has the earliest timestamp of the input elements since
-  // the last output.
-  EARLIEST_IN_PANE = 3;
+    // The output has the earliest timestamp of the input elements since
+    // the last output.
+    EARLIEST_IN_PANE = 3;
+  }
 }
 
 // The different time domains in the Beam model.
-enum TimeDomain {
-  TIME_DOMAIN_UNSPECIFIED = 0;
-
-  // Event time is time from the perspective of the data
-  EVENT_TIME = 1;
-
-  // Processing time is time from the perspective of the
-  // execution of your pipeline
-  PROCESSING_TIME = 2;
-
-  // Synchronized processing time is the minimum of the
-  // processing time of all pending elements.
-  //
-  // The "processing time" of an element refers to
-  // the local processing time at which it was emitted
-  SYNCHRONIZED_PROCESSING_TIME = 3;
+message TimeDomain {
+  enum Enum {
+    UNSPECIFIED = 0;
+
+    // Event time is time from the perspective of the data
+    EVENT_TIME = 1;
+
+    // Processing time is time from the perspective of the
+    // execution of your pipeline
+    PROCESSING_TIME = 2;
+
+    // Synchronized processing time is the minimum of the
+    // processing time of all pending elements.
+    //
+    // The "processing time" of an element refers to
+    // the local processing time at which it was emitted
+    SYNCHRONIZED_PROCESSING_TIME = 3;
+  }
 }
 
 // A small DSL for expressing when to emit new aggregations
@@ -799,7 +815,7 @@ message DisplayData {
     Identifier id = 1;
 
     // (Required)
-    Type type = 2;
+    Type.Enum type = 2;
 
     // (Required)
     google.protobuf.Any value = 3;
@@ -814,14 +830,16 @@ message DisplayData {
     string link_url = 6;
   }
 
-  enum Type {
-    TYPE_UNSPECIFIED = 0;
-    STRING = 1;
-    INTEGER = 2;
-    FLOAT = 3;
-    BOOLEAN = 4;
-    TIMESTAMP = 5;
-    DURATION = 6;
-    JAVA_CLASS = 7;
+  message Type {
+    enum Enum {
+      UNSPECIFIED = 0;
+      STRING = 1;
+      INTEGER = 2;
+      FLOAT = 3;
+      BOOLEAN = 4;
+      TIMESTAMP = 5;
+      DURATION = 6;
+      JAVA_CLASS = 7;
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/6fddd4ed/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java
index d56ee6d..c9f5d80 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java
@@ -57,13 +57,13 @@ import org.apache.beam.sdk.options.PipelineOptions;
  */
 public class BeamFnLoggingClient implements AutoCloseable {
   private static final String ROOT_LOGGER_NAME = "";
-  private static final ImmutableMap<Level, BeamFnApi.LogEntry.Severity> LOG_LEVEL_MAP =
-      ImmutableMap.<Level, BeamFnApi.LogEntry.Severity>builder()
-      .put(Level.SEVERE, BeamFnApi.LogEntry.Severity.ERROR)
-      .put(Level.WARNING, BeamFnApi.LogEntry.Severity.WARN)
-      .put(Level.INFO, BeamFnApi.LogEntry.Severity.INFO)
-      .put(Level.FINE, BeamFnApi.LogEntry.Severity.DEBUG)
-      .put(Level.FINEST, BeamFnApi.LogEntry.Severity.TRACE)
+  private static final ImmutableMap<Level, BeamFnApi.LogEntry.Severity.Enum> LOG_LEVEL_MAP =
+      ImmutableMap.<Level, BeamFnApi.LogEntry.Severity.Enum>builder()
+      .put(Level.SEVERE, BeamFnApi.LogEntry.Severity.Enum.ERROR)
+      .put(Level.WARNING, BeamFnApi.LogEntry.Severity.Enum.WARN)
+      .put(Level.INFO, BeamFnApi.LogEntry.Severity.Enum.INFO)
+      .put(Level.FINE, BeamFnApi.LogEntry.Severity.Enum.DEBUG)
+      .put(Level.FINEST, BeamFnApi.LogEntry.Severity.Enum.TRACE)
       .build();
 
   private static final ImmutableMap<DataflowWorkerLoggingOptions.Level, Level> LEVEL_CONFIGURATION =
@@ -190,7 +190,7 @@ public class BeamFnLoggingClient implements AutoCloseable {
 
     @Override
     public void publish(LogRecord record) {
-      BeamFnApi.LogEntry.Severity severity = LOG_LEVEL_MAP.get(record.getLevel());
+      BeamFnApi.LogEntry.Severity.Enum severity = LOG_LEVEL_MAP.get(record.getLevel());
       if (severity == null) {
         return;
       }

http://git-wip-us.apache.org/repos/asf/beam/blob/6fddd4ed/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClientTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClientTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClientTest.java
index bb6a501..c2c26e7 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClientTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClientTest.java
@@ -74,7 +74,7 @@ public class BeamFnLoggingClientTest {
 
   private static final BeamFnApi.LogEntry TEST_ENTRY =
       BeamFnApi.LogEntry.newBuilder()
-          .setSeverity(BeamFnApi.LogEntry.Severity.DEBUG)
+          .setSeverity(BeamFnApi.LogEntry.Severity.Enum.DEBUG)
           .setMessage("Message")
           .setThread("12345")
           .setTimestamp(Timestamp.newBuilder().setSeconds(1234567).setNanos(890000000).build())
@@ -82,7 +82,7 @@ public class BeamFnLoggingClientTest {
           .build();
   private static final BeamFnApi.LogEntry TEST_ENTRY_WITH_EXCEPTION =
       BeamFnApi.LogEntry.newBuilder()
-          .setSeverity(BeamFnApi.LogEntry.Severity.WARN)
+          .setSeverity(BeamFnApi.LogEntry.Severity.Enum.WARN)
           .setMessage("MessageWithException")
           .setTrace(getStackTraceAsString(TEST_RECORD_WITH_EXCEPTION.getThrown()))
           .setThread("12345")

http://git-wip-us.apache.org/repos/asf/beam/blob/6fddd4ed/sdks/python/apache_beam/io/iobase.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py
index 043666d..1f2a8bf 100644
--- a/sdks/python/apache_beam/io/iobase.py
+++ b/sdks/python/apache_beam/io/iobase.py
@@ -834,9 +834,9 @@ class Read(ptransform.PTransform):
     return (urns.READ_TRANSFORM,
             beam_runner_api_pb2.ReadPayload(
                 source=self.source.to_runner_api(context),
-                is_bounded=beam_runner_api_pb2.BOUNDED
+                is_bounded=beam_runner_api_pb2.IsBounded.BOUNDED
                 if self.source.is_bounded()
-                else beam_runner_api_pb2.UNBOUNDED))
+                else beam_runner_api_pb2.IsBounded.UNBOUNDED))
 
   @staticmethod
   def from_runner_api_parameter(parameter, context):

http://git-wip-us.apache.org/repos/asf/beam/blob/6fddd4ed/sdks/python/apache_beam/pvalue.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pvalue.py b/sdks/python/apache_beam/pvalue.py
index 53a6121..d2d3653 100644
--- a/sdks/python/apache_beam/pvalue.py
+++ b/sdks/python/apache_beam/pvalue.py
@@ -133,7 +133,7 @@ class PCollection(PValue):
         unique_name='%d%s.%s' % (
             len(self.producer.full_label), self.producer.full_label, self.tag),
         coder_id=pickler.dumps(self.element_type),
-        is_bounded=beam_runner_api_pb2.BOUNDED,
+        is_bounded=beam_runner_api_pb2.IsBounded.BOUNDED,
         windowing_strategy_id=context.windowing_strategies.get_id(
             self.windowing))
 

http://git-wip-us.apache.org/repos/asf/beam/blob/6fddd4ed/sdks/python/apache_beam/runners/experimental/python_rpc_direct/python_rpc_direct_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/experimental/python_rpc_direct/python_rpc_direct_runner.py b/sdks/python/apache_beam/runners/experimental/python_rpc_direct/python_rpc_direct_runner.py
index 85e3f75..84bed42 100644
--- a/sdks/python/apache_beam/runners/experimental/python_rpc_direct/python_rpc_direct_runner.py
+++ b/sdks/python/apache_beam/runners/experimental/python_rpc_direct/python_rpc_direct_runner.py
@@ -96,7 +96,7 @@ class PythonRPCDirectPipelineResult(PipelineResult):
       if message.HasField('stateResponse'):
         logging.info(
             'Current state of job: %s',
-            beam_job_api_pb2.JobState.JobStateType.Name(
+            beam_job_api_pb2.JobState.Enum.Name(
                 message.stateResponse.state))
       else:
         logging.info('Message %s', message.messageResponse)

http://git-wip-us.apache.org/repos/asf/beam/blob/6fddd4ed/sdks/python/apache_beam/runners/experimental/python_rpc_direct/server.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/experimental/python_rpc_direct/server.py b/sdks/python/apache_beam/runners/experimental/python_rpc_direct/server.py
index 1d07e71..4986dc4 100644
--- a/sdks/python/apache_beam/runners/experimental/python_rpc_direct/server.py
+++ b/sdks/python/apache_beam/runners/experimental/python_rpc_direct/server.py
@@ -72,7 +72,7 @@ class JobService(beam_job_api_pb2_grpc.JobServiceServicer):
   @staticmethod
   def _map_state_to_jobState(state):
     if state == PipelineState.UNKNOWN:
-      return beam_job_api_pb2.JobState.UNKNOWN
+      return beam_job_api_pb2.JobState.UNSPECIFIED
     elif state == PipelineState.STOPPED:
       return beam_job_api_pb2.JobState.STOPPED
     elif state == PipelineState.RUNNING:

http://git-wip-us.apache.org/repos/asf/beam/blob/6fddd4ed/sdks/python/apache_beam/runners/portability/universal_local_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/portability/universal_local_runner.py b/sdks/python/apache_beam/runners/portability/universal_local_runner.py
index 844b3a8..bc62823 100644
--- a/sdks/python/apache_beam/runners/portability/universal_local_runner.py
+++ b/sdks/python/apache_beam/runners/portability/universal_local_runner.py
@@ -148,7 +148,7 @@ class UniversalLocalRunner(runner.PipelineRunner):
 
 class PipelineResult(runner.PipelineResult):
   def __init__(self, job_service, job_id):
-    super(PipelineResult, self).__init__(beam_job_api_pb2.JobState.UNKNOWN)
+    super(PipelineResult, self).__init__(beam_job_api_pb2.JobState.UNSPECIFIED)
     self._job_service = job_service
     self._job_id = job_id
     self._messages = []
@@ -167,11 +167,11 @@ class PipelineResult(runner.PipelineResult):
   def _runner_api_state_to_pipeline_state(runner_api_state):
     return getattr(
         runner.PipelineState,
-        beam_job_api_pb2.JobState.JobStateType.Name(runner_api_state))
+        beam_job_api_pb2.JobState.Enum.Name(runner_api_state))
 
   @staticmethod
   def _pipeline_state_to_runner_api_state(pipeline_state):
-    return beam_job_api_pb2.JobState.JobStateType.Value(pipeline_state)
+    return beam_job_api_pb2.JobState.Enum.Value(pipeline_state)
 
   def wait_until_finish(self):
     def read_messages():

http://git-wip-us.apache.org/repos/asf/beam/blob/6fddd4ed/sdks/python/apache_beam/runners/worker/log_handler.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/worker/log_handler.py b/sdks/python/apache_beam/runners/worker/log_handler.py
index 20bd49f..f878943 100644
--- a/sdks/python/apache_beam/runners/worker/log_handler.py
+++ b/sdks/python/apache_beam/runners/worker/log_handler.py
@@ -38,11 +38,11 @@ class FnApiLogRecordHandler(logging.Handler):
 
   # Mapping from logging levels to LogEntry levels.
   LOG_LEVEL_MAP = {
-      logging.FATAL: beam_fn_api_pb2.LogEntry.CRITICAL,
-      logging.ERROR: beam_fn_api_pb2.LogEntry.ERROR,
-      logging.WARNING: beam_fn_api_pb2.LogEntry.WARN,
-      logging.INFO: beam_fn_api_pb2.LogEntry.INFO,
-      logging.DEBUG: beam_fn_api_pb2.LogEntry.DEBUG
+      logging.FATAL: beam_fn_api_pb2.LogEntry.Severity.CRITICAL,
+      logging.ERROR: beam_fn_api_pb2.LogEntry.Severity.ERROR,
+      logging.WARNING: beam_fn_api_pb2.LogEntry.Severity.WARN,
+      logging.INFO: beam_fn_api_pb2.LogEntry.Severity.INFO,
+      logging.DEBUG: beam_fn_api_pb2.LogEntry.Severity.DEBUG
   }
 
   def __init__(self, log_service_descriptor):

http://git-wip-us.apache.org/repos/asf/beam/blob/6fddd4ed/sdks/python/apache_beam/runners/worker/log_handler_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/worker/log_handler_test.py b/sdks/python/apache_beam/runners/worker/log_handler_test.py
index 7edf667..e4323d2 100644
--- a/sdks/python/apache_beam/runners/worker/log_handler_test.py
+++ b/sdks/python/apache_beam/runners/worker/log_handler_test.py
@@ -73,7 +73,8 @@ class FnApiLogRecordHandlerTest(unittest.TestCase):
     num_received_log_entries = 0
     for outer in self.test_logging_service.log_records_received:
       for log_entry in outer.log_entries:
-        self.assertEqual(beam_fn_api_pb2.LogEntry.INFO, log_entry.severity)
+        self.assertEqual(beam_fn_api_pb2.LogEntry.Severity.INFO,
+                         log_entry.severity)
         self.assertEqual('%s: %s' % (msg, num_received_log_entries),
                          log_entry.message)
         self.assertEqual(u'log_handler_test._verify_fn_log_handler',

http://git-wip-us.apache.org/repos/asf/beam/blob/6fddd4ed/sdks/python/apache_beam/transforms/core.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index 0a82de2..5d92fe9 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -1385,16 +1385,16 @@ class Windowing(object):
     return beam_runner_api_pb2.WindowingStrategy(
         window_fn=self.windowfn.to_runner_api(context),
         # TODO(robertwb): Prohibit implicit multi-level merging.
-        merge_status=(beam_runner_api_pb2.NEEDS_MERGE
+        merge_status=(beam_runner_api_pb2.MergeStatus.NEEDS_MERGE
                       if self.windowfn.is_merging()
-                      else beam_runner_api_pb2.NON_MERGING),
+                      else beam_runner_api_pb2.MergeStatus.NON_MERGING),
         window_coder_id=context.coders.get_id(
             self.windowfn.get_window_coder()),
         trigger=self.triggerfn.to_runner_api(context),
         accumulation_mode=self.accumulation_mode,
         output_time=self.timestamp_combiner,
         # TODO(robertwb): Support EMIT_IF_NONEMPTY
-        closing_behavior=beam_runner_api_pb2.EMIT_ALWAYS,
+        closing_behavior=beam_runner_api_pb2.ClosingBehavior.EMIT_ALWAYS,
         allowed_lateness=0)
 
   @staticmethod

http://git-wip-us.apache.org/repos/asf/beam/blob/6fddd4ed/sdks/python/apache_beam/transforms/trigger.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/trigger.py b/sdks/python/apache_beam/transforms/trigger.py
index 3583e62..bd99401 100644
--- a/sdks/python/apache_beam/transforms/trigger.py
+++ b/sdks/python/apache_beam/transforms/trigger.py
@@ -58,8 +58,8 @@ __all__ = [
 class AccumulationMode(object):
   """Controls what to do with data when a trigger fires multiple times.
   """
-  DISCARDING = beam_runner_api_pb2.DISCARDING
-  ACCUMULATING = beam_runner_api_pb2.ACCUMULATING
+  DISCARDING = beam_runner_api_pb2.AccumulationMode.DISCARDING
+  ACCUMULATING = beam_runner_api_pb2.AccumulationMode.ACCUMULATING
   # TODO(robertwb): Provide retractions of previous outputs.
   # RETRACTING = 3
 

http://git-wip-us.apache.org/repos/asf/beam/blob/6fddd4ed/sdks/python/apache_beam/transforms/window.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/window.py b/sdks/python/apache_beam/transforms/window.py
index a025019..8c8bf33 100644
--- a/sdks/python/apache_beam/transforms/window.py
+++ b/sdks/python/apache_beam/transforms/window.py
@@ -86,9 +86,9 @@ __all__ = [
 class TimestampCombiner(object):
   """Determines how output timestamps of grouping operations are assigned."""
 
-  OUTPUT_AT_EOW = beam_runner_api_pb2.END_OF_WINDOW
-  OUTPUT_AT_EARLIEST = beam_runner_api_pb2.EARLIEST_IN_PANE
-  OUTPUT_AT_LATEST = beam_runner_api_pb2.LATEST_IN_PANE
+  OUTPUT_AT_EOW = beam_runner_api_pb2.OutputTime.END_OF_WINDOW
+  OUTPUT_AT_EARLIEST = beam_runner_api_pb2.OutputTime.EARLIEST_IN_PANE
+  OUTPUT_AT_LATEST = beam_runner_api_pb2.OutputTime.LATEST_IN_PANE
   # TODO(robertwb): Add this to the runner API or remove it.
   OUTPUT_AT_EARLIEST_TRANSFORMED = 'OUTPUT_AT_EARLIEST_TRANSFORMED'
 

Reply via email to