[
https://issues.apache.org/jira/browse/BEAM-1628?focusedWorklogId=171911&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-171911
]
ASF GitHub Bot logged work on BEAM-1628:
----------------------------------------
Author: ASF GitHub Bot
Created on: 04/Dec/18 14:27
Start Date: 04/Dec/18 14:27
Worklog Time Spent: 10m
Work Description: mxm closed pull request #7187: [BEAM-1628] Allow empty
port for flink master url
URL: https://github.com/apache/beam/pull/7187
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
index 80a9ef860a4c..25854cfb89a8 100644
---
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
+++
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
@@ -18,7 +18,6 @@
package org.apache.beam.runners.flink;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Splitter;
import java.net.URL;
import java.util.Collections;
import java.util.List;
@@ -77,17 +76,27 @@ static ExecutionEnvironment createBatchExecutionEnvironment(
flinkBatchEnv = new CollectionEnvironment();
} else if ("[auto]".equals(masterUrl)) {
flinkBatchEnv = ExecutionEnvironment.getExecutionEnvironment();
- } else if (masterUrl.matches(".*:\\d*")) {
- List<String> parts = Splitter.on(':').splitToList(masterUrl);
+ } else {
+ String[] hostAndPort = masterUrl.split(":", 2);
+ final String host = hostAndPort[0];
+ final int port;
+ if (hostAndPort.length > 1) {
+ try {
+ port = Integer.parseInt(hostAndPort[1]);
+ } catch (NumberFormatException e) {
+ throw new IllegalArgumentException("Provided port is malformed: " +
hostAndPort[1]);
+ }
+ flinkConfiguration.setInteger(RestOptions.PORT, port);
+ } else {
+ port = flinkConfiguration.getInteger(RestOptions.PORT);
+ }
flinkBatchEnv =
ExecutionEnvironment.createRemoteEnvironment(
- parts.get(0),
- Integer.parseInt(parts.get(1)),
+ host,
+ port,
flinkConfiguration,
filesToStage.toArray(new String[filesToStage.size()]));
- } else {
- LOG.warn("Unrecognized Flink Master URL {}. Defaulting to [auto].",
masterUrl);
- flinkBatchEnv = ExecutionEnvironment.getExecutionEnvironment();
+ LOG.info("Using Flink Master URL {}:{}.", host, port);
}
// Set the execution mode for data exchange.
@@ -147,10 +156,20 @@ static StreamExecutionEnvironment
createStreamExecutionEnvironment(
flinkStreamEnv = StreamExecutionEnvironment.createLocalEnvironment();
} else if ("[auto]".equals(masterUrl)) {
flinkStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
- } else if (masterUrl.matches(".*:\\d*")) {
- List<String> parts = Splitter.on(':').splitToList(masterUrl);
- flinkConfig.setInteger(RestOptions.PORT, Integer.parseInt(parts.get(1)));
-
+ } else {
+ String[] hostAndPort = masterUrl.split(":", 2);
+ final String host = hostAndPort[0];
+ final int port;
+ if (hostAndPort.length > 1) {
+ try {
+ port = Integer.parseInt(hostAndPort[1]);
+ } catch (NumberFormatException e) {
+ throw new IllegalArgumentException("Provided port is malformed: " +
hostAndPort[1]);
+ }
+ flinkConfig.setInteger(RestOptions.PORT, port);
+ } else {
+ port = flinkConfig.getInteger(RestOptions.PORT);
+ }
final SavepointRestoreSettings savepointRestoreSettings;
if (options.getSavepointPath() != null) {
savepointRestoreSettings =
@@ -159,17 +178,14 @@ static StreamExecutionEnvironment
createStreamExecutionEnvironment(
} else {
savepointRestoreSettings = SavepointRestoreSettings.none();
}
-
flinkStreamEnv =
new BeamFlinkRemoteStreamEnvironment(
- parts.get(0),
- Integer.parseInt(parts.get(1)),
+ host,
+ port,
flinkConfig,
savepointRestoreSettings,
filesToStage.toArray(new String[filesToStage.size()]));
- } else {
- LOG.warn("Unrecognized Flink Master URL {}. Defaulting to [auto].",
masterUrl);
- flinkStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+ LOG.info("Using Flink Master URL {}:{}.", host, port);
}
// Set the parallelism, required by UnboundedSourceWrapper to generate
consistent splits.
diff --git
a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java
b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java
index f8131e4de934..a16e81ecf73b 100644
---
a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java
+++
b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java
@@ -17,8 +17,9 @@
*/
package org.apache.beam.runners.flink;
+import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.core.Is.is;
-import static org.hamcrest.core.IsInstanceOf.instanceOf;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import java.io.File;
@@ -28,12 +29,16 @@
import java.util.Collections;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.LocalEnvironment;
+import org.apache.flink.api.java.RemoteEnvironment;
+import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.RemoteStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import org.mockito.internal.util.reflection.Whitebox;
@@ -41,6 +46,7 @@
public class FlinkExecutionEnvironmentsTest {
@Rule public TemporaryFolder temporaryFolder = new TemporaryFolder();
+ @Rule public ExpectedException expectedException = ExpectedException.none();
@Test
public void shouldSetParallelismBatch() {
@@ -139,6 +145,7 @@ public void useDefaultParallelismFromContextBatch() {
FlinkExecutionEnvironments.createBatchExecutionEnvironment(
options, Collections.emptyList());
+ assertThat(bev, instanceOf(LocalEnvironment.class));
assertThat(options.getParallelism(),
is(LocalStreamEnvironment.getDefaultLocalParallelism()));
assertThat(bev.getParallelism(),
is(LocalStreamEnvironment.getDefaultLocalParallelism()));
}
@@ -152,10 +159,155 @@ public void useDefaultParallelismFromContextStreaming() {
FlinkExecutionEnvironments.createStreamExecutionEnvironment(
options, Collections.emptyList());
+ assertThat(sev, instanceOf(LocalStreamEnvironment.class));
assertThat(options.getParallelism(),
is(LocalStreamEnvironment.getDefaultLocalParallelism()));
assertThat(sev.getParallelism(),
is(LocalStreamEnvironment.getDefaultLocalParallelism()));
}
+ @Test
+ public void shouldParsePortForRemoteEnvironmentBatch() {
+ FlinkPipelineOptions options =
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+ options.setRunner(FlinkRunner.class);
+ options.setFlinkMaster("host:1234");
+
+ ExecutionEnvironment bev =
+ FlinkExecutionEnvironments.createBatchExecutionEnvironment(
+ options, Collections.emptyList());
+
+ assertThat(bev, instanceOf(RemoteEnvironment.class));
+ assertThat(Whitebox.getInternalState(bev, "host"), is("host"));
+ assertThat(Whitebox.getInternalState(bev, "port"), is(1234));
+ }
+
+ @Test
+ public void shouldParsePortForRemoteEnvironmentStreaming() {
+ FlinkPipelineOptions options =
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+ options.setRunner(FlinkRunner.class);
+ options.setFlinkMaster("host:1234");
+
+ StreamExecutionEnvironment sev =
+ FlinkExecutionEnvironments.createStreamExecutionEnvironment(
+ options, Collections.emptyList());
+
+ assertThat(sev, instanceOf(RemoteStreamEnvironment.class));
+ assertThat(Whitebox.getInternalState(sev, "host"), is("host"));
+ assertThat(Whitebox.getInternalState(sev, "port"), is(1234));
+ }
+
+ @Test
+ public void shouldAllowPortOmissionForRemoteEnvironmentBatch() {
+ FlinkPipelineOptions options =
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+ options.setRunner(FlinkRunner.class);
+ options.setFlinkMaster("host");
+
+ ExecutionEnvironment bev =
+ FlinkExecutionEnvironments.createBatchExecutionEnvironment(
+ options, Collections.emptyList());
+
+ assertThat(bev, instanceOf(RemoteEnvironment.class));
+ assertThat(Whitebox.getInternalState(bev, "host"), is("host"));
+ assertThat(Whitebox.getInternalState(bev, "port"),
is(RestOptions.PORT.defaultValue()));
+ }
+
+ @Test
+ public void shouldAllowPortOmissionForRemoteEnvironmentStreaming() {
+ FlinkPipelineOptions options =
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+ options.setRunner(FlinkRunner.class);
+ options.setFlinkMaster("host");
+
+ StreamExecutionEnvironment sev =
+ FlinkExecutionEnvironments.createStreamExecutionEnvironment(
+ options, Collections.emptyList());
+
+ assertThat(sev, instanceOf(RemoteStreamEnvironment.class));
+ assertThat(Whitebox.getInternalState(sev, "host"), is("host"));
+ assertThat(Whitebox.getInternalState(sev, "port"),
is(RestOptions.PORT.defaultValue()));
+ }
+
+ @Test
+ public void shouldTreatAutoAndEmptyHostTheSameBatch() {
+ FlinkPipelineOptions options =
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+ options.setRunner(FlinkRunner.class);
+
+ ExecutionEnvironment sev =
+ FlinkExecutionEnvironments.createBatchExecutionEnvironment(
+ options, Collections.emptyList());
+
+ options.setFlinkMaster("[auto]");
+
+ ExecutionEnvironment sev2 =
+ FlinkExecutionEnvironments.createBatchExecutionEnvironment(
+ options, Collections.emptyList());
+
+ assertEquals(sev.getClass(), sev2.getClass());
+ }
+
+ @Test
+ public void shouldTreatAutoAndEmptyHostTheSameStreaming() {
+ FlinkPipelineOptions options =
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+ options.setRunner(FlinkRunner.class);
+
+ StreamExecutionEnvironment sev =
+ FlinkExecutionEnvironments.createStreamExecutionEnvironment(
+ options, Collections.emptyList());
+
+ options.setFlinkMaster("[auto]");
+
+ StreamExecutionEnvironment sev2 =
+ FlinkExecutionEnvironments.createStreamExecutionEnvironment(
+ options, Collections.emptyList());
+
+ assertEquals(sev.getClass(), sev2.getClass());
+ }
+
+ @Test
+ public void shouldDetectMalformedPortBatch() {
+ FlinkPipelineOptions options =
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+ options.setRunner(FlinkRunner.class);
+ options.setFlinkMaster("host:p0rt");
+
+ expectedException.expect(IllegalArgumentException.class);
+ expectedException.expectMessage("Provided port is malformed");
+
+ FlinkExecutionEnvironments.createBatchExecutionEnvironment(options,
Collections.emptyList());
+ }
+
+ @Test
+ public void shouldDetectMalformedPortStreaming() {
+ FlinkPipelineOptions options =
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+ options.setRunner(FlinkRunner.class);
+ options.setFlinkMaster("host:p0rt");
+
+ expectedException.expect(IllegalArgumentException.class);
+ expectedException.expectMessage("Provided port is malformed");
+
+ FlinkExecutionEnvironments.createStreamExecutionEnvironment(options,
Collections.emptyList());
+ }
+
+ @Test
+ public void shouldFailOnEmptyPortBatch() {
+ FlinkPipelineOptions options =
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+ options.setRunner(FlinkRunner.class);
+ options.setFlinkMaster("host:");
+
+ expectedException.expect(IllegalArgumentException.class);
+ expectedException.expectMessage("Provided port is malformed");
+
+ FlinkExecutionEnvironments.createBatchExecutionEnvironment(options,
Collections.emptyList());
+ }
+
+ @Test
+ public void shouldFailOnEmptyPortStreaming() {
+ FlinkPipelineOptions options =
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+ options.setRunner(FlinkRunner.class);
+ options.setFlinkMaster("host:");
+
+ expectedException.expect(IllegalArgumentException.class);
+ expectedException.expectMessage("Provided port is malformed");
+
+ FlinkExecutionEnvironments.createStreamExecutionEnvironment(options,
Collections.emptyList());
+ }
+
private String extractFlinkConfig() throws IOException {
InputStream inputStream =
getClass().getResourceAsStream("/flink-conf.yaml");
File root = temporaryFolder.getRoot();
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 171911)
Time Spent: 1h 20m (was: 1h 10m)
> Flink runner: logic around --flinkMaster is error-prone
> -------------------------------------------------------
>
> Key: BEAM-1628
> URL: https://issues.apache.org/jira/browse/BEAM-1628
> Project: Beam
> Issue Type: Bug
> Components: runner-flink
> Reporter: Davor Bonaci
> Assignee: Maximilian Michels
> Priority: Minor
> Labels: newbie, starter
> Time Spent: 1h 20m
> Remaining Estimate: 0h
>
> The logic for handling {{--flinkMaster}} seems not particularly user-friendly.
> https://github.com/apache/beam/blob/fbcde4cdc7d68de8734bf540c079b2747631a854/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java#L132
> {code}
> if (masterUrl.equals("[local]")) {
> } else if (masterUrl.equals("[collection]")) {
> } else if (masterUrl.equals("[auto]")) {
> } else if (masterUrl.matches(".*:\\d*")) {
> } else {
> // use auto.
> }
> {code}
> The options are constructed with "auto" set as default.
> I think we should do the following:
> * I assume there's a default port for the Flink master. We should default to
> it.
> * We should treat a string without a colon as a host name. (Not default to
> local execution.)
> This is super easy fix, hopefully someone can pick it up quickly ;-)
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)