Repository: beam Updated Branches: refs/heads/master 7447147e0 -> 367d66cbd
Support DRAINING and DRAINED in the DataflowRunner. Dataflow's DRAINING is treated as RUNNING, and DRAINED as DONE. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b3c52b63 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b3c52b63 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b3c52b63 Branch: refs/heads/master Commit: b3c52b63c8c8dee0af6c40cb32872f8f6650dd7a Parents: 7447147 Author: Eugene Kirpichov <[email protected]> Authored: Thu Mar 2 15:12:14 2017 -0800 Committer: Thomas Groh <[email protected]> Committed: Mon Mar 13 11:25:00 2017 -0700 ---------------------------------------------------------------------- .../runners/dataflow/util/MonitoringUtil.java | 4 +++ .../dataflow/util/MonitoringUtilTest.java | 31 ++++++++++---------- 2 files changed, 19 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/b3c52b63/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/MonitoringUtil.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/MonitoringUtil.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/MonitoringUtil.java index d0a24bf..c410afb 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/MonitoringUtil.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/MonitoringUtil.java @@ -60,6 +60,10 @@ public final class MonitoringUtil { .put("JOB_STATE_FAILED", State.FAILED) .put("JOB_STATE_CANCELLED", State.CANCELLED) .put("JOB_STATE_UPDATED", State.UPDATED) + // A DRAINING job is still running - the closest mapping is RUNNING. 
+ .put("JOB_STATE_DRAINING", State.RUNNING) + // A DRAINED job has successfully terminated - the closest mapping is DONE. + .put("JOB_STATE_DRAINED", State.DONE) .build(); private static final String JOB_MESSAGE_ERROR = "JOB_MESSAGE_ERROR"; private static final String JOB_MESSAGE_WARNING = "JOB_MESSAGE_WARNING"; http://git-wip-us.apache.org/repos/asf/beam/blob/b3c52b63/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/MonitoringUtilTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/MonitoringUtilTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/MonitoringUtilTest.java index 23ed26f..24b6c4e 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/MonitoringUtilTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/MonitoringUtilTest.java @@ -89,30 +89,29 @@ public class MonitoringUtilTest { } @Test - public void testToStateCreatesState() { - String stateName = "JOB_STATE_DONE"; - - State result = MonitoringUtil.toState(stateName); - - assertEquals(State.DONE, result); + public void testToStateNormal() { + // Trivially mapped cases + assertEquals(State.UNKNOWN, MonitoringUtil.toState("JOB_STATE_UNKNOWN")); + assertEquals(State.STOPPED, MonitoringUtil.toState("JOB_STATE_STOPPED")); + assertEquals(State.RUNNING, MonitoringUtil.toState("JOB_STATE_RUNNING")); + assertEquals(State.DONE, MonitoringUtil.toState("JOB_STATE_DONE")); + assertEquals(State.FAILED, MonitoringUtil.toState("JOB_STATE_FAILED")); + assertEquals(State.CANCELLED, MonitoringUtil.toState("JOB_STATE_CANCELLED")); + assertEquals(State.UPDATED, MonitoringUtil.toState("JOB_STATE_UPDATED")); + + // Non-trivially mapped cases + assertEquals(State.RUNNING, MonitoringUtil.toState("JOB_STATE_DRAINING")); + 
assertEquals(State.DONE, MonitoringUtil.toState("JOB_STATE_DRAINED")); } @Test public void testToStateWithNullReturnsUnknown() { - String stateName = null; - - State result = MonitoringUtil.toState(stateName); - - assertEquals(State.UNKNOWN, result); + assertEquals(State.UNKNOWN, MonitoringUtil.toState(null)); } @Test public void testToStateWithOtherValueReturnsUnknown() { - String stateName = "FOO_BAR_BAZ"; - - State result = MonitoringUtil.toState(stateName); - - assertEquals(State.UNKNOWN, result); + assertEquals(State.UNKNOWN, MonitoringUtil.toState("FOO_BAR_BAZ")); } @Test
