This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 9bd61e1  [FLINK-24474] Bind Jobmanager and Taskmanager RPC host 
addresses to localhost by default
9bd61e1 is described below

commit 9bd61e1334e4805541b2b3689fe500e949a86658
Author: Mika <m...@autophagy.io>
AuthorDate: Wed Feb 16 15:29:57 2022 +0100

    [FLINK-24474] Bind Jobmanager and Taskmanager RPC host addresses to 
localhost by default
---
 flink-dist/src/main/resources/flink-conf.yaml         | 15 +++++++++++++++
 .../util/flink/container/FlinkContainersBuilder.java  |  3 +++
 .../decorators/FlinkConfMountDecorator.java           |  4 ++++
 .../org/apache/flink/runtime/net/ConnectionUtils.java | 19 ++++++++++++++++++-
 .../flink/yarn/entrypoint/YarnEntrypointUtils.java    |  3 +++
 5 files changed, 43 insertions(+), 1 deletion(-)

diff --git a/flink-dist/src/main/resources/flink-conf.yaml 
b/flink-dist/src/main/resources/flink-conf.yaml
index 39b1f71..eb9bc83 100644
--- a/flink-dist/src/main/resources/flink-conf.yaml
+++ b/flink-dist/src/main/resources/flink-conf.yaml
@@ -36,6 +36,14 @@ jobmanager.rpc.address: localhost
 
 jobmanager.rpc.port: 6123
 
+# The host interface the JobManager will bind to. By default, this is 
localhost, and will prevent
+# the JobManager from communicating outside the machine/container it is 
running on.
+#
+# To allow communication from outside, set the bind-host address to one that has access to an 
outside-facing network
+# interface, such as 0.0.0.0.
+
+jobmanager.bind-host: localhost
+
 
 # The total process memory size for the JobManager.
 #
@@ -43,6 +51,13 @@ jobmanager.rpc.port: 6123
 
 jobmanager.memory.process.size: 1600m
 
+# The host interface the TaskManager will bind to. By default, this is 
localhost, and will prevent
+# the TaskManager from communicating outside the machine/container it is 
running on.
+#
+# To allow communication from outside, set the bind-host address to one that has access to an 
outside-facing network
+# interface, such as 0.0.0.0.
+
+taskmanager.bind-host: localhost
 
 # The total process memory size for the TaskManager.
 #
diff --git 
a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/container/FlinkContainersBuilder.java
 
b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/container/FlinkContainersBuilder.java
index c0e7b41..bcd18cb 100644
--- 
a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/container/FlinkContainersBuilder.java
+++ 
b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/container/FlinkContainersBuilder.java
@@ -165,6 +165,9 @@ public class FlinkContainersBuilder {
                 CHECKPOINT_PATH.toAbsolutePath().toUri().toString());
         this.conf.set(RestOptions.BIND_ADDRESS, "0.0.0.0");
 
+        this.conf.set(JobManagerOptions.BIND_HOST, "0.0.0.0");
+        this.conf.set(TaskManagerOptions.BIND_HOST, "0.0.0.0");
+
         // Create temporary directory for building Flink image
         final Path imageBuildingTempDir;
         try {
diff --git 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java
 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java
index 61587cb..e1edc10 100644
--- 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java
+++ 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java
@@ -21,7 +21,9 @@ package org.apache.flink.kubernetes.kubeclient.decorators;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DeploymentOptionsInternal;
+import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
 import org.apache.flink.kubernetes.kubeclient.FlinkPod;
 import 
org.apache.flink.kubernetes.kubeclient.parameters.AbstractKubernetesParameters;
@@ -158,6 +160,8 @@ public class FlinkConfMountDecorator extends 
AbstractKubernetesStepDecorator {
         
clusterSideConfig.removeConfig(KubernetesConfigOptions.KUBE_CONFIG_FILE);
         clusterSideConfig.removeConfig(DeploymentOptionsInternal.CONF_DIR);
         clusterSideConfig.removeConfig(RestOptions.BIND_ADDRESS);
+        clusterSideConfig.removeConfig(JobManagerOptions.BIND_HOST);
+        clusterSideConfig.removeConfig(TaskManagerOptions.BIND_HOST);
         return clusterSideConfig.toMap();
     }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/net/ConnectionUtils.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/net/ConnectionUtils.java
index 9f61cae..84849c9 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/net/ConnectionUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/net/ConnectionUtils.java
@@ -58,6 +58,7 @@ public class ConnectionUtils {
      * state failed to determine the address.
      */
     private enum AddressDetectionState {
+        LOOPBACK(100),
         /** Connect from interface returned by InetAddress.getLocalHost(). * */
         LOCAL_HOST(200),
         /** Detect own IP address based on the target IP address. Look for 
common prefix */
@@ -115,6 +116,7 @@ public class ConnectionUtils {
         final List<AddressDetectionState> strategies =
                 Collections.unmodifiableList(
                         Arrays.asList(
+                                AddressDetectionState.LOOPBACK,
                                 AddressDetectionState.LOCAL_HOST,
                                 AddressDetectionState.ADDRESS,
                                 AddressDetectionState.FAST_CONNECT,
@@ -225,6 +227,18 @@ public class ConnectionUtils {
     private static InetAddress findAddressUsingStrategy(
             AddressDetectionState strategy, InetSocketAddress targetAddress, 
boolean logging)
             throws IOException {
+        if (strategy == AddressDetectionState.LOOPBACK) {
+            InetAddress loopback = InetAddress.getLoopbackAddress();
+
+            if (tryToConnect(loopback, targetAddress, strategy.getTimeout(), 
logging)) {
+                LOG.debug(
+                        "Using InetAddress.getLoopbackAddress() immediately 
for connecting address");
+                return loopback;
+            } else {
+                return null;
+            }
+        }
+
         // try LOCAL_HOST strategy independent of the network interfaces
         if (strategy == AddressDetectionState.LOCAL_HOST) {
             InetAddress localhostName;
@@ -432,7 +446,7 @@ public class ConnectionUtils {
                     }
 
                     if (targetAddress != null) {
-                        AddressDetectionState strategy = 
AddressDetectionState.LOCAL_HOST;
+                        AddressDetectionState strategy = 
AddressDetectionState.LOOPBACK;
 
                         boolean logging = elapsedTimeMillis >= 
startLoggingAfter.toMillis();
                         if (logging) {
@@ -449,6 +463,9 @@ public class ConnectionUtils {
 
                             // pick the next strategy
                             switch (strategy) {
+                                case LOOPBACK:
+                                    strategy = 
AddressDetectionState.LOCAL_HOST;
+                                    break;
                                 case LOCAL_HOST:
                                     strategy = AddressDetectionState.ADDRESS;
                                     break;
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java
 
b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java
index 2321355..ec9cde3 100644
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java
@@ -25,6 +25,7 @@ import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.ResourceManagerOptions;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.configuration.SecurityOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.util.Preconditions;
@@ -64,6 +65,8 @@ public class YarnEntrypointUtils {
                 ApplicationConstants.Environment.NM_HOST.key());
 
         configuration.setString(JobManagerOptions.ADDRESS, hostname);
+        configuration.removeConfig(JobManagerOptions.BIND_HOST);
+        configuration.removeConfig(TaskManagerOptions.BIND_HOST);
         configuration.setString(RestOptions.ADDRESS, hostname);
         configuration.setString(RestOptions.BIND_ADDRESS, hostname);
 

Reply via email to