Repository: incubator-beam Updated Branches: refs/heads/master 6ec9e9680 -> 123674f4b
[BEAM-272][flink] remove dependency on Dataflow Runner Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/50edd231 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/50edd231 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/50edd231 Branch: refs/heads/master Commit: 50edd2314e7c7b97b75d2e6759c5857f4f67a662 Parents: acb0406 Author: Maximilian Michels <[email protected]> Authored: Wed May 11 11:57:44 2016 +0200 Committer: Maximilian Michels <[email protected]> Committed: Thu May 12 10:52:40 2016 +0200 ---------------------------------------------------------------------- runners/flink/runner/pom.xml | 10 ------- .../runners/flink/FlinkPipelineOptions.java | 30 ++++++++++++++++++-- .../beam/runners/flink/FlinkPipelineRunner.java | 4 +-- 3 files changed, 28 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/50edd231/runners/flink/runner/pom.xml ---------------------------------------------------------------------- diff --git a/runners/flink/runner/pom.xml b/runners/flink/runner/pom.xml index a1d5370..8958bdd 100644 --- a/runners/flink/runner/pom.xml +++ b/runners/flink/runner/pom.xml @@ -110,16 +110,6 @@ </exclusions> </dependency> - <dependency> - <groupId>org.apache.beam</groupId> - <artifactId>google-cloud-dataflow-java-runner</artifactId> - <exclusions> - <exclusion> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-jdk14</artifactId> - </exclusion> - </exclusions> - </dependency> <!-- Test scoped --> <dependency> <groupId>org.apache.beam</groupId> http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/50edd231/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java index fd86bc9..c40473e 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java @@ -18,14 +18,18 @@ package org.apache.beam.runners.flink; -import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; import org.apache.beam.sdk.options.ApplicationNameOptions; import org.apache.beam.sdk.options.Default; +import org.apache.beam.sdk.options.DefaultValueFactory; import org.apache.beam.sdk.options.Description; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.StreamingOptions; import com.fasterxml.jackson.annotation.JsonIgnore; +import org.joda.time.DateTimeUtils; +import org.joda.time.DateTimeZone; +import org.joda.time.format.DateTimeFormat; +import org.joda.time.format.DateTimeFormatter; import java.util.List; @@ -50,9 +54,9 @@ public interface FlinkPipelineOptions extends PipelineOptions, ApplicationNameOp /** * The job name is used to identify jobs running on a Flink cluster. */ - @Description("Dataflow job name, to uniquely identify active jobs. " + @Description("Flink job name, to uniquely identify active jobs. " + "Defaults to using the ApplicationName-UserName-Date.") - @Default.InstanceFactory(DataflowPipelineOptions.JobNameFactory.class) + @Default.InstanceFactory(JobNameFactory.class) String getJobName(); void setJobName(String value); @@ -91,4 +95,24 @@ public interface FlinkPipelineOptions extends PipelineOptions, ApplicationNameOp @Default.Long(-1L) Long getExecutionRetryDelay(); void setExecutionRetryDelay(Long delay); + + + class JobNameFactory implements DefaultValueFactory<String> { + private static final DateTimeFormatter FORMATTER = + DateTimeFormat.forPattern("MMddHHmmss").withZone(DateTimeZone.UTC); + + @Override + public String create(PipelineOptions options) { + String appName = options.as(ApplicationNameOptions.class).getAppName(); + String normalizedAppName = appName == null || appName.length() == 0 ? "FlinkRunner" + : appName.toLowerCase() + .replaceAll("[^a-z0-9]", "0") + .replaceAll("^[^a-z]", "a"); + String userName = System.getProperty("user.name", ""); + String normalizedUserName = userName.toLowerCase() + .replaceAll("[^a-z0-9]", "0"); + String datePart = FORMATTER.print(DateTimeUtils.currentTimeMillis()); + return normalizedAppName + "-" + normalizedUserName + "-" + datePart; + } + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/50edd231/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java index a389d7a..3edf6f3 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java @@ -17,7 +17,6 @@ */ package org.apache.beam.runners.flink; -import org.apache.beam.runners.dataflow.DataflowPipelineRunner; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; @@ -46,7 +45,6 @@ import java.util.Map; * pipeline by first translating them to a Flink Plan and then executing them either locally * or on a Flink cluster, depending on the configuration. * <p> - * This is based on {@link org.apache.beam.runners.dataflow.DataflowPipelineRunner}. */ public class FlinkPipelineRunner extends PipelineRunner<FlinkRunnerResult> { @@ -80,7 +78,7 @@ public class FlinkPipelineRunner extends PipelineRunner<FlinkRunnerResult> { if (flinkOptions.getFilesToStage() == null) { flinkOptions.setFilesToStage(detectClassPathResourcesToStage( - DataflowPipelineRunner.class.getClassLoader())); + FlinkPipelineRunner.class.getClassLoader())); LOG.info("PipelineOptions.filesToStage was not specified. " + "Defaulting to files from the classpath: will stage {} files. " + "Enable logging at DEBUG level to see which files will be staged.",
