This is an automated email from the ASF dual-hosted git repository.
goenka pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new f3623e8 [BEAM-6853] Make sdkWorkerParallelism option consistent
new fe676c5 Merge pull request #8286 from angoenka/portable_options
f3623e8 is described below
commit f3623e8ba2257f7659ccb312dc2574f862ef41b5
Author: Ankur Goenka <[email protected]>
AuthorDate: Thu Apr 11 16:49:38 2019 -0700
[BEAM-6853] Make sdkWorkerParallelism option consistent
---
.../apache/beam/runners/flink/FlinkJobInvoker.java | 2 +-
.../fnexecution/jobsubmission/JobServerDriver.java | 10 +++++++---
.../beam/sdk/options/PortablePipelineOptions.java | 6 +++---
.../python/apache_beam/options/pipeline_options.py | 23 ++++++----------------
4 files changed, 17 insertions(+), 24 deletions(-)
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java
index 3dc22c2..1db7806 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java
@@ -70,7 +70,7 @@ public class FlinkJobInvoker extends JobInvoker {
}
PortablePipelineOptions portableOptions =
flinkOptions.as(PortablePipelineOptions.class);
- if (portableOptions.getSdkWorkerParallelism() == null) {
+ if (portableOptions.getSdkWorkerParallelism() == 0L) {
portableOptions.setSdkWorkerParallelism(serverConfig.getSdkWorkerParallelism());
}
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobServerDriver.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobServerDriver.java
index 2724910..53f4ab8 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobServerDriver.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobServerDriver.java
@@ -96,8 +96,12 @@ public abstract class JobServerDriver implements Runnable {
@Option(
name = "--sdk-worker-parallelism",
- usage = "Default parallelism for SDK worker processes (see portable
pipeline options)")
- private Long sdkWorkerParallelism = 1L;
+ usage =
+ "Default parallelism for SDK worker processes. This option is only
applied when the "
+ + "pipeline option sdkWorkerParallelism is set to 0."
+ + "Default is 1, If 0, worker parallelism will be dynamically
decided by runner."
+ + "See also: sdkWorkerParallelism Pipeline Option")
+ private long sdkWorkerParallelism = 1L;
public String getHost() {
return host;
@@ -123,7 +127,7 @@ public abstract class JobServerDriver implements Runnable {
return cleanArtifactsPerJob;
}
- public Long getSdkWorkerParallelism() {
+ public long getSdkWorkerParallelism() {
return this.sdkWorkerParallelism;
}
}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PortablePipelineOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PortablePipelineOptions.java
index 75c55fc..3f3cfcf 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PortablePipelineOptions.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PortablePipelineOptions.java
@@ -77,10 +77,10 @@ public interface PortablePipelineOptions extends PipelineOptions {
"Sets the number of sdk worker processes that will run on each worker
node. Default is 1. If"
+ " 0, it will be automatically set by the runner by looking at
different parameters "
+ "(e.g. number of CPU cores on the worker machine).")
- @Nullable
- Long getSdkWorkerParallelism();
+ @Default.Long(1L)
+ long getSdkWorkerParallelism();
- void setSdkWorkerParallelism(@Nullable Long parallelism);
+ void setSdkWorkerParallelism(long parallelism);
@Description("Duration in milliseconds for environment cache within a job. 0
means no caching.")
@Default.Integer(0)
diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py
index 745dcad..fde1a7e 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -780,7 +780,9 @@ class SetupOptions(PipelineOptions):
class PortableOptions(PipelineOptions):
-
+ """Portable options are common options expected to be understood by most of
+ the portable runners.
+ """
@classmethod
def _add_argparse_args(cls, parser):
parser.add_argument('--job_endpoint',
@@ -801,30 +803,17 @@ class PortableOptions(PipelineOptions):
'"<ENV_VAL>"} }. All fields in the json are optional except '
'command.'))
parser.add_argument(
- '--sdk_worker_parallelism', default=None,
+ '--sdk_worker_parallelism', default=0,
help=('Sets the number of sdk worker processes that will run on each '
- 'worker node. Default is 1. If 0, it will be automatically set '
+ 'worker node. Default is 0. If 0, it will be automatically set '
'by the runner by looking at different parameters (e.g. number '
- 'of CPU cores on the worker machine).'))
+ 'of CPU cores on the worker machine or configuration).'))
parser.add_argument(
'--environment_cache_millis', default=0,
help=('Duration in milliseconds for environment cache within a job. '
'0 means no caching.'))
-class RunnerOptions(PipelineOptions):
- """Runner options are provided by the job service.
-
- The SDK has no a priori knowledge of runner options.
- It should be able to work with any portable runner.
- Runner specific options are discovered from the job service endpoint.
- """
- @classmethod
- def _add_argparse_args(cls, parser):
- # TODO: help option to display discovered options
- pass
-
-
class TestOptions(PipelineOptions):
@classmethod