[
https://issues.apache.org/jira/browse/BEAM-3089?focusedWorklogId=111771&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111771
]
ASF GitHub Bot logged work on BEAM-3089:
----------------------------------------
Author: ASF GitHub Bot
Created on: 14/Jun/18 04:55
Start Date: 14/Jun/18 04:55
Worklog Time Spent: 10m
Work Description: stale[bot] closed pull request #4766: [BEAM-3089] Fix job parallelism resolution
URL: https://github.com/apache/beam/pull/4766
This pull request was opened from a forked repository. Because GitHub does not
display the diff of such foreign pull requests, it is reproduced below for the
sake of provenance:
diff --git a/runners/flink/build.gradle b/runners/flink/build.gradle
index d6673ae246d..1ed3ac32ad8 100644
--- a/runners/flink/build.gradle
+++ b/runners/flink/build.gradle
@@ -93,7 +93,11 @@ def createValidatesRunnerTask(Map m) {
group = "Verification"
def runnerType = config.streaming ? "streaming" : "batch"
description = "Validates the ${runnerType} runner"
- def pipelineOptions = JsonOutput.toJson(["--runner=TestFlinkRunner", "--streaming=${config.streaming}"])
+ def pipelineOptions = JsonOutput.toJson([
+ "--runner=TestFlinkRunner",
+ "--streaming=${config.streaming}",
+ "--parallelism=1"
+ ])
systemProperty "beamTestPipelineOptions", pipelineOptions
classpath = configurations.validatesRunner
testClassesDirs =
files(project(":sdks:java:core").sourceSets.test.output.classesDirs)
diff --git a/runners/flink/pom.xml b/runners/flink/pom.xml
index 610bc9d200d..dac092ee44c 100644
--- a/runners/flink/pom.xml
+++ b/runners/flink/pom.xml
@@ -72,7 +72,8 @@
<beamTestPipelineOptions>
[
"--runner=TestFlinkRunner",
- "--streaming=false"
+ "--streaming=false",
+ "--parallelism=1"
]
</beamTestPipelineOptions>
</systemPropertyVariables>
@@ -104,7 +105,8 @@
<beamTestPipelineOptions>
[
"--runner=TestFlinkRunner",
- "--streaming=true"
+ "--streaming=true",
+ "--parallelism=1"
]
</beamTestPipelineOptions>
</systemPropertyVariables>
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/DefaultParallelismFactory.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/DefaultParallelismFactory.java
deleted file mode 100644
index b745f0bd441..00000000000
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/DefaultParallelismFactory.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink;
-
-import org.apache.beam.sdk.options.DefaultValueFactory;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.GlobalConfiguration;
-
-/**
- * {@link DefaultValueFactory} for getting a default value for the parallelism option
- * on {@link FlinkPipelineOptions}.
- *
- * <p>This will return either the default value from {@link GlobalConfiguration} or {@code 1}.
- * A valid {@link GlobalConfiguration} is only available if the program is executed by the Flink
- * run scripts.
- */
-public class DefaultParallelismFactory implements DefaultValueFactory<Integer> {
- @Override
- public Integer create(PipelineOptions options) {
- return GlobalConfiguration.loadConfiguration()
- .getInteger(ConfigConstants.DEFAULT_PARALLELISM_KEY, 1);
- }
-}
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
index b2cbefbc5b0..51c81650ba9 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
@@ -60,7 +60,7 @@
void setFlinkMaster(String value);
@Description("The degree of parallelism to be used when distributing operations onto workers.")
- @Default.InstanceFactory(DefaultParallelismFactory.class)
+ @Default.Integer(-1)
Integer getParallelism();
void setParallelism(Integer value);
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java
index 47d44947498..b7ca1ca2f04 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java
@@ -50,6 +50,7 @@ public static TestFlinkRunner create(boolean streaming) {
FlinkPipelineOptions flinkOptions =
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
flinkOptions.setRunner(TestFlinkRunner.class);
flinkOptions.setStreaming(streaming);
+ flinkOptions.setParallelism(1);
return TestFlinkRunner.fromOptions(flinkOptions);
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 111771)
Time Spent: 1h 20m (was: 1h 10m)
> Issue with setting the parallelism at client level using Flink runner
> ---------------------------------------------------------------------
>
> Key: BEAM-3089
> URL: https://issues.apache.org/jira/browse/BEAM-3089
> Project: Beam
> Issue Type: Bug
> Components: runner-flink
> Affects Versions: 2.0.0
> Environment: I am using Flink 1.2.1 running on Docker, with Task
> Managers distributed across different VMs as part of a Docker Swarm.
> Reporter: Thalita Vergilio
> Assignee: Grzegorz Kołakowski
> Priority: Major
> Labels: docker, flink, parallel-deployment
> Attachments: flink-ui-parallelism.png
>
> Time Spent: 1h 20m
> Remaining Estimate: 0h
>
> When uploading an Apache Beam application using the Flink Web UI, the
> parallelism set at job submission doesn't get picked up. The same happens
> when submitting a job using the Flink CLI.
> In both cases, the parallelism ends up defaulting to 1.
> When I set the parallelism programmatically within the Apache Beam code, it
> works: {{flinkPipelineOptions.setParallelism(4);}}
> I suspect the root of the problem may be in the
> org.apache.beam.runners.flink.DefaultParallelismFactory class, as it checks
> for Flink's GlobalConfiguration, which may not pick up runtime values passed
> to Flink, and then defaults to 1 if it doesn't find anything.
> Any ideas on how this could be fixed or worked around? I need to be able to
> change the parallelism dynamically, so the programmatic approach won't really
> work for me, nor will setting the Flink configuration at system level.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)