zhuzhurk commented on code in PR #20056:
URL: https://github.com/apache/flink/pull/20056#discussion_r908290749


##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/TaskExecutorRegistration.java:
##########
@@ -55,6 +56,10 @@ public class TaskExecutorRegistration implements Serializable {
     /** The task executor total resource profile. */
     private final ResourceProfile totalResourceProfile;
 
+    /** ID of the node where the TaskManager is located. */
+    private final String nodeId;

Review Comment:
   This change seems to serve a purpose other than adding `nodeId` to
`TaskManagerLocation`.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerLocation.java:
##########
@@ -76,20 +84,41 @@ public class TaskManagerLocation implements Comparable<TaskManagerLocation>, jav
      * @param dataPort the port instance's task manager expects to receive transfer envelopes on
      * @param hostNameSupplier the supplier for obtaining fully-qualified domain name and pure
      *     hostname of the task manager
+     * @param nodeId the ID of node where the task manager is located.
      */
     @VisibleForTesting
     public TaskManagerLocation(
             ResourceID resourceID,
             InetAddress inetAddress,
             int dataPort,
-            HostNameSupplier hostNameSupplier) {
+            HostNameSupplier hostNameSupplier,
+            String nodeId) {
         // -1 indicates a local instance connection info
         checkArgument(dataPort > 0 || dataPort == -1, "dataPort must be > 0, or -1 (local)");
 
         this.resourceID = checkNotNull(resourceID);
         this.inetAddress = checkNotNull(inetAddress);
         this.dataPort = dataPort;
         this.hostNameSupplier = checkNotNull(hostNameSupplier);
+        this.nodeId = checkNotNull(nodeId);
+    }
+
+    /**
+     * Constructs a new instance connection info object. The constructor will attempt to retrieve
+     * the instance's host name and domain name through the operating system's lookup mechanisms.
+     *
+     * @param inetAddress the network address the instance's task manager binds its sockets to
+     * @param dataPort the port instance's task manager expects to receive transfer envelopes on
+     * @param hostNameSupplier the supplier for obtaining fully-qualified domain name and pure
+     *     hostname of the task manager
+     */
+    @VisibleForTesting
+    public TaskManagerLocation(

Review Comment:
   If this constructor is not used in production, I would propose dropping it
and reworking its current usages (there are not many of them).
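
   A minimal sketch of what dropping the test-only constructor could look like. `LocationSketch`, `LocationTestUtils`, and `TEST_NODE_ID` are hypothetical stand-ins, not names from the PR or from Flink; the real `TaskManagerLocation` takes more arguments. The idea is only that one production constructor remains and tests supply a placeholder node ID explicitly.

```java
import java.util.Objects;

/** Simplified stand-in for TaskManagerLocation (hypothetical, not the Flink class). */
final class LocationSketch {
    private final String resourceId;
    private final int dataPort;
    private final String nodeId;

    /** The only constructor: every caller, including tests, must pass a node ID. */
    LocationSketch(String resourceId, int dataPort, String nodeId) {
        // -1 indicates a local instance, mirroring the check in the diff above
        if (!(dataPort > 0 || dataPort == -1)) {
            throw new IllegalArgumentException("dataPort must be > 0, or -1 (local)");
        }
        this.resourceId = Objects.requireNonNull(resourceId);
        this.dataPort = dataPort;
        this.nodeId = Objects.requireNonNull(nodeId);
    }

    String getResourceId() {
        return resourceId;
    }

    int getDataPort() {
        return dataPort;
    }

    String getNodeId() {
        return nodeId;
    }
}

/** Hypothetical test-side helper that replaces the dropped convenience constructor. */
final class LocationTestUtils {
    static final String TEST_NODE_ID = "test-node";

    private LocationTestUtils() {}

    /** Builds a location with a placeholder node ID for tests that do not care about it. */
    static LocationSketch createLocation(String resourceId, int dataPort) {
        return new LocationSketch(resourceId, dataPort, TEST_NODE_ID);
    }
}
```

   With this shape, existing test call sites would switch from the two-argument-style constructor to `LocationTestUtils.createLocation(...)`, or pass a node ID directly where the test cares about it.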



##########
flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptionsInternal.java:
##########
@@ -32,4 +32,15 @@ public class TaskManagerOptionsInternal {
                     .noDefaultValue()
                     .withDescription(
                             "**DO NOT USE** The metadata of TaskManager's ResourceID to be used for logging.");
+
+    /**
+     * The ID of the node where the TaskManager is located. In Yarn and Native Kubernetes mode, this
+     * option will be set by resource manager when launch a container for the task executor. In
+     * other modes, this option will not be set. This option is only used internally.
+     */
+    public static final ConfigOption<String> TASK_MANAGER_NODE_ID =
+            key("internal.taskmanager.node-id")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("ID of the node where the TaskManager is located.");

Review Comment:
   located -> located in



##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/InitTaskManagerDecoratorTest.java:
##########
@@ -178,12 +178,24 @@ void testMainContainerPorts() {
 
     @Test
     void testMainContainerEnv() {
-        final Map<String, String> expectedEnvVars = new HashMap<>(customizedEnvs);
+        final Map<String, String> envVars = new HashMap<>();
+        this.resultMainContainer
+                .getEnv()
+                .forEach(envVar -> envVars.put(envVar.getName(), envVar.getValue()));
+        this.customizedEnvs.forEach((k, v) -> assertThat(envVars.get(k)).isEqualTo(v));
 
-        final Map<String, String> resultEnvVars =
-                this.resultMainContainer.getEnv().stream()
-                        .collect(Collectors.toMap(EnvVar::getName, EnvVar::getValue));
-        expectedEnvVars.forEach((k, v) -> assertThat(resultEnvVars.get(k)).isEqualTo(v));

Review Comment:
   Why do we need to make the above changes?
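
   One possible explanation, offered as an assumption rather than something stated in this thread: `Collectors.toMap` throws a `NullPointerException` when a value mapper returns `null`, whereas `HashMap.put` accepts `null` values. If the container now carries an env var whose literal value is `null` (for example, one populated from a field reference), the old stream-based collection would fail. The sketch below uses a hypothetical `EnvVarSketch` class in place of the Kubernetes client's `EnvVar` to show the difference.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/** Hypothetical stand-in for an env var whose value may be null. */
final class EnvVarSketch {
    private final String name;
    private final String value; // may be null, e.g. when the value comes from a reference

    EnvVarSketch(String name, String value) {
        this.name = name;
        this.value = value;
    }

    String getName() {
        return name;
    }

    String getValue() {
        return value;
    }
}

final class EnvCollectSketch {
    private EnvCollectSketch() {}

    /** New style from the diff: HashMap.put tolerates null values. */
    static Map<String, String> viaPut(List<EnvVarSketch> envs) {
        final Map<String, String> result = new HashMap<>();
        envs.forEach(e -> result.put(e.getName(), e.getValue()));
        return result;
    }

    /** Old style from the diff: Collectors.toMap rejects null values with an NPE. */
    static Map<String, String> viaToMap(List<EnvVarSketch> envs) {
        return envs.stream()
                .collect(Collectors.toMap(EnvVarSketch::getName, EnvVarSketch::getValue));
    }
}
```

   Whether this is the actual motivation in the PR is for the author to confirm.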



##########
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerConfigurationTest.java:
##########
@@ -74,17 +68,17 @@
  * and verifies its content.
  */
 @NotThreadSafe
-public class TaskManagerRunnerConfigurationTest extends TestLogger {
+@ExtendWith(TestLoggerExtension.class)

Review Comment:
   This `@ExtendWith` is not needed because the extension is already applied as
a basic rule of the project.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/TaskExecutorRegistration.java:
##########
@@ -55,6 +56,10 @@ public class TaskExecutorRegistration implements Serializable {
     /** The task executor total resource profile. */
     private final ResourceProfile totalResourceProfile;
 
+    /** ID of the node where the TaskManager is located. */
+    private final String nodeId;
+
+    @VisibleForTesting
     public TaskExecutorRegistration(

Review Comment:
   If this constructor is not used in production, I would propose dropping it
and reworking its current usages (there are not many of them).


