This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 51c139b601806cc4f4272fb678f4d1bed4cf06ab
Author: Lijie Wang <[email protected]>
AuthorDate: Tue Jun 28 22:53:50 2022 +0800

    [FLINK-28142][runtime] Enrich TaskManagerLocation with node information
---
 .../configuration/TaskManagerOptionsInternal.java  | 11 +++++
 .../decorators/InitTaskManagerDecorator.java       | 15 ++++++-
 .../taskmanager/KubernetesTaskExecutorRunner.java  | 29 ++++++++++++-
 .../apache/flink/kubernetes/utils/Constants.java   |  4 ++
 .../decorators/InitTaskManagerDecoratorTest.java   | 24 +++++++++--
 .../factory/KubernetesTaskManagerFactoryTest.java  |  2 +-
 .../runtime/taskexecutor/TaskManagerRunner.java    |  2 +-
 .../runtime/taskexecutor/TaskManagerServices.java  |  3 +-
 .../TaskManagerServicesConfiguration.java          | 22 +++++++++-
 .../runtime/taskmanager/TaskManagerLocation.java   | 49 ++++++++++++++++++----
 .../taskmanager/UnresolvedTaskManagerLocation.java | 11 ++++-
 .../TaskManagerRunnerConfigurationTest.java        | 35 ++++++++++++++++
 .../LocalUnresolvedTaskManagerLocation.java        |  2 +-
 .../taskmanager/TaskManagerLocationTest.java       | 36 +++++++++++++++-
 .../apache/flink/yarn/YarnTaskExecutorRunner.java  |  3 ++
 .../flink/yarn/YarnTaskExecutorRunnerTest.java     | 15 +++++++
 16 files changed, 241 insertions(+), 22 deletions(-)

diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptionsInternal.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptionsInternal.java
index 314bb3abd0a..9a0b88202e6 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptionsInternal.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptionsInternal.java
@@ -32,4 +32,15 @@ public class TaskManagerOptionsInternal {
                     .noDefaultValue()
                     .withDescription(
                             "**DO NOT USE** The metadata of TaskManager's 
ResourceID to be used for logging.");
+
+    /**
+     * The ID of the node where the TaskManager is located on. In Yarn and 
Native Kubernetes mode,
+     * this option will be set by resource manager when launch a container for 
the task executor. In
+     * other modes, this option will not be set. This option is only used 
internally.
+     */
+    public static final ConfigOption<String> TASK_MANAGER_NODE_ID =
+            key("internal.taskmanager.node-id")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("ID of the node where the TaskManager is 
located on.");
 }
diff --git 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InitTaskManagerDecorator.java
 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InitTaskManagerDecorator.java
index 38ea8315063..09eac71eb3e 100644
--- 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InitTaskManagerDecorator.java
+++ 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InitTaskManagerDecorator.java
@@ -31,6 +31,7 @@ import io.fabric8.kubernetes.api.model.ContainerBuilder;
 import io.fabric8.kubernetes.api.model.ContainerPort;
 import io.fabric8.kubernetes.api.model.ContainerPortBuilder;
 import io.fabric8.kubernetes.api.model.EnvVar;
+import io.fabric8.kubernetes.api.model.EnvVarSourceBuilder;
 import io.fabric8.kubernetes.api.model.PodBuilder;
 import io.fabric8.kubernetes.api.model.ResourceRequirements;
 
@@ -39,8 +40,11 @@ import java.util.List;
 import java.util.Optional;
 import java.util.stream.Collectors;
 
+import static org.apache.flink.kubernetes.utils.Constants.API_VERSION;
 import static org.apache.flink.kubernetes.utils.Constants.DNS_PLOICY_DEFAULT;
 import static 
org.apache.flink.kubernetes.utils.Constants.DNS_PLOICY_HOSTNETWORK;
+import static 
org.apache.flink.kubernetes.utils.Constants.ENV_FLINK_POD_NODE_ID;
+import static 
org.apache.flink.kubernetes.utils.Constants.POD_NODE_ID_FIELD_PATH;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /** An initializer for the TaskManager {@link 
org.apache.flink.kubernetes.kubeclient.FlinkPod}. */
@@ -149,7 +153,16 @@ public class InitTaskManagerDecorator extends 
AbstractKubernetesStepDecorator {
                 .withResources(resourceRequirements);
 
         // Merge fields
-        
mainContainerBuilder.addAllToPorts(getContainerPorts()).addAllToEnv(getCustomizedEnvs());
+        mainContainerBuilder
+                .addAllToPorts(getContainerPorts())
+                .addAllToEnv(getCustomizedEnvs())
+                .addNewEnv()
+                .withName(ENV_FLINK_POD_NODE_ID)
+                .withValueFrom(
+                        new EnvVarSourceBuilder()
+                                .withNewFieldRef(API_VERSION, 
POD_NODE_ID_FIELD_PATH)
+                                .build())
+                .endEnv();
         getFlinkLogDirEnv().ifPresent(mainContainerBuilder::addToEnv);
 
         return mainContainerBuilder.build();
diff --git 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/taskmanager/KubernetesTaskExecutorRunner.java
 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/taskmanager/KubernetesTaskExecutorRunner.java
index cf95c70db4f..049d1ad2b50 100644
--- 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/taskmanager/KubernetesTaskExecutorRunner.java
+++ 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/taskmanager/KubernetesTaskExecutorRunner.java
@@ -18,14 +18,21 @@
 
 package org.apache.flink.kubernetes.taskmanager;
 
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptionsInternal;
+import org.apache.flink.kubernetes.utils.Constants;
+import org.apache.flink.runtime.entrypoint.FlinkParseException;
 import org.apache.flink.runtime.taskexecutor.TaskManagerRunner;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.runtime.util.JvmShutdownSafeguard;
 import org.apache.flink.runtime.util.SignalHandler;
+import org.apache.flink.util.Preconditions;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /** This class is the executable entry point for running a TaskExecutor in a 
Kubernetes pod. */
 public class KubernetesTaskExecutorRunner {
 
@@ -36,6 +43,26 @@ public class KubernetesTaskExecutorRunner {
         SignalHandler.register(LOG);
         JvmShutdownSafeguard.installAsShutdownHook(LOG);
 
-        TaskManagerRunner.runTaskManagerProcessSecurely(args);
+        runTaskManagerSecurely(args);
+    }
+
+    private static void runTaskManagerSecurely(String[] args) {
+        Configuration configuration = null;
+
+        try {
+            configuration = TaskManagerRunner.loadConfiguration(args);
+            final String nodeId = 
System.getenv().get(Constants.ENV_FLINK_POD_NODE_ID);
+            Preconditions.checkState(
+                    nodeId != null,
+                    "The environment variable %s is not set, "
+                            + "which is used to identify the node where the 
task manager is located.",
+                    Constants.ENV_FLINK_POD_NODE_ID);
+            
configuration.setString(TaskManagerOptionsInternal.TASK_MANAGER_NODE_ID, 
nodeId);
+        } catch (FlinkParseException fpe) {
+            LOG.error("Could not load the configuration.", fpe);
+            System.exit(TaskManagerRunner.FAILURE_EXIT_CODE);
+        }
+
+        
TaskManagerRunner.runTaskManagerProcessSecurely(checkNotNull(configuration));
     }
 }
diff --git 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/Constants.java
 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/Constants.java
index bde37d933cb..7c1828c4562 100644
--- 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/Constants.java
+++ 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/Constants.java
@@ -85,6 +85,10 @@ public class Constants {
 
     public static final String POD_IP_FIELD_PATH = "status.podIP";
 
+    public static final String ENV_FLINK_POD_NODE_ID = "_POD_NODE_ID";
+
+    public static final String POD_NODE_ID_FIELD_PATH = "spec.nodeName";
+
     public static final int MAXIMUM_CHARACTERS_OF_CLUSTER_ID = 45;
 
     public static final String RESTART_POLICY_OF_NEVER = "Never";
diff --git 
a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/InitTaskManagerDecoratorTest.java
 
b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/InitTaskManagerDecoratorTest.java
index d4acbb4e30e..f7040e440fa 100644
--- 
a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/InitTaskManagerDecoratorTest.java
+++ 
b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/InitTaskManagerDecoratorTest.java
@@ -179,13 +179,29 @@ class InitTaskManagerDecoratorTest extends 
KubernetesTaskManagerTestBase {
     @Test
     void testMainContainerEnv() {
         final Map<String, String> expectedEnvVars = new 
HashMap<>(customizedEnvs);
-
-        final Map<String, String> resultEnvVars =
-                this.resultMainContainer.getEnv().stream()
-                        .collect(Collectors.toMap(EnvVar::getName, 
EnvVar::getValue));
+        final Map<String, String> resultEnvVars = new HashMap<>();
+        this.resultMainContainer
+                .getEnv()
+                .forEach(envVar -> resultEnvVars.put(envVar.getName(), 
envVar.getValue()));
         expectedEnvVars.forEach((k, v) -> 
assertThat(resultEnvVars.get(k)).isEqualTo(v));
     }
 
+    @Test
+    void testNodeIdEnv() {
+        assertThat(this.resultMainContainer.getEnv())
+                .anyMatch(
+                        envVar ->
+                                
envVar.getName().equals(Constants.ENV_FLINK_POD_NODE_ID)
+                                        && envVar.getValueFrom()
+                                                .getFieldRef()
+                                                .getApiVersion()
+                                                .equals(Constants.API_VERSION)
+                                        && envVar.getValueFrom()
+                                                .getFieldRef()
+                                                .getFieldPath()
+                                                
.equals(Constants.POD_NODE_ID_FIELD_PATH));
+    }
+
     @Test
     void testPodName() {
         assertThat(this.resultPod.getMetadata().getName()).isEqualTo(POD_NAME);
diff --git 
a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesTaskManagerFactoryTest.java
 
b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesTaskManagerFactoryTest.java
index 01a059fc2c2..f49b8ad996f 100644
--- 
a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesTaskManagerFactoryTest.java
+++ 
b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesTaskManagerFactoryTest.java
@@ -86,7 +86,7 @@ class KubernetesTaskManagerFactoryTest extends 
KubernetesTaskManagerTestBase {
         assertThat(resultMainContainer.getImagePullPolicy())
                 .isEqualTo(CONTAINER_IMAGE_PULL_POLICY.name());
 
-        assertThat(resultMainContainer.getEnv()).hasSize(4);
+        assertThat(resultMainContainer.getEnv()).hasSize(5);
         assertThat(
                         resultMainContainer.getEnv().stream()
                                 .anyMatch(envVar -> 
envVar.getName().equals("key1")))
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index 0762b1558a2..0c2b82ff2b5 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -113,7 +113,7 @@ public class TaskManagerRunner implements FatalErrorHandler 
{
     private static final long FATAL_ERROR_SHUTDOWN_TIMEOUT_MS = 10000L;
 
     private static final int SUCCESS_EXIT_CODE = 0;
-    @VisibleForTesting static final int FAILURE_EXIT_CODE = 1;
+    @VisibleForTesting public static final int FAILURE_EXIT_CODE = 1;
 
     private final Thread shutdownHook;
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
index a02a3b34df7..78c1e282823 100755
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -304,7 +304,8 @@ public class TaskManagerServices {
                         // iff the external data port is not explicitly defined
                         taskManagerServicesConfiguration.getExternalDataPort() 
> 0
                                 ? 
taskManagerServicesConfiguration.getExternalDataPort()
-                                : listeningDataPort);
+                                : listeningDataPort,
+                        taskManagerServicesConfiguration.getNodeId());
 
         final BroadcastVariableManager broadcastVariableManager = new 
BroadcastVariableManager();
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
index 5f0116b6204..301997a7c7c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
@@ -27,6 +27,7 @@ import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.configuration.TaskManagerOptionsInternal;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.entrypoint.ClusterEntrypointUtils;
 import org.apache.flink.runtime.entrypoint.WorkingDirectory;
@@ -59,6 +60,8 @@ public class TaskManagerServicesConfiguration {
 
     private final String externalAddress;
 
+    private final String nodeId;
+
     private final InetAddress bindAddress;
 
     private final int externalDataPort;
@@ -110,7 +113,8 @@ public class TaskManagerServicesConfiguration {
             Optional<Time> systemResourceMetricsProbingInterval,
             FlinkUserCodeClassLoaders.ResolveOrder classLoaderResolveOrder,
             String[] alwaysParentFirstLoaderPatterns,
-            int numIoThreads) {
+            int numIoThreads,
+            String nodeId) {
         this.configuration = checkNotNull(configuration);
         this.resourceID = checkNotNull(resourceID);
 
@@ -139,6 +143,8 @@ public class TaskManagerServicesConfiguration {
 
         this.systemResourceMetricsProbingInterval =
                 checkNotNull(systemResourceMetricsProbingInterval);
+
+        this.nodeId = checkNotNull(nodeId);
     }
 
     // 
--------------------------------------------------------------------------------------------
@@ -230,6 +236,10 @@ public class TaskManagerServicesConfiguration {
         return numIoThreads;
     }
 
+    public String getNodeId() {
+        return nodeId;
+    }
+
     // 
--------------------------------------------------------------------------------------------
     //  Parsing of Flink configuration
     // 
--------------------------------------------------------------------------------------------
@@ -302,6 +312,13 @@ public class TaskManagerServicesConfiguration {
 
         final String[] tmpDirs = 
ConfigurationUtils.parseTempDirectories(configuration);
 
+        // If TaskManagerOptionsInternal.TASK_MANAGER_NODE_ID is not set, use 
the external address
+        // as the node id.
+        final String nodeId =
+                configuration
+                        
.getOptional(TaskManagerOptionsInternal.TASK_MANAGER_NODE_ID)
+                        .orElse(externalAddress);
+
         return new TaskManagerServicesConfiguration(
                 configuration,
                 resourceID,
@@ -321,6 +338,7 @@ public class TaskManagerServicesConfiguration {
                 
ConfigurationUtils.getSystemResourceMetricsProbingInterval(configuration),
                 
FlinkUserCodeClassLoaders.ResolveOrder.fromString(classLoaderResolveOrder),
                 alwaysParentFirstLoaderPatterns,
-                numIoThreads);
+                numIoThreads,
+                nodeId);
     }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerLocation.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerLocation.java
index ce33c8ba7db..36903248548 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerLocation.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerLocation.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.taskmanager;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.TaskManagerOptionsInternal;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.util.NetUtils;
 
@@ -68,6 +69,14 @@ public class TaskManagerLocation implements 
Comparable<TaskManagerLocation>, jav
      */
     private String stringRepresentation;
 
+    /**
+     * ID of the node where the TaskManager is located on. In Yarn and Native 
Kubernetes mode, this
+     * value will be set by resource manager when launch this TaskManager(via 
the config option
+     * {@link TaskManagerOptionsInternal#TASK_MANAGER_NODE_ID}). In other 
modes, this value will be
+     * the external address of the TaskManager.
+     */
+    private final String nodeId;
+
     /**
      * Constructs a new instance connection info object. The constructor will 
attempt to retrieve
      * the instance's host name and domain name through the operating system's 
lookup mechanisms.
@@ -76,13 +85,15 @@ public class TaskManagerLocation implements 
Comparable<TaskManagerLocation>, jav
      * @param dataPort the port instance's task manager expects to receive 
transfer envelopes on
      * @param hostNameSupplier the supplier for obtaining fully-qualified 
domain name and pure
      *     hostname of the task manager
+     * @param nodeId the ID of node where the task manager is located on.
      */
     @VisibleForTesting
     public TaskManagerLocation(
             ResourceID resourceID,
             InetAddress inetAddress,
             int dataPort,
-            HostNameSupplier hostNameSupplier) {
+            HostNameSupplier hostNameSupplier,
+            String nodeId) {
         // -1 indicates a local instance connection info
         checkArgument(dataPort > 0 || dataPort == -1, "dataPort must be > 0, 
or -1 (local)");
 
@@ -90,6 +101,7 @@ public class TaskManagerLocation implements 
Comparable<TaskManagerLocation>, jav
         this.inetAddress = checkNotNull(inetAddress);
         this.dataPort = dataPort;
         this.hostNameSupplier = checkNotNull(hostNameSupplier);
+        this.nodeId = checkNotNull(nodeId);
     }
 
     /**
@@ -101,7 +113,12 @@ public class TaskManagerLocation implements 
Comparable<TaskManagerLocation>, jav
      */
     @VisibleForTesting
     public TaskManagerLocation(ResourceID resourceID, InetAddress inetAddress, 
int dataPort) {
-        this(resourceID, inetAddress, dataPort, new 
DefaultHostNameSupplier(inetAddress));
+        this(
+                resourceID,
+                inetAddress,
+                dataPort,
+                new DefaultHostNameSupplier(inetAddress),
+                getHostName(inetAddress));
     }
 
     public static TaskManagerLocation fromUnresolvedLocation(
@@ -117,13 +134,15 @@ public class TaskManagerLocation implements 
Comparable<TaskManagerLocation>, jav
                         unresolvedLocation.getResourceID(),
                         inetAddress,
                         unresolvedLocation.getDataPort(),
-                        new DefaultHostNameSupplier(inetAddress));
+                        new DefaultHostNameSupplier(inetAddress),
+                        unresolvedLocation.getNodeId());
             case USE_IP_ONLY:
                 return new TaskManagerLocation(
                         unresolvedLocation.getResourceID(),
                         inetAddress,
                         unresolvedLocation.getDataPort(),
-                        new IpOnlyHostNameSupplier(inetAddress));
+                        new IpOnlyHostNameSupplier(inetAddress),
+                        unresolvedLocation.getNodeId());
             default:
                 throw new UnsupportedOperationException("Unsupported 
resolution mode provided.");
         }
@@ -196,6 +215,15 @@ public class TaskManagerLocation implements 
Comparable<TaskManagerLocation>, jav
         return hostNameSupplier.getHostName();
     }
 
+    /**
+     * Return the ID of node where the task manager is located on.
+     *
+     * @return The ID of node where the task manager is located on.
+     */
+    public String getNodeId() {
+        return nodeId;
+    }
+
     // 
--------------------------------------------------------------------------------------------
     // Utilities
     // 
--------------------------------------------------------------------------------------------
@@ -264,7 +292,8 @@ public class TaskManagerLocation implements 
Comparable<TaskManagerLocation>, jav
             TaskManagerLocation that = (TaskManagerLocation) obj;
             return this.resourceID.equals(that.resourceID)
                     && this.inetAddress.equals(that.inetAddress)
-                    && this.dataPort == that.dataPort;
+                    && this.dataPort == that.dataPort
+                    && this.nodeId.equals(that.nodeId);
         } else {
             return false;
         }
@@ -272,7 +301,10 @@ public class TaskManagerLocation implements 
Comparable<TaskManagerLocation>, jav
 
     @Override
     public int hashCode() {
-        return resourceID.hashCode() + 17 * inetAddress.hashCode() + 129 * 
dataPort;
+        return resourceID.hashCode()
+                + 17 * inetAddress.hashCode()
+                + 129 * dataPort
+                + 257 * nodeId.hashCode();
     }
 
     @Override
@@ -309,9 +341,10 @@ public class TaskManagerLocation implements 
Comparable<TaskManagerLocation>, jav
             return -1;
         } else if (this.dataPort > o.dataPort) {
             return 1;
-        } else {
-            return 0;
         }
+
+        // finally, decided based on node id
+        return this.nodeId.compareTo(o.nodeId);
     }
 
     // 
--------------------------------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/UnresolvedTaskManagerLocation.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/UnresolvedTaskManagerLocation.java
index 28bd9155e2e..249ccef1ef2 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/UnresolvedTaskManagerLocation.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/UnresolvedTaskManagerLocation.java
@@ -36,15 +36,20 @@ public class UnresolvedTaskManagerLocation implements 
Serializable {
     private final ResourceID resourceID;
     private final String externalAddress;
     private final int dataPort;
+    private final String nodeId;
 
     public UnresolvedTaskManagerLocation(
-            final ResourceID resourceID, final String externalAddress, final 
int dataPort) {
+            final ResourceID resourceID,
+            final String externalAddress,
+            final int dataPort,
+            final String nodeId) {
         // -1 indicates a local instance connection info
         checkArgument(dataPort > 0 || dataPort == -1, "dataPort must be > 0, 
or -1 (local)");
 
         this.resourceID = checkNotNull(resourceID);
         this.externalAddress = checkNotNull(externalAddress);
         this.dataPort = dataPort;
+        this.nodeId = checkNotNull(nodeId);
     }
 
     public ResourceID getResourceID() {
@@ -58,4 +63,8 @@ public class UnresolvedTaskManagerLocation implements 
Serializable {
     public int getDataPort() {
         return dataPort;
     }
+
+    public String getNodeId() {
+        return nodeId;
+    }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerConfigurationTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerConfigurationTest.java
index 0f021a08c0d..f8b3382947d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerConfigurationTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerConfigurationTest.java
@@ -24,9 +24,12 @@ import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.configuration.TaskManagerOptionsInternal;
 import org.apache.flink.configuration.UnmodifiableConfiguration;
 import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.entrypoint.FlinkParseException;
+import org.apache.flink.runtime.entrypoint.WorkingDirectory;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
 import org.apache.flink.runtime.rest.util.NoOpFatalErrorHandler;
@@ -47,6 +50,7 @@ import javax.annotation.concurrent.NotThreadSafe;
 import java.io.File;
 import java.io.IOException;
 import java.io.PrintWriter;
+import java.net.InetAddress;
 import java.net.ServerSocket;
 import java.net.URI;
 import java.nio.file.Files;
@@ -214,6 +218,37 @@ class TaskManagerRunnerConfigurationTest {
         
assertThat(jmPort).isEqualTo(configuration.getInteger(JobManagerOptions.PORT));
     }
 
+    @Test
+    void testNodeIdShouldBeConfiguredValueIfExplicitlySet() throws Exception {
+        String nodeId = "node1";
+        Configuration configuration = new Configuration();
+        configuration.set(TaskManagerOptionsInternal.TASK_MANAGER_NODE_ID, 
nodeId);
+        TaskManagerServicesConfiguration servicesConfiguration =
+                createTaskManagerServiceConfiguration(configuration);
+        assertThat(servicesConfiguration.getNodeId()).isEqualTo(nodeId);
+    }
+
+    @Test
+    void testNodeIdShouldBeExternalAddressIfNotExplicitlySet() throws 
Exception {
+        TaskManagerServicesConfiguration servicesConfiguration =
+                createTaskManagerServiceConfiguration(new Configuration());
+        assertThat(servicesConfiguration.getNodeId())
+                .isEqualTo(InetAddress.getLocalHost().getHostName());
+    }
+
+    private TaskManagerServicesConfiguration 
createTaskManagerServiceConfiguration(
+            Configuration config) throws Exception {
+        return TaskManagerServicesConfiguration.fromConfiguration(
+                config,
+                ResourceID.generate(),
+                InetAddress.getLocalHost().getHostName(),
+                true,
+                
TaskExecutorResourceUtils.resourceSpecFromConfigForLocalExecution(config),
+                WorkingDirectory.create(
+                        Files.createTempDirectory(temporaryFolder, 
UUID.randomUUID().toString())
+                                .toFile()));
+    }
+
     private static Configuration 
createFlinkConfigWithPredefinedTaskManagerHostname(
             final String taskmanagerHost) {
         final Configuration config = new Configuration();
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/LocalUnresolvedTaskManagerLocation.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/LocalUnresolvedTaskManagerLocation.java
index 201de612669..365974677ea 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/LocalUnresolvedTaskManagerLocation.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/LocalUnresolvedTaskManagerLocation.java
@@ -25,6 +25,6 @@ public class LocalUnresolvedTaskManagerLocation extends 
UnresolvedTaskManagerLoc
     private static final long serialVersionUID = 1L;
 
     public LocalUnresolvedTaskManagerLocation() {
-        super(ResourceID.generate(), "localhost", 42);
+        super(ResourceID.generate(), "localhost", 42, "localhost");
     }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerLocationTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerLocationTest.java
index 3c9fa646e1e..de3e1c7e69f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerLocationTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerLocationTest.java
@@ -91,6 +91,39 @@ class TaskManagerLocationTest {
         }
     }
 
+    @Test
+    void testEqualsHashAndCompareToWithDifferentNodeId() throws Exception {
+        ResourceID resourceID = ResourceID.generate();
+        InetAddress inetAddress = InetAddress.getByName("1.2.3.4");
+        TaskManagerLocation.HostNameSupplier hostNameSupplier =
+                new TaskManagerLocation.DefaultHostNameSupplier(inetAddress);
+        String nodeId1 = "node1";
+        String nodeId2 = "node2";
+
+        // one == three != two
+        TaskManagerLocation one =
+                new TaskManagerLocation(resourceID, inetAddress, 19871, 
hostNameSupplier, nodeId1);
+        TaskManagerLocation two =
+                new TaskManagerLocation(resourceID, inetAddress, 19871, 
hostNameSupplier, nodeId2);
+        TaskManagerLocation three =
+                new TaskManagerLocation(resourceID, inetAddress, 19871, 
hostNameSupplier, nodeId1);
+
+        assertThat(one).isEqualTo(three);
+        assertThat(one).isNotEqualTo(two);
+        assertThat(two).isNotEqualTo(three);
+
+        assertThat(one.hashCode()).isEqualTo(three.hashCode());
+        assertThat(one.hashCode()).isNotEqualTo(two.hashCode());
+        assertThat(two.hashCode()).isNotEqualTo(three.hashCode());
+
+        assertThat(one.compareTo(three)).isEqualTo(0);
+        assertThat(one.compareTo(two)).isNotEqualTo(0);
+        assertThat(two.compareTo(three)).isNotEqualTo(0);
+
+        int val = one.compareTo(two);
+        assertThat(two.compareTo(one)).isEqualTo(-val);
+    }
+
     @Test
     void testSerialization() {
         try {
@@ -211,7 +244,8 @@ class TaskManagerLocationTest {
                         ResourceID.generate(),
                         address,
                         19871,
-                        new 
TaskManagerLocation.IpOnlyHostNameSupplier(address));
+                        new 
TaskManagerLocation.IpOnlyHostNameSupplier(address),
+                        address.getHostAddress());
 
         assertThat("worker10").isNotEqualTo(info.getHostname());
         assertThat("worker10").isNotEqualTo(info.getFQDNHostname());
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
index 6b0a629bf97..f39010a5d9f 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.configuration.TaskManagerOptionsInternal;
 import org.apache.flink.core.plugin.PluginManager;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.runtime.taskexecutor.TaskManagerRunner;
@@ -138,6 +139,8 @@ public class YarnTaskExecutorRunner {
                 variables.get(YarnResourceManagerDriver.ENV_FLINK_NODE_ID);
         if (taskExecutorHostname != null) {
             configuration.setString(TaskManagerOptions.HOST, 
taskExecutorHostname);
+            configuration.setString(
+                    TaskManagerOptionsInternal.TASK_MANAGER_NODE_ID, 
taskExecutorHostname);
         }
     }
 }
diff --git 
a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnTaskExecutorRunnerTest.java
 
b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnTaskExecutorRunnerTest.java
index 32043040c29..15a7149079f 100644
--- 
a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnTaskExecutorRunnerTest.java
+++ 
b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnTaskExecutorRunnerTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.yarn;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.SecurityOptions;
+import org.apache.flink.configuration.TaskManagerOptionsInternal;
 import org.apache.flink.runtime.security.SecurityConfiguration;
 import org.apache.flink.runtime.security.SecurityUtils;
 import org.apache.flink.runtime.security.modules.HadoopModule;
@@ -30,6 +31,7 @@ import org.junit.jupiter.api.Test;
 
 import java.io.File;
 import java.nio.file.Paths;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -125,4 +127,17 @@ public class YarnTaskExecutorRunnerTest {
         
assertThat(configuration.getString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL))
                 .isEqualTo("testuser1@domain");
     }
+
+    @Test
+    void testTaskManagerNodeIdConfiguration() throws Exception {
+        final String resourceDirPath =
+                Paths.get("src", "test", 
"resources").toAbsolutePath().toString();
+        Configuration configuration = new Configuration();
+        YarnTaskExecutorRunner.setupAndModifyConfiguration(
+                configuration,
+                resourceDirPath,
+                
Collections.singletonMap(YarnResourceManagerDriver.ENV_FLINK_NODE_ID, "test"));
+        
assertThat(configuration.getString(TaskManagerOptionsInternal.TASK_MANAGER_NODE_ID))
+                .isEqualTo("test");
+    }
 }

Reply via email to