tillrohrmann commented on a change in pull request #12054:
URL: https://github.com/apache/flink/pull/12054#discussion_r443420479



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
##########
@@ -458,4 +462,11 @@ private static String determineTaskManagerBindAddressByConnectingToResourceManag
                HostBindPolicy bindPolicy = HostBindPolicy.fromString(configuration.getString(TaskManagerOptions.HOST_BIND_POLICY));
                return bindPolicy == HostBindPolicy.IP ? taskManagerAddress.getHostAddress() : taskManagerAddress.getHostName();
        }
+
+       @VisibleForTesting
+       static ResourceID getTaskManagerResourceID(Configuration config) throws Exception {
+               final String resourceID = config.get(TaskManagerOptions.TASK_MANAGER_RESOURCE_ID);
+               return resourceID != null ?
+                       new ResourceID(resourceID) : new ResourceID(InetAddress.getLocalHost().getHostName() + "-" + new AbstractID().toString().substring(0, 6));

Review comment:
       I'm wondering whether the address the `RpcService` binds to would be 
more meaningful than the local host name. Moreover, we could also add the port 
of the `RpcService`. But as a safety net we should still add a random suffix, 
so that we can tell TMs apart if one gets restarted on the same machine with 
the same port.
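
A minimal sketch of the ID format suggested above, in plain Java so it stands on its own. `ResourceIdSketch` and `buildResourceId` are hypothetical names, not Flink API, and the address and port are passed in as plain values on the assumption that the caller can obtain them from the `RpcService`:

```java
import java.util.UUID;

// Hypothetical helper, not part of Flink: shows only the proposed ID layout.
final class ResourceIdSketch {

    // Builds "<rpcAddress>:<rpcPort>-<suffix>". The 6-character random suffix keeps the ID
    // distinct when a TM is restarted on the same machine with the same port.
    static String buildResourceId(String rpcAddress, int rpcPort) {
        String randomSuffix = UUID.randomUUID().toString().replace("-", "").substring(0, 6);
        return rpcAddress + ":" + rpcPort + "-" + randomSuffix;
    }

    public static void main(String[] args) {
        // Placeholder values standing in for whatever the RpcService actually reports.
        System.out.println(buildResourceId("192.168.1.10", 6122));
    }
}
```

The separator characters and the suffix length are illustrative choices; the PR itself takes the first 6 characters of an `AbstractID`.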

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerTest.java
##########
@@ -86,6 +89,26 @@ public void testShouldShutdownIfRegistrationWithJobManagerFails() throws Excepti
                assertThat(statusCode, is(equalTo(TaskManagerRunner.RUNTIME_FAILURE_RETURN_CODE)));
        }
 
+       @Test
+       public void testGenerateTaskManagerResourceIDWithPrefixSpecified() throws Exception {

Review comment:
       The method name seems out of date: the test sets the full resource ID, 
not a prefix.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
##########
@@ -293,30 +294,32 @@ public static void main(String[] args) throws Exception {
                        LOG.info("Cannot determine the maximum number of open file descriptors");
                }
 
-               runTaskManagerSecurely(args, ResourceID.generate());
+               runTaskManagerSecurely(args, null);
        }
 
        public static Configuration loadConfiguration(String[] args) throws FlinkParseException {
                return ConfigurationParserUtils.loadCommonConfiguration(args, TaskManagerRunner.class.getSimpleName());
        }
 
-       public static void runTaskManager(Configuration configuration, ResourceID resourceId, PluginManager pluginManager) throws Exception {
-               final TaskManagerRunner taskManagerRunner = new TaskManagerRunner(configuration, resourceId, pluginManager);
+       public static void runTaskManager(Configuration configuration, PluginManager pluginManager) throws Exception {
+               final TaskManagerRunner taskManagerRunner = new TaskManagerRunner(configuration, getTaskManagerResourceID(configuration), pluginManager);
 
                taskManagerRunner.start();
        }
 
-       public static void runTaskManagerSecurely(String[] args, ResourceID resourceID) {
+       public static void runTaskManagerSecurely(String[] args, String resourceID) {
                try {
                        final Configuration configuration = loadConfiguration(args);
-
+                       if (resourceID != null) {
+                               configuration.set(TaskManagerOptions.TASK_MANAGER_RESOURCE_ID, resourceID);
+                       }

Review comment:
       I think it would be good to set the `TASK_MANAGER_RESOURCE_ID` on the 
`ResourceManager` side.
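
A rough sketch of that split, using a plain `Map` as a stand-in for Flink's `Configuration` so the example is self-contained. `ResourceIdHandoffSketch` and its methods are hypothetical, and the key string `taskmanager.resource-id` is an assumption about what `TASK_MANAGER_RESOURCE_ID` maps to:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical sketch, not Flink code: the Map stands in for Flink's Configuration.
final class ResourceIdHandoffSketch {

    // Assumed key for TaskManagerOptions.TASK_MANAGER_RESOURCE_ID.
    static final String RESOURCE_ID_KEY = "taskmanager.resource-id";

    // ResourceManager side: write the ID into the configuration handed to the new TM process.
    static Map<String, String> configForNewTaskManager(Map<String, String> base, String resourceId) {
        Map<String, String> config = new HashMap<>(base);
        config.put(RESOURCE_ID_KEY, resourceId);
        return config;
    }

    // TaskManager side: only read the option; generate a fallback ID if it is absent.
    static String resolveResourceId(Map<String, String> config, String hostName) {
        String configured = config.get(RESOURCE_ID_KEY);
        if (configured != null) {
            return configured;
        }
        return hostName + "-" + UUID.randomUUID().toString().substring(0, 6);
    }

    public static void main(String[] args) {
        // "container_42" is a placeholder ID chosen by the launching side.
        Map<String, String> config = configForNewTaskManager(new HashMap<>(), "container_42");
        System.out.println(resolveResourceId(config, "worker-1"));
    }
}
```

With this shape, `runTaskManagerSecurely` would not need a `resourceID` parameter at all, since the runner only ever reads the option from its configuration.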

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerTest.java
##########
@@ -86,6 +89,26 @@ public void testShouldShutdownIfRegistrationWithJobManagerFails() throws Excepti
                assertThat(statusCode, is(equalTo(TaskManagerRunner.RUNTIME_FAILURE_RETURN_CODE)));
        }
 
+       @Test
+       public void testGenerateTaskManagerResourceIDWithPrefixSpecified() throws Exception {
+               final Configuration configuration = createConfiguration();
+               final String resourceID = "test";
+               configuration.set(TaskManagerOptions.TASK_MANAGER_RESOURCE_ID, resourceID);
+               final ResourceID taskManagerResourceID = TaskManagerRunner.getTaskManagerResourceID(configuration);
+
+               assertThat(taskManagerResourceID.toString(), equalTo(resourceID));
+       }
+
+       @Test
+       public void testGenerateTaskManagerResourceIDWithoutPrefixSpecified() throws Exception {

Review comment:
       Same here: this method name also still refers to a prefix.
