This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 9bd61e1 [FLINK-24474] Bind Jobmanager and Taskmanager RPC host addresses to localhost by default 9bd61e1 is described below commit 9bd61e1334e4805541b2b3689fe500e949a86658 Author: Mika <m...@autophagy.io> AuthorDate: Wed Feb 16 15:29:57 2022 +0100 [FLINK-24474] Bind Jobmanager and Taskmanager RPC host addresses to localhost by default --- flink-dist/src/main/resources/flink-conf.yaml | 15 +++++++++++++++ .../util/flink/container/FlinkContainersBuilder.java | 3 +++ .../decorators/FlinkConfMountDecorator.java | 4 ++++ .../org/apache/flink/runtime/net/ConnectionUtils.java | 19 ++++++++++++++++++- .../flink/yarn/entrypoint/YarnEntrypointUtils.java | 3 +++ 5 files changed, 43 insertions(+), 1 deletion(-) diff --git a/flink-dist/src/main/resources/flink-conf.yaml b/flink-dist/src/main/resources/flink-conf.yaml index 39b1f71..eb9bc83 100644 --- a/flink-dist/src/main/resources/flink-conf.yaml +++ b/flink-dist/src/main/resources/flink-conf.yaml @@ -36,6 +36,14 @@ jobmanager.rpc.address: localhost jobmanager.rpc.port: 6123 +# The host interface the JobManager will bind to. By default, this is localhost, and will prevent +# the JobManager from communicating outside the machine/container it is running on. +# +# To allow such communication, set the bind-host address to one that has access to an outside-facing network +# interface, such as 0.0.0.0. + +jobmanager.bind-host: localhost + # The total process memory size for the JobManager. # @@ -43,6 +51,13 @@ jobmanager.rpc.port: 6123 jobmanager.memory.process.size: 1600m +# The host interface the TaskManager will bind to. By default, this is localhost, and will prevent +# the TaskManager from communicating outside the machine/container it is running on. +# +# To allow such communication, set the bind-host address to one that has access to an outside-facing network +# interface, such as 0.0.0.0. + +taskmanager.bind-host: localhost # The total process memory size for the TaskManager. 
# diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/container/FlinkContainersBuilder.java b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/container/FlinkContainersBuilder.java index c0e7b41..bcd18cb 100644 --- a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/container/FlinkContainersBuilder.java +++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/container/FlinkContainersBuilder.java @@ -165,6 +165,9 @@ public class FlinkContainersBuilder { CHECKPOINT_PATH.toAbsolutePath().toUri().toString()); this.conf.set(RestOptions.BIND_ADDRESS, "0.0.0.0"); + this.conf.set(JobManagerOptions.BIND_HOST, "0.0.0.0"); + this.conf.set(TaskManagerOptions.BIND_HOST, "0.0.0.0"); + // Create temporary directory for building Flink image final Path imageBuildingTempDir; try { diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java index 61587cb..e1edc10 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java @@ -21,7 +21,9 @@ package org.apache.flink.kubernetes.kubeclient.decorators; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.DeploymentOptionsInternal; +import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.RestOptions; +import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; import 
org.apache.flink.kubernetes.kubeclient.FlinkPod; import org.apache.flink.kubernetes.kubeclient.parameters.AbstractKubernetesParameters; @@ -158,6 +160,8 @@ public class FlinkConfMountDecorator extends AbstractKubernetesStepDecorator { clusterSideConfig.removeConfig(KubernetesConfigOptions.KUBE_CONFIG_FILE); clusterSideConfig.removeConfig(DeploymentOptionsInternal.CONF_DIR); clusterSideConfig.removeConfig(RestOptions.BIND_ADDRESS); + clusterSideConfig.removeConfig(JobManagerOptions.BIND_HOST); + clusterSideConfig.removeConfig(TaskManagerOptions.BIND_HOST); return clusterSideConfig.toMap(); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/net/ConnectionUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/net/ConnectionUtils.java index 9f61cae..84849c9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/net/ConnectionUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/net/ConnectionUtils.java @@ -58,6 +58,7 @@ public class ConnectionUtils { * state failed to determine the address. */ private enum AddressDetectionState { + LOOPBACK(100), /** Connect from interface returned by InetAddress.getLocalHost(). * */ LOCAL_HOST(200), /** Detect own IP address based on the target IP address. 
Look for common prefix */ @@ -115,6 +116,7 @@ public class ConnectionUtils { final List<AddressDetectionState> strategies = Collections.unmodifiableList( Arrays.asList( + AddressDetectionState.LOOPBACK, AddressDetectionState.LOCAL_HOST, AddressDetectionState.ADDRESS, AddressDetectionState.FAST_CONNECT, @@ -225,6 +227,18 @@ public class ConnectionUtils { private static InetAddress findAddressUsingStrategy( AddressDetectionState strategy, InetSocketAddress targetAddress, boolean logging) throws IOException { + if (strategy == AddressDetectionState.LOOPBACK) { + InetAddress loopback = InetAddress.getLoopbackAddress(); + + if (tryToConnect(loopback, targetAddress, strategy.getTimeout(), logging)) { + LOG.debug( + "Using InetAddress.getLoopbackAddress() immediately for connecting address"); + return loopback; + } else { + return null; + } + } + // try LOCAL_HOST strategy independent of the network interfaces if (strategy == AddressDetectionState.LOCAL_HOST) { InetAddress localhostName; @@ -432,7 +446,7 @@ public class ConnectionUtils { } if (targetAddress != null) { - AddressDetectionState strategy = AddressDetectionState.LOCAL_HOST; + AddressDetectionState strategy = AddressDetectionState.LOOPBACK; boolean logging = elapsedTimeMillis >= startLoggingAfter.toMillis(); if (logging) { @@ -449,6 +463,9 @@ public class ConnectionUtils { // pick the next strategy switch (strategy) { + case LOOPBACK: + strategy = AddressDetectionState.LOCAL_HOST; + break; case LOCAL_HOST: strategy = AddressDetectionState.ADDRESS; break; diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java index 2321355..ec9cde3 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java @@ -25,6 +25,7 @@ import org.apache.flink.configuration.JobManagerOptions; 
import org.apache.flink.configuration.ResourceManagerOptions; import org.apache.flink.configuration.RestOptions; import org.apache.flink.configuration.SecurityOptions; +import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.configuration.WebOptions; import org.apache.flink.runtime.clusterframework.BootstrapTools; import org.apache.flink.util.Preconditions; @@ -64,6 +65,8 @@ public class YarnEntrypointUtils { ApplicationConstants.Environment.NM_HOST.key()); configuration.setString(JobManagerOptions.ADDRESS, hostname); + configuration.removeConfig(JobManagerOptions.BIND_HOST); + configuration.removeConfig(TaskManagerOptions.BIND_HOST); configuration.setString(RestOptions.ADDRESS, hostname); configuration.setString(RestOptions.BIND_ADDRESS, hostname);