[ 
https://issues.apache.org/jira/browse/BEAM-5724?focusedWorklogId=160251&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-160251
 ]

ASF GitHub Bot logged work on BEAM-5724:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 29/Oct/18 22:24
            Start Date: 29/Oct/18 22:24
    Worklog Time Spent: 10m 
      Work Description: tweise closed pull request #6835: [BEAM-5724] 
Generalize flink executable context to allow more than 1 worker process per 
task manager 
URL: https://github.com/apache/beam/pull/6835
 
 
   

This is a PR merged from a forked repository. Because GitHub hides the
original diff once a foreign (forked) pull request is merged, the diff is
reproduced below for the sake of provenance:

diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
index 93dc6f0121c..024eee26932 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
@@ -33,7 +33,6 @@
 import org.apache.beam.runners.fnexecution.jobsubmission.JobInvoker;
 import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.options.PortablePipelineOptions;
 import org.kohsuke.args4j.CmdLineException;
 import org.kohsuke.args4j.CmdLineParser;
 import org.kohsuke.args4j.Option;
@@ -90,9 +89,9 @@ String getFlinkMasterUrl() {
       name = "--sdk-worker-parallelism",
       usage = "Default parallelism for SDK worker processes (see portable 
pipeline options)"
     )
-    String sdkWorkerParallelism = 
PortablePipelineOptions.SDK_WORKER_PARALLELISM_PIPELINE;
+    Long sdkWorkerParallelism = 1L;
 
-    String getSdkWorkerParallelism() {
+    Long getSdkWorkerParallelism() {
       return this.sdkWorkerParallelism;
     }
   }
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDefaultExecutableStageContext.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDefaultExecutableStageContext.java
index e50d6f90566..b7bbd84f5c8 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDefaultExecutableStageContext.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDefaultExecutableStageContext.java
@@ -17,7 +17,13 @@
  */
 package org.apache.beam.runners.flink.translation.functions;
 
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import org.apache.beam.model.pipeline.v1.RunnerApi.StandardEnvironments;
 import org.apache.beam.runners.core.construction.BeamUrns;
 import org.apache.beam.runners.core.construction.Environments;
@@ -30,6 +36,7 @@
 import 
org.apache.beam.runners.fnexecution.environment.EmbeddedEnvironmentFactory;
 import 
org.apache.beam.runners.fnexecution.environment.ProcessEnvironmentFactory;
 import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.sdk.options.PortablePipelineOptions;
 
 /** Implementation of a {@link FlinkExecutableStageContext}. */
 class FlinkDefaultExecutableStageContext implements 
