This is an automated email from the ASF dual-hosted git repository. zhuzh pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 51c139b601806cc4f4272fb678f4d1bed4cf06ab Author: Lijie Wang <[email protected]> AuthorDate: Tue Jun 28 22:53:50 2022 +0800 [FLINK-28142][runtime] Enrich TaskManagerLocation with node information --- .../configuration/TaskManagerOptionsInternal.java | 11 +++++ .../decorators/InitTaskManagerDecorator.java | 15 ++++++- .../taskmanager/KubernetesTaskExecutorRunner.java | 29 ++++++++++++- .../apache/flink/kubernetes/utils/Constants.java | 4 ++ .../decorators/InitTaskManagerDecoratorTest.java | 24 +++++++++-- .../factory/KubernetesTaskManagerFactoryTest.java | 2 +- .../runtime/taskexecutor/TaskManagerRunner.java | 2 +- .../runtime/taskexecutor/TaskManagerServices.java | 3 +- .../TaskManagerServicesConfiguration.java | 22 +++++++++- .../runtime/taskmanager/TaskManagerLocation.java | 49 ++++++++++++++++++---- .../taskmanager/UnresolvedTaskManagerLocation.java | 11 ++++- .../TaskManagerRunnerConfigurationTest.java | 35 ++++++++++++++++ .../LocalUnresolvedTaskManagerLocation.java | 2 +- .../taskmanager/TaskManagerLocationTest.java | 36 +++++++++++++++- .../apache/flink/yarn/YarnTaskExecutorRunner.java | 3 ++ .../flink/yarn/YarnTaskExecutorRunnerTest.java | 15 +++++++ 16 files changed, 241 insertions(+), 22 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptionsInternal.java b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptionsInternal.java index 314bb3abd0a..9a0b88202e6 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptionsInternal.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptionsInternal.java @@ -32,4 +32,15 @@ public class TaskManagerOptionsInternal { .noDefaultValue() .withDescription( "**DO NOT USE** The metadata of TaskManager's ResourceID to be used for logging."); + + /** + * The ID of the node where the TaskManager is located on. In Yarn and Native Kubernetes mode, + * this option will be set by resource manager when launch a container for the task executor. In + * other modes, this option will not be set. This option is only used internally. + */ + public static final ConfigOption<String> TASK_MANAGER_NODE_ID = + key("internal.taskmanager.node-id") + .stringType() + .noDefaultValue() + .withDescription("ID of the node where the TaskManager is located on."); } diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InitTaskManagerDecorator.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InitTaskManagerDecorator.java index 38ea8315063..09eac71eb3e 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InitTaskManagerDecorator.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InitTaskManagerDecorator.java @@ -31,6 +31,7 @@ import io.fabric8.kubernetes.api.model.ContainerBuilder; import io.fabric8.kubernetes.api.model.ContainerPort; import io.fabric8.kubernetes.api.model.ContainerPortBuilder; import io.fabric8.kubernetes.api.model.EnvVar; +import io.fabric8.kubernetes.api.model.EnvVarSourceBuilder; import io.fabric8.kubernetes.api.model.PodBuilder; import io.fabric8.kubernetes.api.model.ResourceRequirements; @@ -39,8 +40,11 @@ import java.util.List; import java.util.Optional; import java.util.stream.Collectors; +import static org.apache.flink.kubernetes.utils.Constants.API_VERSION; import static org.apache.flink.kubernetes.utils.Constants.DNS_PLOICY_DEFAULT; import static org.apache.flink.kubernetes.utils.Constants.DNS_PLOICY_HOSTNETWORK; +import static org.apache.flink.kubernetes.utils.Constants.ENV_FLINK_POD_NODE_ID; +import static org.apache.flink.kubernetes.utils.Constants.POD_NODE_ID_FIELD_PATH; import static org.apache.flink.util.Preconditions.checkNotNull; /** An initializer for the TaskManager {@link org.apache.flink.kubernetes.kubeclient.FlinkPod}. */ @@ -149,7 +153,16 @@ public class InitTaskManagerDecorator extends AbstractKubernetesStepDecorator { .withResources(resourceRequirements); // Merge fields - mainContainerBuilder.addAllToPorts(getContainerPorts()).addAllToEnv(getCustomizedEnvs()); + mainContainerBuilder + .addAllToPorts(getContainerPorts()) + .addAllToEnv(getCustomizedEnvs()) + .addNewEnv() + .withName(ENV_FLINK_POD_NODE_ID) + .withValueFrom( + new EnvVarSourceBuilder() + .withNewFieldRef(API_VERSION, POD_NODE_ID_FIELD_PATH) + .build()) + .endEnv(); getFlinkLogDirEnv().ifPresent(mainContainerBuilder::addToEnv); return mainContainerBuilder.build(); diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/taskmanager/KubernetesTaskExecutorRunner.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/taskmanager/KubernetesTaskExecutorRunner.java index cf95c70db4f..049d1ad2b50 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/taskmanager/KubernetesTaskExecutorRunner.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/taskmanager/KubernetesTaskExecutorRunner.java @@ -18,14 +18,21 @@ package org.apache.flink.kubernetes.taskmanager; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.TaskManagerOptionsInternal; +import org.apache.flink.kubernetes.utils.Constants; +import org.apache.flink.runtime.entrypoint.FlinkParseException; import org.apache.flink.runtime.taskexecutor.TaskManagerRunner; import org.apache.flink.runtime.util.EnvironmentInformation; import org.apache.flink.runtime.util.JvmShutdownSafeguard; import org.apache.flink.runtime.util.SignalHandler; +import org.apache.flink.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.flink.util.Preconditions.checkNotNull; + /** This class is the executable entry point for running a TaskExecutor in a Kubernetes pod. */ public class KubernetesTaskExecutorRunner { @@ -36,6 +43,26 @@ public class KubernetesTaskExecutorRunner { SignalHandler.register(LOG); JvmShutdownSafeguard.installAsShutdownHook(LOG); - TaskManagerRunner.runTaskManagerProcessSecurely(args); + runTaskManagerSecurely(args); + } + + private static void runTaskManagerSecurely(String[] args) { + Configuration configuration = null; + + try { + configuration = TaskManagerRunner.loadConfiguration(args); + final String nodeId = System.getenv().get(Constants.ENV_FLINK_POD_NODE_ID); + Preconditions.checkState( + nodeId != null, + "The environment variable %s is not set, " + + "which is used to identify the node where the task manager is located.", + Constants.ENV_FLINK_POD_NODE_ID); + configuration.setString(TaskManagerOptionsInternal.TASK_MANAGER_NODE_ID, nodeId); + } catch (FlinkParseException fpe) { + LOG.error("Could not load the configuration.", fpe); + System.exit(TaskManagerRunner.FAILURE_EXIT_CODE); + } + + TaskManagerRunner.runTaskManagerProcessSecurely(checkNotNull(configuration)); } } diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/Constants.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/Constants.java index bde37d933cb..7c1828c4562 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/Constants.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/Constants.java @@ -85,6 +85,10 @@ public class Constants { public static final String POD_IP_FIELD_PATH = "status.podIP"; + public static final String ENV_FLINK_POD_NODE_ID = "_POD_NODE_ID"; + + public static final String POD_NODE_ID_FIELD_PATH = "spec.nodeName"; + public static final int MAXIMUM_CHARACTERS_OF_CLUSTER_ID = 45; public static final String RESTART_POLICY_OF_NEVER = "Never"; diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/InitTaskManagerDecoratorTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/InitTaskManagerDecoratorTest.java index d4acbb4e30e..f7040e440fa 100644 --- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/InitTaskManagerDecoratorTest.java +++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/InitTaskManagerDecoratorTest.java @@ -179,13 +179,29 @@ class InitTaskManagerDecoratorTest extends KubernetesTaskManagerTestBase { @Test void testMainContainerEnv() { final Map<String, String> expectedEnvVars = new HashMap<>(customizedEnvs); - - final Map<String, String> resultEnvVars = - this.resultMainContainer.getEnv().stream() - .collect(Collectors.toMap(EnvVar::getName, EnvVar::getValue)); + final Map<String, String> resultEnvVars = new HashMap<>(); + this.resultMainContainer + .getEnv() + .forEach(envVar -> resultEnvVars.put(envVar.getName(), envVar.getValue())); expectedEnvVars.forEach((k, v) -> assertThat(resultEnvVars.get(k)).isEqualTo(v)); } + @Test + void testNodeIdEnv() { + assertThat(this.resultMainContainer.getEnv()) + .anyMatch( + envVar -> + envVar.getName().equals(Constants.ENV_FLINK_POD_NODE_ID) + && envVar.getValueFrom() + .getFieldRef() + .getApiVersion() + .equals(Constants.API_VERSION) + && envVar.getValueFrom() + .getFieldRef() + .getFieldPath() + .equals(Constants.POD_NODE_ID_FIELD_PATH)); + } + @Test void testPodName() { assertThat(this.resultPod.getMetadata().getName()).isEqualTo(POD_NAME); diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesTaskManagerFactoryTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesTaskManagerFactoryTest.java index 01a059fc2c2..f49b8ad996f 100644 --- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesTaskManagerFactoryTest.java +++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesTaskManagerFactoryTest.java @@ -86,7 +86,7 @@ class KubernetesTaskManagerFactoryTest extends KubernetesTaskManagerTestBase { assertThat(resultMainContainer.getImagePullPolicy()) .isEqualTo(CONTAINER_IMAGE_PULL_POLICY.name()); - assertThat(resultMainContainer.getEnv()).hasSize(4); + assertThat(resultMainContainer.getEnv()).hasSize(5); assertThat( resultMainContainer.getEnv().stream() .anyMatch(envVar -> envVar.getName().equals("key1"))) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java index 0762b1558a2..0c2b82ff2b5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java @@ -113,7 +113,7 @@ public class TaskManagerRunner implements FatalErrorHandler { private static final long FATAL_ERROR_SHUTDOWN_TIMEOUT_MS = 10000L; private static final int SUCCESS_EXIT_CODE = 0; - @VisibleForTesting static final int FAILURE_EXIT_CODE = 1; + @VisibleForTesting public static final int FAILURE_EXIT_CODE = 1; private final Thread shutdownHook; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java index a02a3b34df7..78c1e282823 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java @@ -304,7 +304,8 @@ public class TaskManagerServices { // iff the external data port is not explicitly defined taskManagerServicesConfiguration.getExternalDataPort() > 0 ? taskManagerServicesConfiguration.getExternalDataPort() - : listeningDataPort); + : listeningDataPort, + taskManagerServicesConfiguration.getNodeId()); final BroadcastVariableManager broadcastVariableManager = new BroadcastVariableManager(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java index 5f0116b6204..301997a7c7c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java @@ -27,6 +27,7 @@ import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.MemorySize; import org.apache.flink.configuration.NettyShuffleEnvironmentOptions; import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.configuration.TaskManagerOptionsInternal; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.entrypoint.ClusterEntrypointUtils; import org.apache.flink.runtime.entrypoint.WorkingDirectory; @@ -59,6 +60,8 @@ public class TaskManagerServicesConfiguration { private final String externalAddress; + private final String nodeId; + private final InetAddress bindAddress; private final int externalDataPort; @@ -110,7 +113,8 @@ public class TaskManagerServicesConfiguration { Optional<Time> systemResourceMetricsProbingInterval, FlinkUserCodeClassLoaders.ResolveOrder classLoaderResolveOrder, String[] alwaysParentFirstLoaderPatterns, - int numIoThreads) { + int numIoThreads, + String nodeId) { this.configuration = checkNotNull(configuration); this.resourceID = checkNotNull(resourceID); @@ -139,6 +143,8 @@ public class TaskManagerServicesConfiguration { this.systemResourceMetricsProbingInterval = checkNotNull(systemResourceMetricsProbingInterval); + + this.nodeId = checkNotNull(nodeId); } // -------------------------------------------------------------------------------------------- @@ -230,6 +236,10 @@ public class TaskManagerServicesConfiguration { return numIoThreads; } + public String getNodeId() { + return nodeId; + } + // -------------------------------------------------------------------------------------------- // Parsing of Flink configuration // -------------------------------------------------------------------------------------------- @@ -302,6 +312,13 @@ public class TaskManagerServicesConfiguration { final String[] tmpDirs = ConfigurationUtils.parseTempDirectories(configuration); + // If TaskManagerOptionsInternal.TASK_MANAGER_NODE_ID is not set, use the external address + // as the node id. + final String nodeId = + configuration + .getOptional(TaskManagerOptionsInternal.TASK_MANAGER_NODE_ID) + .orElse(externalAddress); + return new TaskManagerServicesConfiguration( configuration, resourceID, @@ -321,6 +338,7 @@ public class TaskManagerServicesConfiguration { ConfigurationUtils.getSystemResourceMetricsProbingInterval(configuration), FlinkUserCodeClassLoaders.ResolveOrder.fromString(classLoaderResolveOrder), alwaysParentFirstLoaderPatterns, - numIoThreads); + numIoThreads, + nodeId); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerLocation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerLocation.java index ce33c8ba7db..36903248548 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerLocation.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerLocation.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.taskmanager; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.configuration.TaskManagerOptionsInternal; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.util.NetUtils; @@ -68,6 +69,14 @@ public class TaskManagerLocation implements Comparable<TaskManagerLocation>, jav */ private String stringRepresentation; + /** + * ID of the node where the TaskManager is located on. In Yarn and Native Kubernetes mode, this + * value will be set by resource manager when launch this TaskManager(via the config option + * {@link TaskManagerOptionsInternal#TASK_MANAGER_NODE_ID}). In other modes, this value will be + * the external address of the TaskManager. + */ + private final String nodeId; + /** * Constructs a new instance connection info object. The constructor will attempt to retrieve * the instance's host name and domain name through the operating system's lookup mechanisms. @@ -76,13 +85,15 @@ public class TaskManagerLocation implements Comparable<TaskManagerLocation>, jav * @param dataPort the port instance's task manager expects to receive transfer envelopes on * @param hostNameSupplier the supplier for obtaining fully-qualified domain name and pure * hostname of the task manager + * @param nodeId the ID of node where the task manager is located on. */ @VisibleForTesting public TaskManagerLocation( ResourceID resourceID, InetAddress inetAddress, int dataPort, - HostNameSupplier hostNameSupplier) { + HostNameSupplier hostNameSupplier, + String nodeId) { // -1 indicates a local instance connection info checkArgument(dataPort > 0 || dataPort == -1, "dataPort must be > 0, or -1 (local)"); @@ -90,6 +101,7 @@ public class TaskManagerLocation implements Comparable<TaskManagerLocation>, jav this.inetAddress = checkNotNull(inetAddress); this.dataPort = dataPort; this.hostNameSupplier = checkNotNull(hostNameSupplier); + this.nodeId = checkNotNull(nodeId); } /** @@ -101,7 +113,12 @@ public class TaskManagerLocation implements Comparable<TaskManagerLocation>, jav */ @VisibleForTesting public TaskManagerLocation(ResourceID resourceID, InetAddress inetAddress, int dataPort) { - this(resourceID, inetAddress, dataPort, new DefaultHostNameSupplier(inetAddress)); + this( + resourceID, + inetAddress, + dataPort, + new DefaultHostNameSupplier(inetAddress), + getHostName(inetAddress)); } public static TaskManagerLocation fromUnresolvedLocation( @@ -117,13 +134,15 @@ public class TaskManagerLocation implements Comparable<TaskManagerLocation>, jav unresolvedLocation.getResourceID(), inetAddress, unresolvedLocation.getDataPort(), - new DefaultHostNameSupplier(inetAddress)); + new DefaultHostNameSupplier(inetAddress), + unresolvedLocation.getNodeId()); case USE_IP_ONLY: return new TaskManagerLocation( unresolvedLocation.getResourceID(), inetAddress, unresolvedLocation.getDataPort(), - new IpOnlyHostNameSupplier(inetAddress)); + new IpOnlyHostNameSupplier(inetAddress), + unresolvedLocation.getNodeId()); default: throw new UnsupportedOperationException("Unsupported resolution mode provided."); } @@ -196,6 +215,15 @@ public class TaskManagerLocation implements Comparable<TaskManagerLocation>, jav return hostNameSupplier.getHostName(); } + /** + * Return the ID of node where the task manager is located on. + * + * @return The ID of node where the task manager is located on. + */ + public String getNodeId() { + return nodeId; + } + // -------------------------------------------------------------------------------------------- // Utilities // -------------------------------------------------------------------------------------------- @@ -264,7 +292,8 @@ public class TaskManagerLocation implements Comparable<TaskManagerLocation>, jav TaskManagerLocation that = (TaskManagerLocation) obj; return this.resourceID.equals(that.resourceID) && this.inetAddress.equals(that.inetAddress) - && this.dataPort == that.dataPort; + && this.dataPort == that.dataPort + && this.nodeId.equals(that.nodeId); } else { return false; } @@ -272,7 +301,10 @@ public class TaskManagerLocation implements Comparable<TaskManagerLocation>, jav @Override public int hashCode() { - return resourceID.hashCode() + 17 * inetAddress.hashCode() + 129 * dataPort; + return resourceID.hashCode() + + 17 * inetAddress.hashCode() + + 129 * dataPort + + 257 * nodeId.hashCode(); } @Override @@ -309,9 +341,10 @@ public class TaskManagerLocation implements Comparable<TaskManagerLocation>, jav return -1; } else if (this.dataPort > o.dataPort) { return 1; - } else { - return 0; } + + // finally, decided based on node id + return this.nodeId.compareTo(o.nodeId); } // -------------------------------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/UnresolvedTaskManagerLocation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/UnresolvedTaskManagerLocation.java index 28bd9155e2e..249ccef1ef2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/UnresolvedTaskManagerLocation.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/UnresolvedTaskManagerLocation.java @@ -36,15 +36,20 @@ public class UnresolvedTaskManagerLocation implements Serializable { private final ResourceID resourceID; private final String externalAddress; private final int dataPort; + private final String nodeId; public UnresolvedTaskManagerLocation( - final ResourceID resourceID, final String externalAddress, final int dataPort) { + final ResourceID resourceID, + final String externalAddress, + final int dataPort, + final String nodeId) { // -1 indicates a local instance connection info checkArgument(dataPort > 0 || dataPort == -1, "dataPort must be > 0, or -1 (local)"); this.resourceID = checkNotNull(resourceID); this.externalAddress = checkNotNull(externalAddress); this.dataPort = dataPort; + this.nodeId = checkNotNull(nodeId); } public ResourceID getResourceID() { @@ -58,4 +63,8 @@ public class UnresolvedTaskManagerLocation implements Serializable { public int getDataPort() { return dataPort; } + + public String getNodeId() { + return nodeId; + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerConfigurationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerConfigurationTest.java index 0f021a08c0d..f8b3382947d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerConfigurationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerConfigurationTest.java @@ -24,9 +24,12 @@ import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.MemorySize; import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.configuration.TaskManagerOptionsInternal; import org.apache.flink.configuration.UnmodifiableConfiguration; import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.entrypoint.FlinkParseException; +import org.apache.flink.runtime.entrypoint.WorkingDirectory; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils; import org.apache.flink.runtime.rest.util.NoOpFatalErrorHandler; @@ -47,6 +50,7 @@ import javax.annotation.concurrent.NotThreadSafe; import java.io.File; import java.io.IOException; import java.io.PrintWriter; +import java.net.InetAddress; import java.net.ServerSocket; import java.net.URI; import java.nio.file.Files; @@ -214,6 +218,37 @@ class TaskManagerRunnerConfigurationTest { assertThat(jmPort).isEqualTo(configuration.getInteger(JobManagerOptions.PORT)); } + @Test + void testNodeIdShouldBeConfiguredValueIfExplicitlySet() throws Exception { + String nodeId = "node1"; + Configuration configuration = new Configuration(); + configuration.set(TaskManagerOptionsInternal.TASK_MANAGER_NODE_ID, nodeId); + TaskManagerServicesConfiguration servicesConfiguration = + createTaskManagerServiceConfiguration(configuration); + assertThat(servicesConfiguration.getNodeId()).isEqualTo(nodeId); + } + + @Test + void testNodeIdShouldBeExternalAddressIfNotExplicitlySet() throws Exception { + TaskManagerServicesConfiguration servicesConfiguration = + createTaskManagerServiceConfiguration(new Configuration()); + assertThat(servicesConfiguration.getNodeId()) + .isEqualTo(InetAddress.getLocalHost().getHostName()); + } + + private TaskManagerServicesConfiguration createTaskManagerServiceConfiguration( + Configuration config) throws Exception { + return TaskManagerServicesConfiguration.fromConfiguration( + config, + ResourceID.generate(), + InetAddress.getLocalHost().getHostName(), + true, + TaskExecutorResourceUtils.resourceSpecFromConfigForLocalExecution(config), + WorkingDirectory.create( + Files.createTempDirectory(temporaryFolder, UUID.randomUUID().toString()) + .toFile())); + } + private static Configuration createFlinkConfigWithPredefinedTaskManagerHostname( final String taskmanagerHost) { final Configuration config = new Configuration(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/LocalUnresolvedTaskManagerLocation.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/LocalUnresolvedTaskManagerLocation.java index 201de612669..365974677ea 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/LocalUnresolvedTaskManagerLocation.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/LocalUnresolvedTaskManagerLocation.java @@ -25,6 +25,6 @@ public class LocalUnresolvedTaskManagerLocation extends UnresolvedTaskManagerLoc private static final long serialVersionUID = 1L; public LocalUnresolvedTaskManagerLocation() { - super(ResourceID.generate(), "localhost", 42); + super(ResourceID.generate(), "localhost", 42, "localhost"); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerLocationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerLocationTest.java index 3c9fa646e1e..de3e1c7e69f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerLocationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerLocationTest.java @@ -91,6 +91,39 @@ class TaskManagerLocationTest { } } + @Test + void testEqualsHashAndCompareToWithDifferentNodeId() throws Exception { + ResourceID resourceID = ResourceID.generate(); + InetAddress inetAddress = InetAddress.getByName("1.2.3.4"); + TaskManagerLocation.HostNameSupplier hostNameSupplier = + new TaskManagerLocation.DefaultHostNameSupplier(inetAddress); + String nodeId1 = "node1"; + String nodeId2 = "node2"; + + // one == three != two + TaskManagerLocation one = + new TaskManagerLocation(resourceID, inetAddress, 19871, hostNameSupplier, nodeId1); + TaskManagerLocation two = + new TaskManagerLocation(resourceID, inetAddress, 19871, hostNameSupplier, nodeId2); + TaskManagerLocation three = + new TaskManagerLocation(resourceID, inetAddress, 19871, hostNameSupplier, nodeId1); + + assertThat(one).isEqualTo(three); + assertThat(one).isNotEqualTo(two); + assertThat(two).isNotEqualTo(three); + + assertThat(one.hashCode()).isEqualTo(three.hashCode()); + assertThat(one.hashCode()).isNotEqualTo(two.hashCode()); + assertThat(two.hashCode()).isNotEqualTo(three.hashCode()); + + assertThat(one.compareTo(three)).isEqualTo(0); + assertThat(one.compareTo(two)).isNotEqualTo(0); + assertThat(two.compareTo(three)).isNotEqualTo(0); + + int val = one.compareTo(two); + assertThat(two.compareTo(one)).isEqualTo(-val); + } + @Test void testSerialization() { try { @@ -211,7 +244,8 @@ class TaskManagerLocationTest { ResourceID.generate(), address, 19871, - new TaskManagerLocation.IpOnlyHostNameSupplier(address)); + new TaskManagerLocation.IpOnlyHostNameSupplier(address), + address.getHostAddress()); assertThat("worker10").isNotEqualTo(info.getHostname()); assertThat("worker10").isNotEqualTo(info.getFQDNHostname()); diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java index 6b0a629bf97..f39010a5d9f 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java @@ -23,6 +23,7 @@ import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.SecurityOptions; import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.configuration.TaskManagerOptionsInternal; import org.apache.flink.core.plugin.PluginManager; import org.apache.flink.runtime.clusterframework.BootstrapTools; import org.apache.flink.runtime.taskexecutor.TaskManagerRunner; @@ -138,6 +139,8 @@ public class YarnTaskExecutorRunner { variables.get(YarnResourceManagerDriver.ENV_FLINK_NODE_ID); if (taskExecutorHostname != null) { configuration.setString(TaskManagerOptions.HOST, taskExecutorHostname); + configuration.setString( + TaskManagerOptionsInternal.TASK_MANAGER_NODE_ID, taskExecutorHostname); } } } diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnTaskExecutorRunnerTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnTaskExecutorRunnerTest.java index 32043040c29..15a7149079f 100644 --- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnTaskExecutorRunnerTest.java +++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnTaskExecutorRunnerTest.java @@ -20,6 +20,7 @@ package org.apache.flink.yarn; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.SecurityOptions; +import org.apache.flink.configuration.TaskManagerOptionsInternal; import org.apache.flink.runtime.security.SecurityConfiguration; import org.apache.flink.runtime.security.SecurityUtils; import org.apache.flink.runtime.security.modules.HadoopModule; @@ -30,6 +31,7 @@ import org.junit.jupiter.api.Test; import java.io.File; import java.nio.file.Paths; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -125,4 +127,17 @@ public class YarnTaskExecutorRunnerTest { assertThat(configuration.getString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL)) .isEqualTo("testuser1@domain"); } + + @Test + void testTaskManagerNodeIdConfiguration() throws Exception { + final String resourceDirPath = + Paths.get("src", "test", "resources").toAbsolutePath().toString(); + Configuration configuration = new Configuration(); + YarnTaskExecutorRunner.setupAndModifyConfiguration( + configuration, + resourceDirPath, + Collections.singletonMap(YarnResourceManagerDriver.ENV_FLINK_NODE_ID, "test")); + assertThat(configuration.getString(TaskManagerOptionsInternal.TASK_MANAGER_NODE_ID)) + .isEqualTo("test"); + } }
