This is an automated email from the ASF dual-hosted git repository.
gaoyunhaii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 6e55d223efd [FLINK-27118][yarn] TM ignores localhost BIND_HOST
6e55d223efd is described below
commit 6e55d223efdc2563f0b7600a393ff9a3d8767942
Author: Chesnay Schepler <[email protected]>
AuthorDate: Thu Apr 7 17:37:29 2022 +0200
[FLINK-27118][yarn] TM ignores localhost BIND_HOST
This closes #19395.
---
flink-dist/src/main/resources/flink-conf.yaml | 4 ++++
.../apache/flink/yarn/YarnClusterDescriptor.java | 22 ++++++++++++++++++++++
.../flink/yarn/entrypoint/YarnEntrypointUtils.java | 4 ----
3 files changed, 26 insertions(+), 4 deletions(-)
diff --git a/flink-dist/src/main/resources/flink-conf.yaml
b/flink-dist/src/main/resources/flink-conf.yaml
index e1ce1286c35..a8c0b7bb1a7 100644
--- a/flink-dist/src/main/resources/flink-conf.yaml
+++ b/flink-dist/src/main/resources/flink-conf.yaml
@@ -38,6 +38,8 @@ jobmanager.rpc.port: 6123
# The host interface the JobManager will bind to. My default, this is
localhost, and will prevent
# the JobManager from communicating outside the machine/container it is
running on.
+# On YARN this setting will be ignored if it is set to 'localhost', defaulting
to 0.0.0.0.
+# On Kubernetes this setting will be ignored, defaulting to 0.0.0.0.
#
# To enable this, set the bind-host address to one that has access to an
outside facing network
# interface, such as 0.0.0.0.
@@ -53,6 +55,8 @@ jobmanager.memory.process.size: 1600m
# The host interface the TaskManager will bind to. By default, this is
localhost, and will prevent
# the TaskManager from communicating outside the machine/container it is
running on.
+# On YARN this setting will be ignored if it is set to 'localhost', defaulting
to 0.0.0.0.
+# On Kubernetes this setting will be ignored, defaulting to 0.0.0.0.
#
# To enable this, set the bind-host address to one that has access to an
outside facing network
# interface, such as 0.0.0.0.
diff --git
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
index 2b541d46516..482e38f93ce 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
@@ -42,6 +42,7 @@ import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.configuration.ResourceManagerOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.SecurityOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.plugin.PluginConfig;
import org.apache.flink.core.plugin.PluginUtils;
import org.apache.flink.runtime.clusterframework.BootstrapTools;
@@ -1020,6 +1021,13 @@ public class YarnClusterDescriptor implements
ClusterDescriptor<ApplicationId> {
File tmpConfigurationFile = null;
try {
tmpConfigurationFile = File.createTempFile(appId +
"-flink-conf.yaml", null);
+
+ // remove localhost bind hosts as they render production clusters
unusable
+ removeLocalhostBindHostSetting(configuration,
JobManagerOptions.BIND_HOST);
+ removeLocalhostBindHostSetting(configuration,
TaskManagerOptions.BIND_HOST);
+ // this setting is unconditionally overridden anyway, so we remove
it for clarity
+ configuration.removeConfig(TaskManagerOptions.HOST);
+
BootstrapTools.writeConfiguration(configuration,
tmpConfigurationFile);
String flinkConfigKey = "flink-conf.yaml";
@@ -1266,6 +1274,20 @@ public class YarnClusterDescriptor implements
ClusterDescriptor<ApplicationId> {
return report;
}
+ private void removeLocalhostBindHostSetting(
+ Configuration configuration, ConfigOption<?> option) {
+ configuration
+ .getOptional(option)
+ .filter(bindHost -> bindHost.equals("localhost"))
+ .ifPresent(
+ bindHost -> {
+ LOG.info(
+ "Removing 'localhost' {} setting from
effective configuration; using '0.0.0.0' instead.",
+ option);
+ configuration.removeConfig(option);
+ });
+ }
+
private void setTokensFor(ContainerLaunchContext containerLaunchContext)
throws IOException {
LOG.info("Adding delegation tokens to the AM container.");
diff --git
a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java
b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java
index 986fd378581..2321355a390 100644
---
a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java
+++
b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java
@@ -25,7 +25,6 @@ import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.ResourceManagerOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.SecurityOptions;
-import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.util.Preconditions;
@@ -65,9 +64,6 @@ public class YarnEntrypointUtils {
ApplicationConstants.Environment.NM_HOST.key());
configuration.setString(JobManagerOptions.ADDRESS, hostname);
- configuration.removeConfig(JobManagerOptions.BIND_HOST);
- configuration.removeConfig(TaskManagerOptions.BIND_HOST);
- configuration.removeConfig(TaskManagerOptions.HOST);
configuration.setString(RestOptions.ADDRESS, hostname);
configuration.setString(RestOptions.BIND_ADDRESS, hostname);