FlinkExecutableStageContext, AutoCloseable {
@@ -64,19 +71,68 @@ public void close() throws Exception {
     jobBundleFactory.close();
   }
 
-  enum ReferenceCountingFactory implements Factory {
-    REFERENCE_COUNTING;
+  private static class JobFactoryState {
+    private int index = 0;
+    private final List<ReferenceCountingFlinkExecutableStageContextFactory> 
factories =
+        new ArrayList<>();
+    private final int maxFactories;
 
-    private static final ReferenceCountingFlinkExecutableStageContextFactory 
actualFactory =
-        ReferenceCountingFlinkExecutableStageContextFactory.create(
-            FlinkDefaultExecutableStageContext::create);
+    private JobFactoryState(int maxFactories) {
+      Preconditions.checkArgument(maxFactories >= 0, "sdk_worker_parallelism 
must be >= 0");
+
+      if (maxFactories == 0) {
+        // if this is 0, use the auto behavior of num_cores - 1 so that we 
leave some resources available for the java process
+        this.maxFactories = 
Math.max(Runtime.getRuntime().availableProcessors() - 1, 1);
+      } else {
+        this.maxFactories = maxFactories;
+      }
+    }
+
+    private synchronized FlinkExecutableStageContext.Factory getFactory() {
+      ReferenceCountingFlinkExecutableStageContextFactory factory;
+      // If we haven't yet created maxFactories factories, create a new one. 
Otherwise use an
+      // existing one from factories.
+      if (factories.size() < maxFactories) {
+        factory =
+            ReferenceCountingFlinkExecutableStageContextFactory.create(
+                FlinkDefaultExecutableStageContext::create);
+        factories.add(factory);
+      } else {
+        factory = factories.get(index);
+      }
+
+      index = (index + 1) % maxFactories;
+
+      return factory;
+    }
+  }
+
+  enum MultiInstanceFactory implements Factory {
+    MULTI_INSTANCE;
+
+    // This map should only ever have a single element, as each job will have 
its own
+    // classloader and therefore its own instance of 
MultiInstanceFactory.INSTANCE. This
+    // code supports multiple JobInfos in order to provide a sensible 
implementation of
+    // Factory.get(JobInfo), which in theory could be called with different 
JobInfos.
+    private static final ConcurrentMap<String, JobFactoryState> jobFactories =
+        new ConcurrentHashMap<>();
 
     @Override
     public FlinkExecutableStageContext get(JobInfo jobInfo) {
-      return actualFactory.get(jobInfo);
+      JobFactoryState state =
+          jobFactories.computeIfAbsent(
+              jobInfo.jobId(),
+              k -> {
+                PortablePipelineOptions portableOptions =
+                    
PipelineOptionsTranslation.fromProto(jobInfo.pipelineOptions())
+                        .as(PortablePipelineOptions.class);
+
+                return new JobFactoryState(
+                    
MoreObjects.firstNonNull(portableOptions.getSdkWorkerParallelism(), 1L)
+                        .intValue());
+              });
+
+      return state.getFactory().get(jobInfo);
     }
   }
-
-  static final Factory MULTI_INSTANCE_FACTORY =
-      (jobInfo) -> FlinkDefaultExecutableStageContext.create(jobInfo);
 }
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageContext.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageContext.java
index 4981430288a..c3bc0abe2c3 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageContext.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageContext.java
@@ -20,9 +20,9 @@
 import java.io.Serializable;
 import org.apache.beam.runners.core.construction.graph.ExecutableStage;
 import org.apache.beam.runners.flink.FlinkPipelineOptions;
+import 
org.apache.beam.runners.flink.translation.functions.FlinkDefaultExecutableStageContext.MultiInstanceFactory;
 import org.apache.beam.runners.fnexecution.control.StageBundleFactory;
 import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
-import org.apache.beam.sdk.options.PortablePipelineOptions;
 
 /** The Flink context required in order to execute {@link ExecutableStage 
stages}. */
 public interface FlinkExecutableStageContext extends AutoCloseable {
@@ -38,13 +38,7 @@
   }
 
   static Factory factory(FlinkPipelineOptions options) {
-    PortablePipelineOptions portableOptions = 
options.as(PortablePipelineOptions.class);
-    if (PortablePipelineOptions.SDK_WORKER_PARALLELISM_STAGE.equals(
-        portableOptions.getSdkWorkerParallelism())) {
-      return FlinkDefaultExecutableStageContext.MULTI_INSTANCE_FACTORY;
-    } else {
-      return 
FlinkDefaultExecutableStageContext.ReferenceCountingFactory.REFERENCE_COUNTING;
-    }
+    return MultiInstanceFactory.MULTI_INSTANCE;
   }
 
   StageBundleFactory getStageBundleFactory(ExecutableStage executableStage);
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/ReferenceCountingFlinkExecutableStageContextFactory.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/ReferenceCountingFlinkExecutableStageContextFactory.java
index 90d291ea28a..e22a724a82a 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/ReferenceCountingFlinkExecutableStageContextFactory.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/ReferenceCountingFlinkExecutableStageContextFactory.java
@@ -185,10 +185,11 @@ void release(FlinkExecutableStageContext context) {
   /**
    * {@link WrappedContext} does not expose equals of actual {@link 
FlinkExecutableStageContext}.
    */
-  private class WrappedContext implements FlinkExecutableStageContext {
+  @VisibleForTesting
+  class WrappedContext implements FlinkExecutableStageContext {
     private JobInfo jobInfo;
     private AtomicInteger referenceCount;
-    private FlinkExecutableStageContext context;
+    @VisibleForTesting FlinkExecutableStageContext context;
 
     /** {@link WrappedContext#equals(Object)} is only based on {@link 
JobInfo#jobId()}. */
     WrappedContext(JobInfo jobInfo, FlinkExecutableStageContext context) {
diff --git 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkJobServerDriverTest.java
 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkJobServerDriverTest.java
index fc44d8edf31..fab70c7d9b9 100644
--- 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkJobServerDriverTest.java
+++ 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkJobServerDriverTest.java
@@ -17,8 +17,6 @@
  */
 package org.apache.beam.runners.flink;
 
-import static 
org.apache.beam.sdk.options.PortablePipelineOptions.SDK_WORKER_PARALLELISM_PIPELINE;
-import static 
org.apache.beam.sdk.options.PortablePipelineOptions.SDK_WORKER_PARALLELISM_STAGE;
 import static org.hamcrest.CoreMatchers.not;
 import static org.hamcrest.CoreMatchers.nullValue;
 import static org.hamcrest.core.Is.is;
@@ -46,7 +44,7 @@ public void testConfigurationDefaults() {
     assertThat(config.port, is(8099));
     assertThat(config.artifactPort, is(8098));
     assertThat(config.flinkMasterUrl, is("[auto]"));
-    assertThat(config.sdkWorkerParallelism, 
is(SDK_WORKER_PARALLELISM_PIPELINE));
+    assertThat(config.sdkWorkerParallelism, is(1L));
     assertThat(config.cleanArtifactsPerJob, is(false));
     FlinkJobServerDriver flinkJobServerDriver = 
FlinkJobServerDriver.fromConfig(config);
     assertThat(flinkJobServerDriver, is(not(nullValue())));
@@ -63,14 +61,14 @@ public void testConfigurationFromArgs() {
               "--artifact-port",
               "43",
               "--flink-master-url=jobmanager",
-              "--sdk-worker-parallelism=stage",
+              "--sdk-worker-parallelism=4",
               "--clean-artifacts-per-job",
             });
     assertThat(driver.configuration.host, is("test"));
     assertThat(driver.configuration.port, is(42));
     assertThat(driver.configuration.artifactPort, is(43));
     assertThat(driver.configuration.flinkMasterUrl, is("jobmanager"));
-    assertThat(driver.configuration.sdkWorkerParallelism, 
is(SDK_WORKER_PARALLELISM_STAGE));
+    assertThat(driver.configuration.sdkWorkerParallelism, is(4L));
     assertThat(driver.configuration.cleanArtifactsPerJob, is(true));
   }
 
diff --git 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/functions/FlinkDefaultExecutableStageContextTest.java
 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/functions/FlinkDefaultExecutableStageContextTest.java
new file mode 100644
index 00000000000..4960477c88f
--- /dev/null
+++ 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/functions/FlinkDefaultExecutableStageContextTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.functions;
+
+import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
+import 
org.apache.beam.runners.flink.translation.functions.FlinkDefaultExecutableStageContext.MultiInstanceFactory;
+import 
org.apache.beam.runners.flink.translation.functions.ReferenceCountingFlinkExecutableStageContextFactory.WrappedContext;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.PortablePipelineOptions;
+import org.apache.beam.vendor.protobuf.v3.com.google.protobuf.Struct;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link FlinkDefaultExecutableStageContext}. */
+@RunWith(JUnit4.class)
+public class FlinkDefaultExecutableStageContextTest {
+  private static JobInfo constructJobInfo(String jobId, long parallelism) {
+    PortablePipelineOptions portableOptions =
+        PipelineOptionsFactory.as(PortablePipelineOptions.class);
+    portableOptions.setSdkWorkerParallelism(parallelism);
+
+    Struct pipelineOptions = 
PipelineOptionsTranslation.toProto(portableOptions);
+    return JobInfo.create(jobId, "job-name", "retrieval-token", 
pipelineOptions);
+  }
+
+  @Test
+  public void testMultiInstanceFactory() {
+    JobInfo jobInfo = constructJobInfo("multi-instance-factory-test", 2);
+
+    WrappedContext f1 = (WrappedContext) 
MultiInstanceFactory.MULTI_INSTANCE.get(jobInfo);
+    WrappedContext f2 = (WrappedContext) 
MultiInstanceFactory.MULTI_INSTANCE.get(jobInfo);
+    WrappedContext f3 = (WrappedContext) 
MultiInstanceFactory.MULTI_INSTANCE.get(jobInfo);
+
+    Assert.assertNotEquals("We should create two different factories", 
f1.context, f2.context);
+    Assert.assertEquals(
+        "Future calls should be round-robbined to those two factories", 
f1.context, f3.context);
+  }
+
+  @Test
+  public void testDefault() {
+    JobInfo jobInfo = constructJobInfo("default-test", 0);
+
+    int expectedParallelism = Math.max(1, 
Runtime.getRuntime().availableProcessors() - 1);
+
+    WrappedContext f1 = (WrappedContext) 
MultiInstanceFactory.MULTI_INSTANCE.get(jobInfo);
+    for (int i = 1; i < expectedParallelism; i++) {
+      Assert.assertNotEquals(
+          "We should create " + expectedParallelism + " different factories",
+          f1.context,
+          ((WrappedContext) 
MultiInstanceFactory.MULTI_INSTANCE.get(jobInfo)).context);
+    }
+
+    Assert.assertEquals(
+        "Future calls should be round-robbined to those",
+        f1.context,
+        ((WrappedContext) 
MultiInstanceFactory.MULTI_INSTANCE.get(jobInfo)).context);
+  }
+}
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PortablePipelineOptions.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PortablePipelineOptions.java
index a8dfa8e0665..edd9c7130c5 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PortablePipelineOptions.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PortablePipelineOptions.java
@@ -73,20 +73,13 @@
 
   void setDefaultEnvironmentConfig(@Nullable String config);
 
-  String SDK_WORKER_PARALLELISM_PIPELINE = "pipeline";
-  String SDK_WORKER_PARALLELISM_STAGE = "stage";
-
   @Description(
-      "SDK worker/harness process parallelism. Currently supported options are 
"
-          + "<null> (let the runner decide) or '"
-          + SDK_WORKER_PARALLELISM_PIPELINE
-          + "' (single SDK harness process per pipeline and runner process) or 
'"
-          + SDK_WORKER_PARALLELISM_STAGE
-          + "' (separate SDK harness for every executable stage).")
+      "Sets the number of sdk worker processes that will run on each worker 
node. Default is 1. If"
+          + " 0, it will be automatically set according to the number of CPU 
cores on the worker.")
   @Nullable
-  String getSdkWorkerParallelism();
+  Long getSdkWorkerParallelism();
 
-  void setSdkWorkerParallelism(@Nullable String parallelism);
+  void setSdkWorkerParallelism(@Nullable Long parallelism);
 
   @Description("Duration in milliseconds for environment cache within a job. 0 
means no caching.")
   @Default.Integer(0)


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 160251)
    Time Spent: 4h 20m  (was: 4h 10m)

> Beam creates too many sdk_worker processes with --sdk-worker-parallelism=stage
> ------------------------------------------------------------------------------
>
>                 Key: BEAM-5724
>                 URL: https://issues.apache.org/jira/browse/BEAM-5724
>             Project: Beam
>          Issue Type: Improvement
>          Components: runner-flink
>            Reporter: Micah Wylde
>            Assignee: Micah Wylde
>            Priority: Major
>              Labels: portability-flink
>          Time Spent: 4h 20m
>  Remaining Estimate: 0h
>
> In the Flink portable runner, we currently support two options for SDK worker 
> parallelism (how many Python worker processes we run). The default is one per 
> TaskManager, and with --sdk-worker-parallelism=stage you get one per stage. 
> However, for complex pipelines with many Beam operators that get fused into a 
> single Flink task this can produce hundreds of worker processes per TM.
> Flink uses the notion of task slots to limit resource utilization on a box; I 
> think that Beam should try to respect those limits as well. I think ideally 
> we'd produce a single Python worker per task slot/Flink operator chain.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to