Repository: incubator-beam
Updated Branches:
  refs/heads/master f62d04e22 -> 843275210
[BEAM-642] Support Flink Detached Mode for JOB execution

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/dc69bc48
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/dc69bc48
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/dc69bc48

Branch: refs/heads/master
Commit: dc69bc48b0057f45d849d3cfec848fa066ee0854
Parents: f62d04e
Author: Sumit Chawla <sumic...@cisco.com>
Authored: Mon Sep 19 15:10:53 2016 -0700
Committer: Maximilian Michels <m...@apache.org>
Committed: Thu Sep 22 11:30:09 2016 +0200

----------------------------------------------------------------------
 .../apache/beam/runners/flink/FlinkRunner.java | 25 +++++++++++++-------
 1 file changed, 16 insertions(+), 9 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dc69bc48/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
index d3c65c0..137fdeb 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
@@ -25,6 +25,7 @@ import java.net.URL;
 import java.net.URLClassLoader;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -56,6 +57,7 @@ import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.PValue;
 
 import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.client.program.DetachedEnvironment;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -151,18 +153,23 @@ public class FlinkRunner extends PipelineRunner<FlinkRunnerResult> {
       throw new RuntimeException("Pipeline execution failed", e);
     }
 
-    LOG.info("Execution finished in {} msecs", result.getNetRuntime());
-
-    Map<String, Object> accumulators = result.getAllAccumulatorResults();
-    if (accumulators != null && !accumulators.isEmpty()) {
-      LOG.info("Final aggregator values:");
+    if (result instanceof DetachedEnvironment.DetachedJobExecutionResult) {
+      LOG.info("Pipeline submitted in Detached mode");
+      Map<String, Object> accumulators = Collections.emptyMap();
+      return new FlinkRunnerResult(accumulators, -1L);
+    } else {
+      LOG.info("Execution finished in {} msecs", result.getNetRuntime());
+      Map<String, Object> accumulators = result.getAllAccumulatorResults();
+      if (accumulators != null && !accumulators.isEmpty()) {
+        LOG.info("Final aggregator values:");
 
-      for (Map.Entry<String, Object> entry : result.getAllAccumulatorResults().entrySet()) {
-        LOG.info("{} : {}", entry.getKey(), entry.getValue());
+        for (Map.Entry<String, Object> entry : result.getAllAccumulatorResults().entrySet()) {
+          LOG.info("{} : {}", entry.getKey(), entry.getValue());
+        }
       }
-    }
 
-    return new FlinkRunnerResult(accumulators, result.getNetRuntime());
+      return new FlinkRunnerResult(accumulators, result.getNetRuntime());
+    }
   }
 
   /**