[
https://issues.apache.org/jira/browse/BEAM-5520?focusedWorklogId=150278&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-150278
]
ASF GitHub Bot logged work on BEAM-5520:
----------------------------------------
Author: ASF GitHub Bot
Created on: 02/Oct/18 01:15
Start Date: 02/Oct/18 01:15
Worklog Time Spent: 10m
Work Description: tweise closed pull request #6524: [BEAM-5520] Flink
pipeline option to run SDK harness per subtask.
URL: https://github.com/apache/beam/pull/6524
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
index 196534041e9..bb3a8903100 100644
---
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
+++
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
@@ -38,7 +38,6 @@
import org.apache.beam.model.pipeline.v1.RunnerApi;
import
org.apache.beam.model.pipeline.v1.RunnerApi.ExecutableStagePayload.SideInputId;
import org.apache.beam.runners.core.construction.PTransformTranslation;
-import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
import org.apache.beam.runners.core.construction.RehydratedComponents;
import org.apache.beam.runners.core.construction.WindowingStrategyTranslation;
import org.apache.beam.runners.core.construction.graph.ExecutableStage;
@@ -64,7 +63,6 @@
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.coders.VoidCoder;
-import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.join.RawUnionValue;
@@ -116,17 +114,10 @@
* {@link ExecutionEnvironment}.
*/
public static BatchTranslationContext createTranslationContext(
- JobInfo jobInfo, List<String> filesToStage) {
- PipelineOptions pipelineOptions;
- try {
- pipelineOptions =
PipelineOptionsTranslation.fromProto(jobInfo.pipelineOptions());
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
+ JobInfo jobInfo, FlinkPipelineOptions pipelineOptions, List<String>
filesToStage) {
ExecutionEnvironment executionEnvironment =
- FlinkExecutionEnvironments.createBatchExecutionEnvironment(
- pipelineOptions.as(FlinkPipelineOptions.class), filesToStage);
- return new BatchTranslationContext(jobInfo, executionEnvironment);
+
FlinkExecutionEnvironments.createBatchExecutionEnvironment(pipelineOptions,
filesToStage);
+ return new BatchTranslationContext(jobInfo, pipelineOptions,
executionEnvironment);
}
/** Creates a batch translator. */
@@ -160,12 +151,15 @@ public static FlinkBatchPortablePipelineTranslator
createTranslator() {
implements FlinkPortablePipelineTranslator.TranslationContext {
private final JobInfo jobInfo;
+ private final FlinkPipelineOptions options;
private final ExecutionEnvironment executionEnvironment;
private final Map<String, DataSet<?>> dataSets;
private final Set<String> danglingDataSets;
- private BatchTranslationContext(JobInfo jobInfo, ExecutionEnvironment
executionEnvironment) {
+ private BatchTranslationContext(
+ JobInfo jobInfo, FlinkPipelineOptions options, ExecutionEnvironment
executionEnvironment) {
this.jobInfo = jobInfo;
+ this.options = options;
this.executionEnvironment = executionEnvironment;
dataSets = new HashMap<>();
danglingDataSets = new HashSet<>();
@@ -176,6 +170,11 @@ public JobInfo getJobInfo() {
return jobInfo;
}
+ @Override
+ public FlinkPipelineOptions getPipelineOptions() {
+ return options;
+ }
+
public ExecutionEnvironment getExecutionEnvironment() {
return executionEnvironment;
}
@@ -336,7 +335,10 @@ public void translate(BatchTranslationContext context,
RunnerApi.Pipeline pipeli
}
FlinkExecutableStageFunction<InputT> function =
new FlinkExecutableStageFunction<>(
- stagePayload, context.getJobInfo(), outputMap,
FlinkExecutableStageContext.factory());
+ stagePayload,
+ context.getJobInfo(),
+ outputMap,
+ FlinkExecutableStageContext.factory(context.getPipelineOptions()));
DataSet<WindowedValue<InputT>> inputDataSet =
context.getDataSetOrThrow(stagePayload.getInput());
@@ -472,18 +474,13 @@ public void translate(BatchTranslationContext context,
RunnerApi.Pipeline pipeli
Grouping<WindowedValue<KV<K, V>>> inputGrouping =
inputDataSet.groupBy(new
KvKeySelector<>(inputElementCoder.getKeyCoder()));
- PipelineOptions options;
- try {
- options =
PipelineOptionsTranslation.fromProto(context.getJobInfo().pipelineOptions());
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
FlinkPartialReduceFunction<K, V, List<V>, ?> partialReduceFunction =
new FlinkPartialReduceFunction<>(
- combineFn, windowingStrategy, Collections.emptyMap(), options);
+ combineFn, windowingStrategy, Collections.emptyMap(),
context.getPipelineOptions());
FlinkReduceFunction<K, List<V>, List<V>, ?> reduceFunction =
- new FlinkReduceFunction<>(combineFn, windowingStrategy,
Collections.emptyMap(), options);
+ new FlinkReduceFunction<>(
+ combineFn, windowingStrategy, Collections.emptyMap(),
context.getPipelineOptions());
// Partially GroupReduce the values into the intermediate format AccumT
(combine)
GroupCombineOperator<WindowedValue<KV<K, V>>, WindowedValue<KV<K,
List<V>>>> groupCombine =
diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java
index 8855c3d1be3..44c52f1500c 100644
---
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java
+++
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java
@@ -108,7 +108,8 @@ private PipelineResult runPipeline() throws Exception {
FlinkBatchPortablePipelineTranslator translator =
FlinkBatchPortablePipelineTranslator.createTranslator();
FlinkBatchPortablePipelineTranslator.BatchTranslationContext context =
-
FlinkBatchPortablePipelineTranslator.createTranslationContext(jobInfo,
filesToStage);
+ FlinkBatchPortablePipelineTranslator.createTranslationContext(
+ jobInfo, pipelineOptions, filesToStage);
translator.translate(context, fusedPipeline);
result =
context.getExecutionEnvironment().execute(pipelineOptions.getJobName());
} else {
@@ -116,7 +117,8 @@ private PipelineResult runPipeline() throws Exception {
FlinkStreamingPortablePipelineTranslator translator =
new FlinkStreamingPortablePipelineTranslator();
FlinkStreamingPortablePipelineTranslator.StreamingTranslationContext
context =
-
FlinkStreamingPortablePipelineTranslator.createTranslationContext(jobInfo,
filesToStage);
+ FlinkStreamingPortablePipelineTranslator.createTranslationContext(
+ jobInfo, pipelineOptions, filesToStage);
translator.translate(context, fusedPipeline);
result =
context.getExecutionEnvironment().execute(pipelineOptions.getJobName());
}
diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java
index d004a67588f..76867bce2c6 100644
---
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java
+++
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java
@@ -27,6 +27,7 @@
import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation;
import org.apache.beam.runners.fnexecution.jobsubmission.JobInvoker;
+import org.apache.beam.sdk.options.PortablePipelineOptions;
import org.apache.beam.vendor.protobuf.v3.com.google.protobuf.Struct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,16 +37,19 @@
private static final Logger LOG =
LoggerFactory.getLogger(FlinkJobInvoker.class);
public static FlinkJobInvoker create(
- ListeningExecutorService executorService, String flinkMasterUrl) {
- return new FlinkJobInvoker(executorService, flinkMasterUrl);
+ ListeningExecutorService executorService,
+ FlinkJobServerDriver.ServerConfiguration serverConfig) {
+ return new FlinkJobInvoker(executorService, serverConfig);
}
private final ListeningExecutorService executorService;
- private final String flinkMasterUrl;
+ private final FlinkJobServerDriver.ServerConfiguration serverConfig;
- private FlinkJobInvoker(ListeningExecutorService executorService, String
flinkMasterUrl) {
+ private FlinkJobInvoker(
+ ListeningExecutorService executorService,
+ FlinkJobServerDriver.ServerConfiguration serverConfig) {
this.executorService = executorService;
- this.flinkMasterUrl = flinkMasterUrl;
+ this.serverConfig = serverConfig;
}
@Override
@@ -61,7 +65,14 @@ public JobInvocation invoke(
String.format("%s_%s", flinkOptions.getJobName(),
UUID.randomUUID().toString());
LOG.info("Invoking job {}", invocationId);
- flinkOptions.setFlinkMaster(flinkMasterUrl);
+ if (FlinkPipelineOptions.AUTO.equals(flinkOptions.getFlinkMaster())) {
+ flinkOptions.setFlinkMaster(serverConfig.getFlinkMasterUrl());
+ }
+
+ PortablePipelineOptions portableOptions =
flinkOptions.as(PortablePipelineOptions.class);
+ if (portableOptions.getSdkWorkerParallelism() == null) {
+
portableOptions.setSdkWorkerParallelism(serverConfig.getSdkWorkerParallelism());
+ }
flinkOptions.setRunner(null);
diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
index 0f567f6c6d5..5d82587dc72 100644
---
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
+++
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
@@ -22,6 +22,7 @@
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
+import java.nio.file.Paths;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import org.apache.beam.model.pipeline.v1.Endpoints;
@@ -32,6 +33,7 @@
import org.apache.beam.runners.fnexecution.jobsubmission.JobInvoker;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.PortablePipelineOptions;
import org.kohsuke.args4j.CmdLineException;
import org.kohsuke.args4j.CmdLineParser;
import org.kohsuke.args4j.Option;
@@ -62,7 +64,8 @@
private int artifactPort = 8098;
@Option(name = "--artifacts-dir", usage = "The location to store staged
artifact files")
- private String artifactStagingPath = "/tmp/beam-artifact-staging";
+ private String artifactStagingPath =
+ Paths.get(System.getProperty("java.io.tmpdir"),
"beam-artifact-staging").toString();
@Option(
name = "--clean-artifacts-per-job",
@@ -72,6 +75,20 @@
@Option(name = "--flink-master-url", usage = "Flink master url to submit
job.")
private String flinkMasterUrl = "[auto]";
+
+ public String getFlinkMasterUrl() {
+ return this.flinkMasterUrl;
+ }
+
+ @Option(
+ name = "--sdk-worker-parallelism",
+ usage = "Default parallelism for SDK worker processes (see portable
pipeline options)"
+ )
+ private String sdkWorkerParallelism =
PortablePipelineOptions.SDK_WORKER_PARALLELISM_PIPELINE;
+
+ public String getSdkWorkerParallelism() {
+ return this.sdkWorkerParallelism;
+ }
}
public static void main(String[] args) throws Exception {
@@ -232,7 +249,7 @@ private InMemoryJobService createJobService() throws
IOException {
return artifactStagingService;
}
- private JobInvoker createJobInvoker() throws IOException {
- return FlinkJobInvoker.create(executor, configuration.flinkMasterUrl);
+ private JobInvoker createJobInvoker() {
+ return FlinkJobInvoker.create(executor, configuration);
}
}
diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
index 81b0e41bb5c..9c985a521c2 100644
---
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
+++
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
@@ -31,6 +31,8 @@
public interface FlinkPipelineOptions
extends PipelineOptions, ApplicationNameOptions, StreamingOptions {
+ String AUTO = "[auto]";
+
/**
* List of local files to make available to workers.
*
@@ -56,7 +58,7 @@
"Address of the Flink Master where the Pipeline should be executed. Can"
+ " either be of the form \"host:port\" or one of the special values
[local], "
+ "[collection] or [auto].")
- @Default.String("[auto]")
+ @Default.String(AUTO)
String getFlinkMaster();
void setFlinkMaster(String value);
diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPortablePipelineTranslator.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPortablePipelineTranslator.java
index 45ee3cca68a..edbc6d31251 100644
---
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPortablePipelineTranslator.java
+++
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPortablePipelineTranslator.java
@@ -34,6 +34,8 @@
/** The context used for pipeline translation. */
interface TranslationContext {
JobInfo getJobInfo();
+
+ FlinkPipelineOptions getPipelineOptions();
}
/** Translates the given pipeline. */
diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
index 3ce901c678c..3f4f7c8e34c 100644
---
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
+++
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
@@ -35,7 +35,6 @@
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.SystemReduceFn;
import org.apache.beam.runners.core.construction.PTransformTranslation;
-import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
import org.apache.beam.runners.core.construction.RehydratedComponents;
import org.apache.beam.runners.core.construction.RunnerPCollectionView;
import org.apache.beam.runners.core.construction.WindowingStrategyTranslation;
@@ -59,7 +58,6 @@
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.VoidCoder;
-import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.ViewFn;
import org.apache.beam.sdk.transforms.join.RawUnionValue;
import org.apache.beam.sdk.transforms.join.UnionCoder;
@@ -97,16 +95,9 @@
* {@link StreamExecutionEnvironment}.
*/
public static StreamingTranslationContext createTranslationContext(
- JobInfo jobInfo, List<String> filesToStage) {
- PipelineOptions pipelineOptions;
- try {
- pipelineOptions =
PipelineOptionsTranslation.fromProto(jobInfo.pipelineOptions());
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
+ JobInfo jobInfo, FlinkPipelineOptions pipelineOptions, List<String>
filesToStage) {
StreamExecutionEnvironment executionEnvironment =
- FlinkExecutionEnvironments.createStreamExecutionEnvironment(
- pipelineOptions.as(FlinkPipelineOptions.class), filesToStage);
+
FlinkExecutionEnvironments.createStreamExecutionEnvironment(pipelineOptions,
filesToStage);
return new StreamingTranslationContext(jobInfo, pipelineOptions,
executionEnvironment);
}
@@ -118,12 +109,14 @@ public static StreamingTranslationContext
createTranslationContext(
implements FlinkPortablePipelineTranslator.TranslationContext {
private final JobInfo jobInfo;
- private final PipelineOptions options;
+ private final FlinkPipelineOptions options;
private final StreamExecutionEnvironment executionEnvironment;
private final Map<String, DataStream<?>> dataStreams;
private StreamingTranslationContext(
- JobInfo jobInfo, PipelineOptions options, StreamExecutionEnvironment
executionEnvironment) {
+ JobInfo jobInfo,
+ FlinkPipelineOptions options,
+ StreamExecutionEnvironment executionEnvironment) {
this.jobInfo = jobInfo;
this.options = options;
this.executionEnvironment = executionEnvironment;
@@ -135,7 +128,8 @@ public JobInfo getJobInfo() {
return jobInfo;
}
- public PipelineOptions getPipelineOptions() {
+ @Override
+ public FlinkPipelineOptions getPipelineOptions() {
return options;
}
@@ -529,7 +523,7 @@ private void translateImpulse(
context.getPipelineOptions(),
stagePayload,
context.getJobInfo(),
- FlinkExecutableStageContext.factory(),
+ FlinkExecutableStageContext.factory(context.getPipelineOptions()),
collectionIdToTupleTag);
if (transformedSideInputs.unionTagToView.isEmpty()) {
diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDefaultExecutableStageContext.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDefaultExecutableStageContext.java
index 015af26c2e1..aac8cf91ed8 100644
---
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDefaultExecutableStageContext.java
+++
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDefaultExecutableStageContext.java
@@ -34,7 +34,7 @@
class FlinkDefaultExecutableStageContext implements
FlinkExecutableStageContext, AutoCloseable {
private final JobBundleFactory jobBundleFactory;
- private static FlinkDefaultExecutableStageContext create(JobInfo jobInfo)
throws Exception {
+ private static FlinkDefaultExecutableStageContext create(JobInfo jobInfo) {
JobBundleFactory jobBundleFactory =
DefaultJobBundleFactory.create(
jobInfo,
@@ -74,4 +74,7 @@ public FlinkExecutableStageContext get(JobInfo jobInfo) {
return actualFactory.get(jobInfo);
}
}
+
+ static final Factory MULTI_INSTANCE_FACTORY =
+ (jobInfo) -> FlinkDefaultExecutableStageContext.create(jobInfo);
}
diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageContext.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageContext.java
index 72586c018d5..4981430288a 100644
---
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageContext.java
+++
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageContext.java
@@ -19,8 +19,10 @@
import java.io.Serializable;
import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.fnexecution.control.StageBundleFactory;
import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.sdk.options.PortablePipelineOptions;
/** The Flink context required in order to execute {@link ExecutableStage
stages}. */
public interface FlinkExecutableStageContext extends AutoCloseable {
@@ -35,8 +37,14 @@
FlinkExecutableStageContext get(JobInfo jobInfo);
}
- static Factory factory() {
- return
FlinkDefaultExecutableStageContext.ReferenceCountingFactory.REFERENCE_COUNTING;
+ static Factory factory(FlinkPipelineOptions options) {
+ PortablePipelineOptions portableOptions =
options.as(PortablePipelineOptions.class);
+ if (PortablePipelineOptions.SDK_WORKER_PARALLELISM_STAGE.equals(
+ portableOptions.getSdkWorkerParallelism())) {
+ return FlinkDefaultExecutableStageContext.MULTI_INSTANCE_FACTORY;
+ } else {
+ return
FlinkDefaultExecutableStageContext.ReferenceCountingFactory.REFERENCE_COUNTING;
+ }
}
StageBundleFactory getStageBundleFactory(ExecutableStage executableStage);
diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
index 582c6d5622a..09c45fa719a 100644
---
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
+++
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
@@ -43,6 +43,8 @@
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Flink operator that passes its input DataSet through an SDK-executed {@link
@@ -54,6 +56,7 @@
*/
public class FlinkExecutableStageFunction<InputT>
extends RichMapPartitionFunction<WindowedValue<InputT>, RawUnionValue> {
+ private static final Logger LOG =
LoggerFactory.getLogger(FlinkExecutableStageFunction.class);
// Main constructor fields. All must be Serializable because Flink
distributes Functions to
// task managers via java serialization.
@@ -141,8 +144,17 @@ public void mapPartition(
@Override
public void close() throws Exception {
- try (AutoCloseable bundleFactoryCloser = stageBundleFactory) {}
- try (AutoCloseable closable = stageContext) {}
+ // close may be called multiple times when an exception is thrown
+ if (stageContext != null) {
+ try (@SuppressWarnings("unused")
+ AutoCloseable bundleFactoryCloser = stageBundleFactory;
+ @SuppressWarnings("unused")
+ AutoCloseable closable = stageContext) {
+ } catch (Exception e) {
+ LOG.error("Error in close: ", e);
+ throw e;
+ }
+ }
stageContext = null;
}
diff --git
a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/ExecutableStageDoFnOperatorTest.java
b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/ExecutableStageDoFnOperatorTest.java
index 2ba0902cbaa..fc71d0c0c0d 100644
---
a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/ExecutableStageDoFnOperatorTest.java
+++
b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/ExecutableStageDoFnOperatorTest.java
@@ -318,6 +318,8 @@ public void testSerialization() {
new DoFnOperator.MultiOutputOutputManagerFactory(
mainOutput, tagsToOutputTags, tagsToCoders, tagsToIds);
+ FlinkPipelineOptions options =
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+
ExecutableStageDoFnOperator<Integer, Integer> operator =
new ExecutableStageDoFnOperator<>(
"transform",
@@ -330,10 +332,10 @@ public void testSerialization() {
Collections.emptyMap() /* sideInputTagMapping */,
Collections.emptyList() /* sideInputs */,
Collections.emptyMap() /* sideInputId mapping */,
- PipelineOptionsFactory.as(FlinkPipelineOptions.class),
+ options,
stagePayload,
jobInfo,
- FlinkExecutableStageContext.factory(),
+ FlinkExecutableStageContext.factory(options),
createOutputMap(mainOutput, ImmutableList.of(additionalOutput)));
ExecutableStageDoFnOperator<Integer, Integer> clone =
SerializationUtils.clone(operator);
diff --git
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java
index e24a3319fab..6cbff13c9e5 100644
---
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java
+++
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java
@@ -172,6 +172,7 @@ public StageBundleFactory forStage(ExecutableStage
executableStage) {
@Override
public void close() throws Exception {
// Clear the cache. This closes all active environments.
+ // note this may cause open calls to be cancelled by the peer
environmentCache.invalidateAll();
environmentCache.cleanUp();
@@ -182,6 +183,7 @@ public void close() throws Exception {
loggingServer.close();
retrievalServer.close();
provisioningServer.close();
+
executor.shutdown();
}
@@ -306,9 +308,9 @@ private void checkAndInitialize(
return;
}
- EnvironmentFactory.Provider environmentProviderFactory =
+ EnvironmentFactory.Provider environmentFactoryProvider =
environmentFactoryProviderMap.get(environment.getUrn());
- ServerFactory serverFactory =
environmentProviderFactory.getServerFactory();
+ ServerFactory serverFactory =
environmentFactoryProvider.getServerFactory();
this.clientPool = MapControlClientPool.create();
this.executor = Executors.newCachedThreadPool();
@@ -334,15 +336,13 @@ private void checkAndInitialize(
GrpcFnServer.allocatePortAndCreateFor(GrpcStateService.create(),
serverFactory);
this.environmentFactory =
- environmentFactoryProviderMap
- .get(environment.getUrn())
- .createEnvironmentFactory(
- controlServer,
- loggingServer,
- retrievalServer,
- provisioningServer,
- clientPool,
- stageIdGenerator);
+ environmentFactoryProvider.createEnvironmentFactory(
+ controlServer,
+ loggingServer,
+ retrievalServer,
+ provisioningServer,
+ clientPool,
+ stageIdGenerator);
this.environment = environment;
}
}
diff --git
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/state/GrpcStateService.java
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/state/GrpcStateService.java
index 38b21469d78..633d7255a88 100644
---
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/state/GrpcStateService.java
+++
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/state/GrpcStateService.java
@@ -28,6 +28,7 @@
import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateResponse;
import org.apache.beam.model.fnexecution.v1.BeamFnStateGrpc;
import org.apache.beam.runners.fnexecution.FnService;
+import org.apache.beam.vendor.grpc.v1.io.grpc.stub.ServerCallStreamObserver;
import org.apache.beam.vendor.grpc.v1.io.grpc.stub.StreamObserver;
/** An implementation of the Beam Fn State service. */
@@ -51,6 +52,14 @@ public void close() throws Exception {
Exception thrown = null;
for (Inbound inbound : clients) {
try {
+ // the call may be cancelled because the sdk harness hung up
+ // (we terminate the environment before terminating the service
endpoints)
+ if (inbound.outboundObserver instanceof ServerCallStreamObserver) {
+ if (((ServerCallStreamObserver)
inbound.outboundObserver).isCancelled()) {
+ // skip to avoid call already closed exception
+ continue;
+ }
+ }
inbound.outboundObserver.onCompleted();
} catch (Exception t) {
if (thrown == null) {
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PortablePipelineOptions.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PortablePipelineOptions.java
index bdb1caf3b4d..5107c389bb1 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PortablePipelineOptions.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PortablePipelineOptions.java
@@ -59,7 +59,7 @@
+ "Possible options are DOCKER and PROCESS.")
String getDefaultEnvironmentType();
- void setDefaultEnvironmentType(String envitonmentType);
+ void setDefaultEnvironmentType(String environmentType);
@Description(
"Set environment configuration for running the user code.\n"
@@ -72,4 +72,19 @@
String getDefaultEnvironmentConfig();
void setDefaultEnvironmentConfig(@Nullable String config);
+
+ String SDK_WORKER_PARALLELISM_PIPELINE = "pipeline";
+ String SDK_WORKER_PARALLELISM_STAGE = "stage";
+
+ @Description(
+ "SDK worker/harness process parallelism. Currently supported options are
"
+ + "<null> (let the runner decide) or '"
+ + SDK_WORKER_PARALLELISM_PIPELINE
+ + "' (single SDK harness process per pipeline and runner process) or
'"
+ + SDK_WORKER_PARALLELISM_STAGE
+ + "' (separate SDK harness for every executable stage).")
+ @Nullable
+ String getSdkWorkerParallelism();
+
+ void setSdkWorkerParallelism(@Nullable String parallelism);
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 150278)
Time Spent: 2.5h (was: 2h 20m)
> Flink runner per operator SDK harness option
> --------------------------------------------
>
> Key: BEAM-5520
> URL: https://issues.apache.org/jira/browse/BEAM-5520
> Project: Beam
> Issue Type: Improvement
> Components: runner-flink
> Reporter: Thomas Weise
> Assignee: Thomas Weise
> Priority: Major
> Labels: portability, portability-flink
> Time Spent: 2.5h
> Remaining Estimate: 0h
>
> For streaming pipelines, the runner currently uses a single SDK harness
> process for all subtasks of a job that get deployed on the same task manager.
> In common deployments with 16 or more task slots, many executable stage
> operators all use the same SDK harness process. To scale, we need an option
> to run separate harness processes per subtask.
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)