Repository: beam
Updated Branches:
  refs/heads/master 7447147e0 -> 367d66cbd


Support DRAINING and DRAINED in the DataflowRunner

Dataflow's DRAINING is treated as RUNNING, and DRAINED as DONE.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b3c52b63
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b3c52b63
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b3c52b63

Branch: refs/heads/master
Commit: b3c52b63c8c8dee0af6c40cb32872f8f6650dd7a
Parents: 7447147
Author: Eugene Kirpichov <[email protected]>
Authored: Thu Mar 2 15:12:14 2017 -0800
Committer: Thomas Groh <[email protected]>
Committed: Mon Mar 13 11:25:00 2017 -0700

----------------------------------------------------------------------
 .../runners/dataflow/util/MonitoringUtil.java   |  4 +++
 .../dataflow/util/MonitoringUtilTest.java       | 31 ++++++++++----------
 2 files changed, 19 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/b3c52b63/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/MonitoringUtil.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/MonitoringUtil.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/MonitoringUtil.java
index d0a24bf..c410afb 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/MonitoringUtil.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/MonitoringUtil.java
@@ -60,6 +60,10 @@ public final class MonitoringUtil {
           .put("JOB_STATE_FAILED", State.FAILED)
           .put("JOB_STATE_CANCELLED", State.CANCELLED)
           .put("JOB_STATE_UPDATED", State.UPDATED)
+          // A DRAINING job is still running - the closest mapping is RUNNING.
+          .put("JOB_STATE_DRAINING", State.RUNNING)
+          // A DRAINED job has successfully terminated - the closest mapping is DONE.
+          .put("JOB_STATE_DRAINED", State.DONE)
           .build();
   private static final String JOB_MESSAGE_ERROR = "JOB_MESSAGE_ERROR";
   private static final String JOB_MESSAGE_WARNING = "JOB_MESSAGE_WARNING";

http://git-wip-us.apache.org/repos/asf/beam/blob/b3c52b63/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/MonitoringUtilTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/MonitoringUtilTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/MonitoringUtilTest.java
index 23ed26f..24b6c4e 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/MonitoringUtilTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/MonitoringUtilTest.java
@@ -89,30 +89,29 @@ public class MonitoringUtilTest {
   }
 
   @Test
-  public void testToStateCreatesState() {
-    String stateName = "JOB_STATE_DONE";
-
-    State result = MonitoringUtil.toState(stateName);
-
-    assertEquals(State.DONE, result);
+  public void testToStateNormal() {
+    // Trivially mapped cases
+    assertEquals(State.UNKNOWN, MonitoringUtil.toState("JOB_STATE_UNKNOWN"));
+    assertEquals(State.STOPPED, MonitoringUtil.toState("JOB_STATE_STOPPED"));
+    assertEquals(State.RUNNING, MonitoringUtil.toState("JOB_STATE_RUNNING"));
+    assertEquals(State.DONE, MonitoringUtil.toState("JOB_STATE_DONE"));
+    assertEquals(State.FAILED, MonitoringUtil.toState("JOB_STATE_FAILED"));
+    assertEquals(State.CANCELLED, MonitoringUtil.toState("JOB_STATE_CANCELLED"));
+    assertEquals(State.UPDATED, MonitoringUtil.toState("JOB_STATE_UPDATED"));
+
+    // Non-trivially mapped cases
+    assertEquals(State.RUNNING, MonitoringUtil.toState("JOB_STATE_DRAINING"));
+    assertEquals(State.DONE, MonitoringUtil.toState("JOB_STATE_DRAINED"));
   }
 
   @Test
   public void testToStateWithNullReturnsUnknown() {
-    String stateName = null;
-
-    State result = MonitoringUtil.toState(stateName);
-
-    assertEquals(State.UNKNOWN, result);
+    assertEquals(State.UNKNOWN, MonitoringUtil.toState(null));
   }
 
   @Test
   public void testToStateWithOtherValueReturnsUnknown() {
-    String stateName = "FOO_BAR_BAZ";
-
-    State result = MonitoringUtil.toState(stateName);
-
-    assertEquals(State.UNKNOWN, result);
+    assertEquals(State.UNKNOWN, MonitoringUtil.toState("FOO_BAR_BAZ"));
   }
 
   @Test

Reply via email to