[
https://issues.apache.org/jira/browse/BEAM-593?focusedWorklogId=140693&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-140693
]
ASF GitHub Bot logged work on BEAM-593:
---------------------------------------
Author: ASF GitHub Bot
Created on: 03/Sep/18 18:14
Start Date: 03/Sep/18 18:14
Worklog Time Spent: 10m
Work Description: stale[bot] closed pull request #3295: [BEAM-593]
Refactor Flink execution as preparation for async job control
URL: https://github.com/apache/beam/pull/3295
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPipelineExecutor.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPipelineExecutor.java
new file mode 100644
index 00000000000..a0402445a2f
--- /dev/null
+++
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPipelineExecutor.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import java.util.List;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.java.CollectionEnvironment;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link FlinkPipelineExecutor} that executes a {@link Pipeline} using a Flink
+ * {@link ExecutionEnvironment}.
+ */
+class FlinkBatchPipelineExecutor implements FlinkPipelineExecutor {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(FlinkBatchPipelineExecutor.class);
+
+ @Override
+ public PipelineResult executePipeline(
+ FlinkRunner runner, Pipeline pipeline, FlinkPipelineOptions options)
throws Exception{
+
+ ExecutionEnvironment env = createBatchExecutionEnvironment(options);
+ FlinkBatchPipelineTranslator translator = new
FlinkBatchPipelineTranslator(env, options);
+ translator.translate(pipeline);
+
+ JobExecutionResult result = env.execute(options.getJobName());
+
+ return FlinkRunnerResultUtil.wrapFlinkRunnerResult(LOG, result);
+ }
+
+ private ExecutionEnvironment
createBatchExecutionEnvironment(FlinkPipelineOptions options) {
+
+ String masterUrl = options.getFlinkMaster();
+ ExecutionEnvironment flinkBatchEnv;
+
+ // depending on the master, create the right environment.
+ if (masterUrl.equals("[local]")) {
+ flinkBatchEnv = ExecutionEnvironment.createLocalEnvironment();
+ } else if (masterUrl.equals("[collection]")) {
+ flinkBatchEnv = new CollectionEnvironment();
+ } else if (masterUrl.equals("[auto]")) {
+ flinkBatchEnv = ExecutionEnvironment.getExecutionEnvironment();
+ } else if (masterUrl.matches(".*:\\d*")) {
+ String[] parts = masterUrl.split(":");
+ List<String> stagingFiles = options.getFilesToStage();
+ flinkBatchEnv = ExecutionEnvironment.createRemoteEnvironment(parts[0],
+ Integer.parseInt(parts[1]),
+ stagingFiles.toArray(new String[stagingFiles.size()]));
+ } else {
+ LOG.warn("Unrecognized Flink Master URL {}. Defaulting to [auto].",
masterUrl);
+ flinkBatchEnv = ExecutionEnvironment.getExecutionEnvironment();
+ }
+
+ // set the correct parallelism.
+ if (options.getParallelism() != -1 && !(flinkBatchEnv instanceof
CollectionEnvironment)) {
+ flinkBatchEnv.setParallelism(options.getParallelism());
+ }
+
+ // set parallelism in the options (required by some execution code)
+ options.setParallelism(flinkBatchEnv.getParallelism());
+
+ if (options.getObjectReuse()) {
+ flinkBatchEnv.getConfig().enableObjectReuse();
+ } else {
+ flinkBatchEnv.getConfig().disableObjectReuse();
+ }
+
+ return flinkBatchEnv;
+ }
+}
diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
deleted file mode 100644
index d2a2016c98a..00000000000
---
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
+++ /dev/null
@@ -1,253 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import java.util.List;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.java.CollectionEnvironment;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.runtime.state.AbstractStateBackend;
-import org.apache.flink.streaming.api.TimeCharacteristic;
-import
org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * The class that instantiates and manages the execution of a given job.
- * Depending on if the job is a Streaming or Batch processing one, it creates
- * the adequate execution environment ({@link ExecutionEnvironment}
- * or {@link StreamExecutionEnvironment}), the necessary {@link
FlinkPipelineTranslator}
- * ({@link FlinkBatchPipelineTranslator} or {@link
FlinkStreamingPipelineTranslator}) to
- * transform the Beam job into a Flink one, and executes the (translated) job.
- */
-class FlinkPipelineExecutionEnvironment {
-
- private static final Logger LOG =
- LoggerFactory.getLogger(FlinkPipelineExecutionEnvironment.class);
-
- private final FlinkPipelineOptions options;
-
- /**
- * The Flink Batch execution environment. This is instantiated to either a
- * {@link org.apache.flink.api.java.CollectionEnvironment},
- * a {@link org.apache.flink.api.java.LocalEnvironment} or
- * a {@link org.apache.flink.api.java.RemoteEnvironment}, depending on the
configuration
- * options.
- */
- private ExecutionEnvironment flinkBatchEnv;
-
- /**
- * The Flink Streaming execution environment. This is instantiated to either
a
- * {@link org.apache.flink.streaming.api.environment.LocalStreamEnvironment}
or
- * a {@link
org.apache.flink.streaming.api.environment.RemoteStreamEnvironment}, depending
- * on the configuration options, and more specifically, the url of the
master.
- */
- private StreamExecutionEnvironment flinkStreamEnv;
-
- /**
- * Creates a {@link FlinkPipelineExecutionEnvironment} with the
user-specified parameters in the
- * provided {@link FlinkPipelineOptions}.
- *
- * @param options the user-defined pipeline options.
- * */
- FlinkPipelineExecutionEnvironment(FlinkPipelineOptions options) {
- this.options = checkNotNull(options);
- }
-
- /**
- * Depending on if the job is a Streaming or a Batch one, this method creates
- * the necessary execution environment and pipeline translator, and
translates
- * the {@link org.apache.beam.sdk.values.PCollection} program into
- * a {@link org.apache.flink.api.java.DataSet}
- * or {@link org.apache.flink.streaming.api.datastream.DataStream} one.
- * */
- public void translate(FlinkRunner flinkRunner, Pipeline pipeline) {
- this.flinkBatchEnv = null;
- this.flinkStreamEnv = null;
-
-
pipeline.replaceAll(FlinkTransformOverrides.getDefaultOverrides(options.isStreaming()));
-
- PipelineTranslationOptimizer optimizer =
- new PipelineTranslationOptimizer(TranslationMode.BATCH, options);
-
- optimizer.translate(pipeline);
- TranslationMode translationMode = optimizer.getTranslationMode();
-
- FlinkPipelineTranslator translator;
- if (translationMode == TranslationMode.STREAMING) {
- this.flinkStreamEnv = createStreamExecutionEnvironment();
- translator = new FlinkStreamingPipelineTranslator(flinkRunner,
flinkStreamEnv, options);
- } else {
- this.flinkBatchEnv = createBatchExecutionEnvironment();
- translator = new FlinkBatchPipelineTranslator(flinkBatchEnv, options);
- }
-
- translator.translate(pipeline);
- }
-
- /**
- * Launches the program execution.
- * */
- public JobExecutionResult executePipeline() throws Exception {
- final String jobName = options.getJobName();
-
- if (flinkBatchEnv != null) {
- return flinkBatchEnv.execute(jobName);
- } else if (flinkStreamEnv != null) {
- return flinkStreamEnv.execute(jobName);
- } else {
- throw new IllegalStateException("The Pipeline has not yet been
translated.");
- }
- }
-
- /**
- * If the submitted job is a batch processing job, this method creates the
adequate
- * Flink {@link org.apache.flink.api.java.ExecutionEnvironment} depending
- * on the user-specified options.
- */
- private ExecutionEnvironment createBatchExecutionEnvironment() {
-
- LOG.info("Creating the required Batch Execution Environment.");
-
- String masterUrl = options.getFlinkMaster();
- ExecutionEnvironment flinkBatchEnv;
-
- // depending on the master, create the right environment.
- if (masterUrl.equals("[local]")) {
- flinkBatchEnv = ExecutionEnvironment.createLocalEnvironment();
- } else if (masterUrl.equals("[collection]")) {
- flinkBatchEnv = new CollectionEnvironment();
- } else if (masterUrl.equals("[auto]")) {
- flinkBatchEnv = ExecutionEnvironment.getExecutionEnvironment();
- } else if (masterUrl.matches(".*:\\d*")) {
- String[] parts = masterUrl.split(":");
- List<String> stagingFiles = options.getFilesToStage();
- flinkBatchEnv = ExecutionEnvironment.createRemoteEnvironment(parts[0],
- Integer.parseInt(parts[1]),
- stagingFiles.toArray(new String[stagingFiles.size()]));
- } else {
- LOG.warn("Unrecognized Flink Master URL {}. Defaulting to [auto].",
masterUrl);
- flinkBatchEnv = ExecutionEnvironment.getExecutionEnvironment();
- }
-
- // set the correct parallelism.
- if (options.getParallelism() != -1 && !(flinkBatchEnv instanceof
CollectionEnvironment)) {
- flinkBatchEnv.setParallelism(options.getParallelism());
- }
-
- // set parallelism in the options (required by some execution code)
- options.setParallelism(flinkBatchEnv.getParallelism());
-
- if (options.getObjectReuse()) {
- flinkBatchEnv.getConfig().enableObjectReuse();
- } else {
- flinkBatchEnv.getConfig().disableObjectReuse();
- }
-
- return flinkBatchEnv;
- }
-
- /**
- * If the submitted job is a stream processing job, this method creates the
adequate
- * Flink {@link
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment} depending
- * on the user-specified options.
- */
- private StreamExecutionEnvironment createStreamExecutionEnvironment() {
-
- LOG.info("Creating the required Streaming Environment.");
-
- String masterUrl = options.getFlinkMaster();
- StreamExecutionEnvironment flinkStreamEnv = null;
-
- // depending on the master, create the right environment.
- if (masterUrl.equals("[local]")) {
- flinkStreamEnv = StreamExecutionEnvironment.createLocalEnvironment();
- } else if (masterUrl.equals("[auto]")) {
- flinkStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
- } else if (masterUrl.matches(".*:\\d*")) {
- String[] parts = masterUrl.split(":");
- List<String> stagingFiles = options.getFilesToStage();
- flinkStreamEnv =
StreamExecutionEnvironment.createRemoteEnvironment(parts[0],
- Integer.parseInt(parts[1]), stagingFiles.toArray(new
String[stagingFiles.size()]));
- } else {
- LOG.warn("Unrecognized Flink Master URL {}. Defaulting to [auto].",
masterUrl);
- flinkStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
- }
-
- // set the correct parallelism.
- if (options.getParallelism() != -1) {
- flinkStreamEnv.setParallelism(options.getParallelism());
- }
-
- // set parallelism in the options (required by some execution code)
- options.setParallelism(flinkStreamEnv.getParallelism());
-
- if (options.getObjectReuse()) {
- flinkStreamEnv.getConfig().enableObjectReuse();
- } else {
- flinkStreamEnv.getConfig().disableObjectReuse();
- }
-
- // default to event time
- flinkStreamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-
- // for the following 2 parameters, a value of -1 means that Flink will use
- // the default values as specified in the configuration.
- int numRetries = options.getNumberOfExecutionRetries();
- if (numRetries != -1) {
- flinkStreamEnv.setNumberOfExecutionRetries(numRetries);
- }
- long retryDelay = options.getExecutionRetryDelay();
- if (retryDelay != -1) {
- flinkStreamEnv.getConfig().setExecutionRetryDelay(retryDelay);
- }
-
- // A value of -1 corresponds to disabled checkpointing (see
CheckpointConfig in Flink).
- // If the value is not -1, then the validity checks are applied.
- // By default, checkpointing is disabled.
- long checkpointInterval = options.getCheckpointingInterval();
- if (checkpointInterval != -1) {
- if (checkpointInterval < 1) {
- throw new IllegalArgumentException("The checkpoint interval must be
positive");
- }
- flinkStreamEnv.enableCheckpointing(checkpointInterval,
options.getCheckpointingMode());
- flinkStreamEnv.getCheckpointConfig().setCheckpointTimeout(
- options.getCheckpointTimeoutMillis());
- boolean externalizedCheckpoint =
options.isExternalizedCheckpointsEnabled();
- boolean retainOnCancellation =
options.getRetainExternalizedCheckpointsOnCancellation();
- if (externalizedCheckpoint) {
- flinkStreamEnv.getCheckpointConfig().enableExternalizedCheckpoints(
- retainOnCancellation ?
ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
- : ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
- }
- }
-
- // State backend
- final AbstractStateBackend stateBackend = options.getStateBackend();
- if (stateBackend != null) {
- flinkStreamEnv.setStateBackend(stateBackend);
- }
-
- return flinkStreamEnv;
- }
-
-}
diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutor.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutor.java
new file mode 100644
index 00000000000..593605e4152
--- /dev/null
+++
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutor.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+
+/**
+ * A {@link FlinkPipelineExecutor} can execute a {@link Pipeline} on Flink.
+ *
+ * <p>There are subclasses for batch and for streaming execution.
+ */
+interface FlinkPipelineExecutor {
+
+ /**
+ * Executes the given pipeline.
+ */
+ PipelineResult executePipeline(
+ FlinkRunner runner, Pipeline pipeline, FlinkPipelineOptions options)
throws Exception;
+}
diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
index ca12615be03..7e9cd2e6ce3 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
@@ -25,7 +25,6 @@
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
-import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
@@ -38,8 +37,8 @@
import org.apache.beam.sdk.runners.TransformHierarchy;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.View;
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.client.program.DetachedEnvironment;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -107,34 +106,21 @@ public PipelineResult run(Pipeline pipeline) {
LOG.info("Executing pipeline using FlinkRunner.");
- FlinkPipelineExecutionEnvironment env = new
FlinkPipelineExecutionEnvironment(options);
+ boolean streaming = options.isStreaming() ||
containsUnboundedPCollection(pipeline);
- LOG.info("Translating pipeline to Flink program.");
- env.translate(this, pipeline);
+
pipeline.replaceAll(FlinkTransformOverrides.getDefaultOverrides(options.isStreaming()));
- JobExecutionResult result;
- try {
- LOG.info("Starting execution of Flink program.");
- result = env.executePipeline();
- } catch (Exception e) {
- LOG.error("Pipeline execution failed", e);
- throw new RuntimeException("Pipeline execution failed", e);
- }
-
- if (result instanceof DetachedEnvironment.DetachedJobExecutionResult) {
- LOG.info("Pipeline submitted in Detached mode");
- return new FlinkDetachedRunnerResult();
+ FlinkPipelineExecutor executor;
+ if (streaming) {
+ executor = new FlinkStreamingPipelineExecutor();
} else {
- LOG.info("Execution finished in {} msecs", result.getNetRuntime());
- Map<String, Object> accumulators = result.getAllAccumulatorResults();
- if (accumulators != null && !accumulators.isEmpty()) {
- LOG.info("Final accumulator values:");
- for (Map.Entry<String, Object> entry :
result.getAllAccumulatorResults().entrySet()) {
- LOG.info("{} : {}", entry.getKey(), entry.getValue());
- }
- }
+ executor = new FlinkBatchPipelineExecutor();
+ }
- return new FlinkRunnerResult(accumulators, result.getNetRuntime());
+ try {
+ return executor.executePipeline(this, pipeline, options);
+ } catch (Exception e) {
+ throw new RuntimeException("Pipeline execution failed.", e);
}
}
@@ -223,4 +209,20 @@ public CompositeBehavior
enterCompositeTransform(TransformHierarchy.Node node) {
ptransformViewNamesWithNonDeterministicKeyCoders);
}
}
+
+ private boolean containsUnboundedPCollection(Pipeline p) {
+ class BoundednessVisitor extends Pipeline.PipelineVisitor.Defaults {
+ PCollection.IsBounded boundedness = PCollection.IsBounded.BOUNDED;
+
+ @Override
+ public void visitValue(PValue value, TransformHierarchy.Node producer) {
+ if (value instanceof PCollection) {
+ boundedness = boundedness.and(((PCollection) value).isBounded());
+ }
+ }
+ }
+ BoundednessVisitor visitor = new BoundednessVisitor();
+ p.traverseTopologically(visitor);
+ return visitor.boundedness == PCollection.IsBounded.UNBOUNDED;
+ }
}
diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResultUtil.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResultUtil.java
new file mode 100644
index 00000000000..f46a0ec1be3
--- /dev/null
+++
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResultUtil.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import java.util.Map;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.client.program.DetachedEnvironment;
+import org.slf4j.Logger;
+
+/**
+ * Static utility methods for wrapping a Flink job result as a
+ * {@link org.apache.beam.sdk.PipelineResult}.
+ */
+class FlinkRunnerResultUtil {
+
+ static PipelineResult wrapFlinkRunnerResult(Logger log, JobExecutionResult
jobResult) {
+ if (jobResult instanceof DetachedEnvironment.DetachedJobExecutionResult) {
+ log.info("Pipeline submitted in Detached mode");
+ return new FlinkDetachedRunnerResult();
+ } else {
+ log.info("Execution finished in {} msecs", jobResult.getNetRuntime());
+ Map<String, Object> accumulators = jobResult.getAllAccumulatorResults();
+ if (accumulators != null && !accumulators.isEmpty()) {
+ log.info("Final accumulator values:");
+ for (Map.Entry<String, Object> entry : accumulators.entrySet()) {
+ log.info("{} : {}", entry.getKey(), entry.getValue());
+ }
+ }
+
+ return new FlinkRunnerResult(accumulators, jobResult.getNetRuntime());
+ }
+ }
+}
diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineExecutor.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineExecutor.java
new file mode 100644
index 00000000000..c031366aee9
--- /dev/null
+++
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineExecutor.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import java.util.List;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import
org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link FlinkPipelineExecutor} that executes a {@link Pipeline} using a Flink
+ * {@link StreamExecutionEnvironment}.
+ */
+class FlinkStreamingPipelineExecutor implements FlinkPipelineExecutor {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(FlinkStreamingPipelineExecutor.class);
+
+ @Override
+ public PipelineResult executePipeline(
+ FlinkRunner runner, Pipeline pipeline, FlinkPipelineOptions options)
throws Exception {
+
+ StreamExecutionEnvironment env = createStreamExecutionEnvironment(options);
+
+ FlinkStreamingPipelineTranslator translator =
+ new FlinkStreamingPipelineTranslator(runner, env, options);
+ translator.translate(pipeline);
+
+ return runPipeline(options, env);
+ }
+
+ private StreamExecutionEnvironment createStreamExecutionEnvironment(
+ FlinkPipelineOptions options) {
+
+ String masterUrl = options.getFlinkMaster();
+ StreamExecutionEnvironment flinkStreamEnv = null;
+
+ // depending on the master, create the right environment.
+ if (masterUrl.equals("[local]")) {
+ flinkStreamEnv = StreamExecutionEnvironment.createLocalEnvironment();
+ } else if (masterUrl.equals("[auto]")) {
+ flinkStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+ } else if (masterUrl.matches(".*:\\d*")) {
+ String[] parts = masterUrl.split(":");
+ List<String> stagingFiles = options.getFilesToStage();
+ flinkStreamEnv =
StreamExecutionEnvironment.createRemoteEnvironment(parts[0],
+ Integer.parseInt(parts[1]), stagingFiles.toArray(new
String[stagingFiles.size()]));
+ } else {
+ LOG.warn("Unrecognized Flink Master URL {}. Defaulting to [auto].",
masterUrl);
+ flinkStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+ }
+
+ // set the correct parallelism.
+ if (options.getParallelism() != -1) {
+ flinkStreamEnv.setParallelism(options.getParallelism());
+ }
+
+ // set parallelism in the options (required by some execution code)
+ options.setParallelism(flinkStreamEnv.getParallelism());
+
+ if (options.getObjectReuse()) {
+ flinkStreamEnv.getConfig().enableObjectReuse();
+ } else {
+ flinkStreamEnv.getConfig().disableObjectReuse();
+ }
+
+ // default to event time
+ flinkStreamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+ // for the following 2 parameters, a value of -1 means that Flink will use
+ // the default values as specified in the configuration.
+ int numRetries = options.getNumberOfExecutionRetries();
+ if (numRetries != -1) {
+ flinkStreamEnv.setNumberOfExecutionRetries(numRetries);
+ }
+ long retryDelay = options.getExecutionRetryDelay();
+ if (retryDelay != -1) {
+ flinkStreamEnv.getConfig().setExecutionRetryDelay(retryDelay);
+ }
+
+ // A value of -1 corresponds to disabled checkpointing (see
CheckpointConfig in Flink).
+ // If the value is not -1, then the validity checks are applied.
+ // By default, checkpointing is disabled.
+ long checkpointInterval = options.getCheckpointingInterval();
+ if (checkpointInterval != -1) {
+ if (checkpointInterval < 1) {
+ throw new IllegalArgumentException("The checkpoint interval must be
positive");
+ }
+ flinkStreamEnv.enableCheckpointing(checkpointInterval,
options.getCheckpointingMode());
+ flinkStreamEnv.getCheckpointConfig().setCheckpointTimeout(
+ options.getCheckpointTimeoutMillis());
+ boolean externalizedCheckpoint =
options.isExternalizedCheckpointsEnabled();
+ boolean retainOnCancellation =
options.getRetainExternalizedCheckpointsOnCancellation();
+ if (externalizedCheckpoint) {
+ flinkStreamEnv.getCheckpointConfig().enableExternalizedCheckpoints(
+ retainOnCancellation ?
ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
+ : ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
+ }
+ }
+
+ // State backend
+ final AbstractStateBackend stateBackend = options.getStateBackend();
+ if (stateBackend != null) {
+ flinkStreamEnv.setStateBackend(stateBackend);
+ }
+
+ return flinkStreamEnv;
+ }
+
+ /**
+ * This will use blocking submission so the job-control features of {@link
PipelineResult} don't
+ * work.
+ */
+ private static PipelineResult runPipeline(
+ FlinkPipelineOptions options, StreamExecutionEnvironment env) throws
Exception {
+
+ JobExecutionResult jobResult = env.execute(options.getJobName());
+
+ return FlinkRunnerResultUtil.wrapFlinkRunnerResult(LOG, jobResult);
+ }
+}
diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/PipelineTranslationOptimizer.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/PipelineTranslationOptimizer.java
deleted file mode 100644
index 3acc3eafca1..00000000000
---
a/runners/flink/src/main/java/org/apache/beam/runners/flink/PipelineTranslationOptimizer.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink;
-
-import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.runners.TransformHierarchy;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PValue;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Traverses the Pipeline to determine the {@link TranslationMode} for this
pipeline.
- */
-class PipelineTranslationOptimizer extends FlinkPipelineTranslator {
-
- private static final Logger LOG =
LoggerFactory.getLogger(PipelineTranslationOptimizer.class);
-
- private TranslationMode translationMode;
-
- private final FlinkPipelineOptions options;
-
- public PipelineTranslationOptimizer(TranslationMode defaultMode,
FlinkPipelineOptions options) {
- this.translationMode = defaultMode;
- this.options = options;
- }
-
- public TranslationMode getTranslationMode() {
-
- // override user-specified translation mode
- if (options.isStreaming()) {
- return TranslationMode.STREAMING;
- }
-
- return translationMode;
- }
-
- @Override
- public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node
node) {
- return CompositeBehavior.ENTER_TRANSFORM;
- }
-
- @Override
- public void leaveCompositeTransform(TransformHierarchy.Node node) {}
-
- @Override
- public void visitPrimitiveTransform(TransformHierarchy.Node node) {
- Class<? extends PTransform> transformClass =
node.getTransform().getClass();
- if (transformClass == Read.Unbounded.class) {
- LOG.info("Found {}. Switching to streaming execution.", transformClass);
- translationMode = TranslationMode.STREAMING;
- }
- }
-
- @Override
- public void visitValue(PValue value, TransformHierarchy.Node producer) {}
-}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 140693)
Time Spent: 1h (was: 50m)
> Support unblocking run() in FlinkRunner and cancel() and waitUntilFinish() in
> FlinkRunnerResult
> -----------------------------------------------------------------------------------------------
>
> Key: BEAM-593
> URL: https://issues.apache.org/jira/browse/BEAM-593
> Project: Beam
> Issue Type: New Feature
> Components: runner-flink
> Reporter: Pei He
> Assignee: Aljoscha Krettek
> Priority: Major
> Time Spent: 1h
> Remaining Estimate: 0h
>
> We introduced both functions to PipelineResult.
> Currently, both of them throw UnsupportedOperationException in Flink runner.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)