[ 
https://issues.apache.org/jira/browse/BEAM-6176?focusedWorklogId=174034&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-174034
 ]

ASF GitHub Bot logged work on BEAM-6176:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 11/Dec/18 11:36
            Start Date: 11/Dec/18 11:36
    Worklog Time Spent: 10m 
      Work Description: mxm closed pull request #7196: [BEAM-6176] Support IPv6 
addresses for Flink master url
URL: https://github.com/apache/beam/pull/7196
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
index 25854cfb89a8..0b7c84334190 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
@@ -18,6 +18,7 @@
 package org.apache.beam.runners.flink;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.net.HostAndPort;
 import java.net.URL;
 import java.util.Collections;
 import java.util.List;
@@ -77,26 +78,16 @@ static ExecutionEnvironment createBatchExecutionEnvironment(
     } else if ("[auto]".equals(masterUrl)) {
       flinkBatchEnv = ExecutionEnvironment.getExecutionEnvironment();
     } else {
-      String[] hostAndPort = masterUrl.split(":", 2);
-      final String host = hostAndPort[0];
-      final int port;
-      if (hostAndPort.length > 1) {
-        try {
-          port = Integer.parseInt(hostAndPort[1]);
-        } catch (NumberFormatException e) {
-          throw new IllegalArgumentException("Provided port is malformed: " + 
hostAndPort[1]);
-        }
-        flinkConfiguration.setInteger(RestOptions.PORT, port);
-      } else {
-        port = flinkConfiguration.getInteger(RestOptions.PORT);
-      }
+      int defaultPort = flinkConfiguration.getInteger(RestOptions.PORT);
+      HostAndPort hostAndPort = 
HostAndPort.fromString(masterUrl).withDefaultPort(defaultPort);
+      flinkConfiguration.setInteger(RestOptions.PORT, hostAndPort.getPort());
       flinkBatchEnv =
           ExecutionEnvironment.createRemoteEnvironment(
-              host,
-              port,
+              hostAndPort.getHost(),
+              hostAndPort.getPort(),
               flinkConfiguration,
               filesToStage.toArray(new String[filesToStage.size()]));
-      LOG.info("Using Flink Master URL {}:{}.", host, port);
+      LOG.info("Using Flink Master URL {}:{}.", hostAndPort.getHost(), 
hostAndPort.getPort());
     }
 
     // Set the execution more for data exchange.
