[ https://issues.apache.org/jira/browse/BEAM-3089?focusedWorklogId=145731&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-145731 ]

ASF GitHub Bot logged work on BEAM-3089:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 19/Sep/18 17:19
            Start Date: 19/Sep/18 17:19
    Worklog Time Spent: 10m 
      Work Description: tweise closed pull request #6426: [BEAM-3089] Fix default values in FlinkPipelineOptions / Add tests
URL: https://github.com/apache/beam/pull/6426
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/DefaultParallelismFactory.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/DefaultParallelismFactory.java
deleted file mode 100644
index d448bed2333..00000000000
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/DefaultParallelismFactory.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink;
-
-import org.apache.beam.sdk.options.DefaultValueFactory;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.GlobalConfiguration;
-
-/**
- * {@link DefaultValueFactory} for getting a default value for the parallelism option on {@link
- * FlinkPipelineOptions}.
- *
- * <p>This will return either the default value from {@link GlobalConfiguration} or {@code 1}. A
- * valid {@link GlobalConfiguration} is only available if the program is executed by the Flink run
- * scripts.
- */
-public class DefaultParallelismFactory implements DefaultValueFactory<Integer> {
-  @Override
-  public Integer create(PipelineOptions options) {
-    return GlobalConfiguration.loadConfiguration()
-        .getInteger(ConfigConstants.DEFAULT_PARALLELISM_KEY, 1);
-  }
-}
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
index 4ace1eccc37..40a8d51ee40 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
@@ -17,12 +17,16 @@
  */
 package org.apache.beam.runners.flink;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Splitter;
 import java.util.List;
+import javax.annotation.Nullable;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.java.CollectionEnvironment;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.streaming.api.TimeCharacteristic;
@@ -42,6 +46,12 @@
    */
   public static ExecutionEnvironment createBatchExecutionEnvironment(
       FlinkPipelineOptions options, List<String> filesToStage) {
+    return createBatchExecutionEnvironment(options, filesToStage, null);
+  }
+
+  @VisibleForTesting
+  static ExecutionEnvironment createBatchExecutionEnvironment(
+      FlinkPipelineOptions options, List<String> filesToStage, @Nullable String confDir) {
 
     LOG.info("Creating a Batch Execution Environment.");
 
@@ -71,9 +81,18 @@ public static ExecutionEnvironment createBatchExecutionEnvironment(
     if (options.getParallelism() != -1 && !(flinkBatchEnv instanceof CollectionEnvironment)) {
       flinkBatchEnv.setParallelism(options.getParallelism());
     }
+    // Set the correct parallelism, required by UnboundedSourceWrapper to generate consistent splits.
+    final int parallelism;
+    if (flinkBatchEnv instanceof CollectionEnvironment) {
+      parallelism = 1;
+    } else {
+      parallelism =
+          determineParallelism(options.getParallelism(), flinkBatchEnv.getParallelism(), confDir);
+    }
 
+    flinkBatchEnv.setParallelism(parallelism);
     // set parallelism in the options (required by some execution code)
-    options.setParallelism(flinkBatchEnv.getParallelism());
+    options.setParallelism(parallelism);
 
     if (options.getObjectReuse()) {
       flinkBatchEnv.getConfig().enableObjectReuse();
@@ -93,6 +112,12 @@ public static ExecutionEnvironment createBatchExecutionEnvironment(
    */
   public static StreamExecutionEnvironment createStreamExecutionEnvironment(
       FlinkPipelineOptions options, List<String> filesToStage) {
+    return createStreamExecutionEnvironment(options, filesToStage, null);
+  }
+
+  @VisibleForTesting
+  static StreamExecutionEnvironment createStreamExecutionEnvironment(
+      FlinkPipelineOptions options, List<String> filesToStage, @Nullable String flinkConfigDir) {
 
     LOG.info("Creating a Streaming Environment.");
 
@@ -119,13 +144,13 @@ public static StreamExecutionEnvironment createStreamExecutionEnvironment(
       flinkStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
     }
 
-    // set the correct parallelism.
-    if (options.getParallelism() != -1) {
-      flinkStreamEnv.setParallelism(options.getParallelism());
-    }
-
+    // Set the parallelism, required by UnboundedSourceWrapper to generate consistent splits.
+    final int parallelism =
+        determineParallelism(
+            options.getParallelism(), flinkStreamEnv.getParallelism(), flinkConfigDir);
+    flinkStreamEnv.setParallelism(parallelism);
     // set parallelism in the options (required by some execution code)
-    options.setParallelism(flinkStreamEnv.getParallelism());
+    options.setParallelism(parallelism);
 
     if (options.getObjectReuse()) {
       flinkStreamEnv.getConfig().enableObjectReuse();
@@ -156,9 +181,11 @@ public static StreamExecutionEnvironment createStreamExecutionEnvironment(
        throw new IllegalArgumentException("The checkpoint interval must be positive");
      }
      flinkStreamEnv.enableCheckpointing(checkpointInterval, options.getCheckpointingMode());
-      flinkStreamEnv
-          .getCheckpointConfig()
-          .setCheckpointTimeout(options.getCheckpointTimeoutMillis());
+      if (options.getCheckpointTimeoutMillis() != -1) {
+        flinkStreamEnv
+            .getCheckpointConfig()
+            .setCheckpointTimeout(options.getCheckpointTimeoutMillis());
+      }
      boolean externalizedCheckpoint = options.isExternalizedCheckpointsEnabled();
      boolean retainOnCancellation = options.getRetainExternalizedCheckpointsOnCancellation();
       if (externalizedCheckpoint) {
@@ -189,6 +216,35 @@ public static StreamExecutionEnvironment createStreamExecutionEnvironment(
     return flinkStreamEnv;
   }
 
+  private static int determineParallelism(
+      final int pipelineOptionsParallelism,
+      final int envParallelism,
+      @Nullable String flinkConfDir) {
+    if (pipelineOptionsParallelism > 0) {
+      return pipelineOptionsParallelism;
+    }
+    if (envParallelism > 0) {
+      // If the user supplies a parallelism on the command-line, this is set on the execution environment during creation
+      return envParallelism;
+    }
+
+    final Configuration configuration;
+    if (flinkConfDir == null) {
+      configuration = GlobalConfiguration.loadConfiguration();
+    } else {
+      configuration = GlobalConfiguration.loadConfiguration(flinkConfDir);
+    }
+    final int flinkConfigParallelism =
+        configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM.key(), -1);
+    if (flinkConfigParallelism > 0) {
+      return flinkConfigParallelism;
+    }
+    LOG.warn(
+        "No default parallelism could be found. Defaulting to parallelism 1. "
+            + "Please set an explicit parallelism with --parallelism");
+    return 1;
+  }
+
   private static void applyLatencyTrackingInterval(
       ExecutionConfig config, FlinkPipelineOptions options) {
     long latencyTrackingInterval = options.getLatencyTrackingInterval();
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java
index 5989a6675b4..d004a67588f 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.runners.flink;
 
-import static com.google.common.base.MoreObjects.firstNonNull;
 import static org.apache.beam.runners.core.construction.PipelineResources.detectClassPathResourcesToStage;
 
 import com.google.common.util.concurrent.ListeningExecutorService;
@@ -38,7 +37,7 @@
 
   public static FlinkJobInvoker create(
       ListeningExecutorService executorService, String flinkMasterUrl) {
-    return new FlinkJobInvoker(executorService, firstNonNull(flinkMasterUrl, "[auto]"));
+    return new FlinkJobInvoker(executorService, flinkMasterUrl);
   }
 
   private final ListeningExecutorService executorService;
@@ -62,7 +61,7 @@ public JobInvocation invoke(
        String.format("%s_%s", flinkOptions.getJobName(), UUID.randomUUID().toString());
     LOG.info("Invoking job {}", invocationId);
 
-    flinkOptions.setFlinkMaster(firstNonNull(flinkOptions.getFlinkMaster(), flinkMasterUrl));
+    flinkOptions.setFlinkMaster(flinkMasterUrl);
 
     flinkOptions.setRunner(null);
 
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
index 0db01448c4d..81b0e41bb5c 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
@@ -56,12 +56,15 @@
       "Address of the Flink Master where the Pipeline should be executed. Can"
          + " either be of the form \"host:port\" or one of the special values [local], "
           + "[collection] or [auto].")
+  @Default.String("[auto]")
   String getFlinkMaster();
 
   void setFlinkMaster(String value);
 
-  @Description("The degree of parallelism to be used when distributing operations onto workers.")
-  @Default.InstanceFactory(DefaultParallelismFactory.class)
+  @Description(
+      "The degree of parallelism to be used when distributing operations onto workers. "
+          + "If the parallelism is not set, the configured Flink default is used, or 1 if none can be found.")
+  @Default.Integer(-1)
   Integer getParallelism();
 
   void setParallelism(Integer value);
@@ -75,13 +78,13 @@
   void setCheckpointingInterval(Long interval);
 
   @Description("The checkpointing mode that defines consistency guarantee.")
-  @Default.Enum("AT_LEAST_ONCE")
+  @Default.Enum("EXACTLY_ONCE")
   CheckpointingMode getCheckpointingMode();
 
   void setCheckpointingMode(CheckpointingMode mode);
 
  @Description("The maximum time that a checkpoint may take before being discarded.")
-  @Default.Long(20 * 60 * 1000)
+  @Default.Long(-1L)
   Long getCheckpointTimeoutMillis();
 
   void setCheckpointTimeoutMillis(Long checkpointTimeoutMillis);
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
index 34f6479bbcb..8af87b23430 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
@@ -83,11 +83,6 @@ public static FlinkRunner fromOptions(PipelineOptions options) {
       LOG.debug("Classpath elements: {}", flinkOptions.getFilesToStage());
     }
 
-    // Set Flink Master to [auto] if no option was specified.
-    if (flinkOptions.getFlinkMaster() == null) {
-      flinkOptions.setFlinkMaster("[auto]");
-    }
-
     return new FlinkRunner(flinkOptions);
   }
 
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java
new file mode 100644
index 00000000000..2e4c06cdfec
--- /dev/null
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.flink;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.util.Collections;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+/** Tests for {@link FlinkExecutionEnvironments}. */
+public class FlinkExecutionEnvironmentsTest {
+
+  @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+  @Test
+  public void shouldSetParallelismBatch() {
+    FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    options.setRunner(TestFlinkRunner.class);
+    options.setParallelism(42);
+
+    ExecutionEnvironment bev =
+        FlinkExecutionEnvironments.createBatchExecutionEnvironment(
+            options, Collections.emptyList());
+
+    assertThat(options.getParallelism(), is(42));
+    assertThat(bev.getParallelism(), is(42));
+  }
+
+  @Test
+  public void shouldSetParallelismStreaming() {
+    FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    options.setRunner(TestFlinkRunner.class);
+    options.setParallelism(42);
+
+    StreamExecutionEnvironment sev =
+        FlinkExecutionEnvironments.createStreamExecutionEnvironment(
+            options, Collections.emptyList());
+
+    assertThat(options.getParallelism(), is(42));
+    assertThat(sev.getParallelism(), is(42));
+  }
+
+  @Test
+  public void shouldInferParallelismFromEnvironmentBatch() throws IOException {
+    String flinkConfDir = extractFlinkConfig();
+
+    FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    options.setRunner(TestFlinkRunner.class);
+    options.setFlinkMaster("host:80");
+
+    ExecutionEnvironment bev =
+        FlinkExecutionEnvironments.createBatchExecutionEnvironment(
+            options, Collections.emptyList(), flinkConfDir);
+
+    assertThat(options.getParallelism(), is(23));
+    assertThat(bev.getParallelism(), is(23));
+  }
+
+  @Test
+  public void shouldInferParallelismFromEnvironmentStreaming() throws IOException {
+    String flinkConfDir = extractFlinkConfig();
+
+    FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    options.setRunner(TestFlinkRunner.class);
+    options.setFlinkMaster("host:80");
+
+    StreamExecutionEnvironment sev =
+        FlinkExecutionEnvironments.createStreamExecutionEnvironment(
+            options, Collections.emptyList(), flinkConfDir);
+
+    assertThat(options.getParallelism(), is(23));
+    assertThat(sev.getParallelism(), is(23));
+  }
+
+  @Test
+  public void shouldFallbackToDefaultParallelismBatch() {
+    FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    options.setRunner(TestFlinkRunner.class);
+    options.setFlinkMaster("host:80");
+
+    ExecutionEnvironment bev =
+        FlinkExecutionEnvironments.createBatchExecutionEnvironment(
+            options, Collections.emptyList());
+
+    assertThat(options.getParallelism(), is(1));
+    assertThat(bev.getParallelism(), is(1));
+  }
+
+  @Test
+  public void shouldFallbackToDefaultParallelismStreaming() {
+    FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    options.setRunner(TestFlinkRunner.class);
+    options.setFlinkMaster("host:80");
+
+    StreamExecutionEnvironment sev =
+        FlinkExecutionEnvironments.createStreamExecutionEnvironment(
+            options, Collections.emptyList());
+
+    assertThat(options.getParallelism(), is(1));
+    assertThat(sev.getParallelism(), is(1));
+  }
+
+  @Test
+  public void useDefaultParallelismFromContextBatch() {
+    FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    options.setRunner(TestFlinkRunner.class);
+
+    ExecutionEnvironment bev =
+        FlinkExecutionEnvironments.createBatchExecutionEnvironment(
+            options, Collections.emptyList());
+
+    assertThat(options.getParallelism(), is(LocalStreamEnvironment.getDefaultLocalParallelism()));
+    assertThat(bev.getParallelism(), is(LocalStreamEnvironment.getDefaultLocalParallelism()));
+  }
+
+  @Test
+  public void useDefaultParallelismFromContextStreaming() {
+    FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    options.setRunner(TestFlinkRunner.class);
+
+    StreamExecutionEnvironment sev =
+        FlinkExecutionEnvironments.createStreamExecutionEnvironment(
+            options, Collections.emptyList());
+
+    assertThat(options.getParallelism(), is(LocalStreamEnvironment.getDefaultLocalParallelism()));
+    assertThat(sev.getParallelism(), is(LocalStreamEnvironment.getDefaultLocalParallelism()));
+  }
+
+  private String extractFlinkConfig() throws IOException {
+    InputStream inputStream = getClass().getResourceAsStream("/flink-conf.yaml");
+    File root = temporaryFolder.getRoot();
+    Files.copy(inputStream, new File(root, "flink-conf.yaml").toPath());
+    return root.getAbsolutePath();
+  }
+}
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
index 2edf3c73d1e..49cdbe83f93 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
@@ -17,6 +17,10 @@
  */
 package org.apache.beam.runners.flink;
 
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+import static org.hamcrest.core.IsNull.nullValue;
+
 import java.util.Collections;
 import java.util.HashMap;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator;
@@ -36,6 +40,7 @@
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeHint;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.joda.time.Instant;
@@ -59,6 +64,28 @@
  private static MyOptions options =
      PipelineOptionsFactory.fromArgs("--testOption=nothing").as(MyOptions.class);
 
+  /** These defaults should only be changed with a very good reason. */
+  @Test
+  public void testDefaults() {
+    FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    assertThat(options.getParallelism(), is(-1));
+    assertThat(options.getFlinkMaster(), is("[auto]"));
+    assertThat(options.getFilesToStage(), is(nullValue()));
+    assertThat(options.getLatencyTrackingInterval(), is(0L));
+    assertThat(options.isShutdownSourcesOnFinalWatermark(), is(false));
+    assertThat(options.getObjectReuse(), is(false));
+    assertThat(options.getCheckpointingMode(), is(CheckpointingMode.EXACTLY_ONCE));
+    assertThat(options.getMinPauseBetweenCheckpoints(), is(-1L));
+    assertThat(options.getCheckpointingInterval(), is(-1L));
+    assertThat(options.getCheckpointTimeoutMillis(), is(-1L));
+    assertThat(options.getNumberOfExecutionRetries(), is(-1));
+    assertThat(options.getExecutionRetryDelay(), is(-1L));
+    assertThat(options.getRetainExternalizedCheckpointsOnCancellation(), is(false));
+    assertThat(options.getStateBackend(), is(nullValue()));
+    assertThat(options.getMaxBundleSize(), is(1000L));
+    assertThat(options.getMaxBundleTimeMills(), is(1000L));
+  }
+
   @Test(expected = Exception.class)
   public void parDoBaseClassPipelineOptionsNullTest() {
     TupleTag<String> mainTag = new TupleTag<>("main-output");
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java
index 66600193f0b..17d803d193a 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java
@@ -21,6 +21,7 @@
 import java.io.File;
 import java.io.Serializable;
 import java.util.Arrays;
+import org.apache.beam.runners.flink.FlinkPipelineOptions;
 import org.apache.beam.runners.flink.FlinkTestPipeline;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.TextIO;
@@ -85,6 +86,7 @@ public void processElement(ProcessContext c) {
   public void testProgram() throws Exception {
 
     Pipeline p = FlinkTestPipeline.createForStreaming();
+    p.getOptions().as(FlinkPipelineOptions.class).setParallelism(1);
 
     PCollection<String> output =
         p.apply(
diff --git a/runners/flink/src/test/resources/flink-conf.yaml b/runners/flink/src/test/resources/flink-conf.yaml
new file mode 100644
index 00000000000..54a03c99a7f
--- /dev/null
+++ b/runners/flink/src/test/resources/flink-conf.yaml
@@ -0,0 +1,19 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+parallelism.default: 23


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 145731)
    Time Spent: 2h 50m  (was: 2h 40m)

> Issue with setting the parallelism at client level using Flink runner
> ---------------------------------------------------------------------
>
>                 Key: BEAM-3089
>                 URL: https://issues.apache.org/jira/browse/BEAM-3089
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink
>    Affects Versions: 2.0.0
>         Environment: I am using Flink 1.2.1 running on Docker, with Task 
> Managers distributed across different VMs as part of a Docker Swarm.
>            Reporter: Thalita Vergilio
>            Assignee: Grzegorz KoĊ‚akowski
>            Priority: Major
>              Labels: docker, flink, parallel-deployment
>             Fix For: 2.8.0
>
>         Attachments: flink-ui-parallelism.png
>
>          Time Spent: 2h 50m
>  Remaining Estimate: 0h
>
> When uploading an Apache Beam application using the Flink Web UI, the 
> parallelism set at job submission doesn't get picked up. The same happens 
> when submitting a job using the Flink CLI.
> In both cases, the parallelism ends up defaulting to 1.
> When I set the parallelism programmatically within the Apache Beam code, it 
> works: {{flinkPipelineOptions.setParallelism(4);}}
> I suspect the root of the problem may be in the 
> org.apache.beam.runners.flink.DefaultParallelismFactory class, as it checks 
> for Flink's GlobalConfiguration, which may not pick up runtime values passed 
> to Flink, then defaults to 1 if it doesn't find anything.
> Any ideas on how this could be fixed or worked around? I need to be able to 
> change the parallelism dynamically, so the programmatic approach won't really 
> work for me, nor will setting the Flink configuration at system level.
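
For readers tracing the fix: the PR's determineParallelism() replaces the removed DefaultParallelismFactory with an explicit fallback chain. The following standalone sketch mirrors that order; the Map parameter stands in for Flink's GlobalConfiguration and is an assumption of this illustration, not Beam's or Flink's API:

```java
import java.util.Map;

/**
 * Illustrative sketch of the fallback order in the PR's determineParallelism().
 * A plain Map stands in for Flink's GlobalConfiguration so the sketch is
 * self-contained; the real code loads flink-conf.yaml via GlobalConfiguration.
 */
public class ParallelismFallback {

  static int determineParallelism(
      int pipelineOptionsParallelism, int envParallelism, Map<String, Integer> flinkConf) {
    if (pipelineOptionsParallelism > 0) {
      // 1. An explicit --parallelism pipeline option always wins.
      return pipelineOptionsParallelism;
    }
    if (envParallelism > 0) {
      // 2. A parallelism already set on the execution environment,
      //    e.g. supplied on the command line via `flink run -p`.
      return envParallelism;
    }
    // 3. Otherwise consult parallelism.default from the Flink configuration.
    int configured = flinkConf.getOrDefault("parallelism.default", -1);
    if (configured > 0) {
      return configured;
    }
    // 4. Last resort: 1 (the real code also logs a warning here).
    return 1;
  }

  public static void main(String[] args) {
    System.out.println(determineParallelism(42, 8, Map.of("parallelism.default", 23))); // 42
    System.out.println(determineParallelism(-1, -1, Map.of("parallelism.default", 23))); // 23
    System.out.println(determineParallelism(-1, -1, Map.of()));                          // 1
  }
}
```

This ordering is what lets the parallelism from the Flink Web UI or CLI take effect: it lands on the execution environment (step 2) before the configuration lookup that previously defaulted to 1.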



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