@@ -157,19 +148,9 @@ static StreamExecutionEnvironment 
createStreamExecutionEnvironment(
     } else if ("[auto]".equals(masterUrl)) {
       flinkStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
     } else {
-      String[] hostAndPort = masterUrl.split(":", 2);
-      final String host = hostAndPort[0];
-      final int port;
-      if (hostAndPort.length > 1) {
-        try {
-          port = Integer.parseInt(hostAndPort[1]);
-        } catch (NumberFormatException e) {
-          throw new IllegalArgumentException("Provided port is malformed: " + 
hostAndPort[1]);
-        }
-        flinkConfig.setInteger(RestOptions.PORT, port);
-      } else {
-        port = flinkConfig.getInteger(RestOptions.PORT);
-      }
+      int defaultPort = flinkConfig.getInteger(RestOptions.PORT);
+      HostAndPort hostAndPort = 
HostAndPort.fromString(masterUrl).withDefaultPort(defaultPort);
+      flinkConfig.setInteger(RestOptions.PORT, hostAndPort.getPort());
       final SavepointRestoreSettings savepointRestoreSettings;
       if (options.getSavepointPath() != null) {
         savepointRestoreSettings =
@@ -180,12 +161,12 @@ static StreamExecutionEnvironment 
createStreamExecutionEnvironment(
       }
       flinkStreamEnv =
           new BeamFlinkRemoteStreamEnvironment(
-              host,
-              port,
+              hostAndPort.getHost(),
+              hostAndPort.getPort(),
               flinkConfig,
               savepointRestoreSettings,
               filesToStage.toArray(new String[filesToStage.size()]));
-      LOG.info("Using Flink Master URL {}:{}.", host, port);
+      LOG.info("Using Flink Master URL {}:{}.", hostAndPort.getHost(), 
hostAndPort.getPort());
     }
 
     // Set the parallelism, required by UnboundedSourceWrapper to generate 
consistent splits.
diff --git 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java
 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java
index a16e81ecf73b..573b96ae771e 100644
--- 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java
+++ 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java
@@ -267,7 +267,7 @@ public void shouldDetectMalformedPortBatch() {
     options.setFlinkMaster("host:p0rt");
 
     expectedException.expect(IllegalArgumentException.class);
-    expectedException.expectMessage("Provided port is malformed");
+    expectedException.expectMessage("Unparseable port number");
 
     FlinkExecutionEnvironments.createBatchExecutionEnvironment(options, 
Collections.emptyList());
   }
@@ -279,33 +279,93 @@ public void shouldDetectMalformedPortStreaming() {
     options.setFlinkMaster("host:p0rt");
 
     expectedException.expect(IllegalArgumentException.class);
-    expectedException.expectMessage("Provided port is malformed");
+    expectedException.expectMessage("Unparseable port number");
 
     FlinkExecutionEnvironments.createStreamExecutionEnvironment(options, 
Collections.emptyList());
   }
 
   @Test
-  public void shouldFailOnEmptyPortBatch() {
+  public void shouldSupportIPv4Batch() {
     FlinkPipelineOptions options = 
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
     options.setRunner(FlinkRunner.class);
-    options.setFlinkMaster("host:");
 
-    expectedException.expect(IllegalArgumentException.class);
-    expectedException.expectMessage("Provided port is malformed");
+    options.setFlinkMaster("192.168.1.1:1234");
+    ExecutionEnvironment bev =
+        FlinkExecutionEnvironments.createBatchExecutionEnvironment(
+            options, Collections.emptyList());
+    assertThat(Whitebox.getInternalState(bev, "host"), is("192.168.1.1"));
+    assertThat(Whitebox.getInternalState(bev, "port"), is(1234));
 
-    FlinkExecutionEnvironments.createBatchExecutionEnvironment(options, 
Collections.emptyList());
+    options.setFlinkMaster("192.168.1.1");
+    bev =
+        FlinkExecutionEnvironments.createBatchExecutionEnvironment(
+            options, Collections.emptyList());
+    assertThat(Whitebox.getInternalState(bev, "host"), is("192.168.1.1"));
+    assertThat(Whitebox.getInternalState(bev, "port"), 
is(RestOptions.PORT.defaultValue()));
   }
 
   @Test
-  public void shouldFailOnEmptyPortStreaming() {
+  public void shouldSupportIPv4Streaming() {
     FlinkPipelineOptions options = 
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
     options.setRunner(FlinkRunner.class);
-    options.setFlinkMaster("host:");
 
-    expectedException.expect(IllegalArgumentException.class);
-    expectedException.expectMessage("Provided port is malformed");
+    options.setFlinkMaster("192.168.1.1:1234");
+    ExecutionEnvironment bev =
+        FlinkExecutionEnvironments.createBatchExecutionEnvironment(
+            options, Collections.emptyList());
+    assertThat(Whitebox.getInternalState(bev, "host"), is("192.168.1.1"));
+    assertThat(Whitebox.getInternalState(bev, "port"), is(1234));
 
-    FlinkExecutionEnvironments.createStreamExecutionEnvironment(options, 
Collections.emptyList());
+    options.setFlinkMaster("192.168.1.1");
+    bev =
+        FlinkExecutionEnvironments.createBatchExecutionEnvironment(
+            options, Collections.emptyList());
+    assertThat(Whitebox.getInternalState(bev, "host"), is("192.168.1.1"));
+    assertThat(Whitebox.getInternalState(bev, "port"), 
is(RestOptions.PORT.defaultValue()));
+  }
+
+  @Test
+  public void shouldSupportIPv6Batch() {
+    FlinkPipelineOptions options = 
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    options.setRunner(FlinkRunner.class);
+
+    options.setFlinkMaster("[FE80:CD00:0000:0CDE:1257:0000:211E:729C]:1234");
+    ExecutionEnvironment bev =
+        FlinkExecutionEnvironments.createBatchExecutionEnvironment(
+            options, Collections.emptyList());
+    assertThat(
+        Whitebox.getInternalState(bev, "host"), 
is("FE80:CD00:0000:0CDE:1257:0000:211E:729C"));
+    assertThat(Whitebox.getInternalState(bev, "port"), is(1234));
+
+    options.setFlinkMaster("FE80:CD00:0000:0CDE:1257:0000:211E:729C");
+    bev =
+        FlinkExecutionEnvironments.createBatchExecutionEnvironment(
+            options, Collections.emptyList());
+    assertThat(
+        Whitebox.getInternalState(bev, "host"), 
is("FE80:CD00:0000:0CDE:1257:0000:211E:729C"));
+    assertThat(Whitebox.getInternalState(bev, "port"), 
is(RestOptions.PORT.defaultValue()));
+  }
+
+  @Test
+  public void shouldSupportIPv6Streaming() {
+    FlinkPipelineOptions options = 
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    options.setRunner(FlinkRunner.class);
+
+    options.setFlinkMaster("[FE80:CD00:0000:0CDE:1257:0000:211E:729C]:1234");
+    StreamExecutionEnvironment sev =
+        FlinkExecutionEnvironments.createStreamExecutionEnvironment(
+            options, Collections.emptyList());
+    assertThat(
+        Whitebox.getInternalState(sev, "host"), 
is("FE80:CD00:0000:0CDE:1257:0000:211E:729C"));
+    assertThat(Whitebox.getInternalState(sev, "port"), is(1234));
+
+    options.setFlinkMaster("FE80:CD00:0000:0CDE:1257:0000:211E:729C");
+    sev =
+        FlinkExecutionEnvironments.createStreamExecutionEnvironment(
+            options, Collections.emptyList());
+    assertThat(
+        Whitebox.getInternalState(sev, "host"), 
is("FE80:CD00:0000:0CDE:1257:0000:211E:729C"));
+    assertThat(Whitebox.getInternalState(sev, "port"), 
is(RestOptions.PORT.defaultValue()));
   }
 
   private String extractFlinkConfig() throws IOException {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 174034)
    Time Spent: 1h 40m  (was: 1.5h)

> Flink Runner's master url does not support IPv6 addresses
> ---------------------------------------------------------
>
>                 Key: BEAM-6176
>                 URL: https://issues.apache.org/jira/browse/BEAM-6176
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink
>            Reporter: Maximilian Michels
>            Assignee: Maximilian Michels
>            Priority: Major
>             Fix For: 2.10.0
>
>          Time Spent: 1h 40m
>  Remaining Estimate: 0h
>
> The port parsing logic does not support IPv6.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to