rmetzger commented on a change in pull request #13706:
URL: https://github.com/apache/flink/pull/13706#discussion_r508697760



##########
File path: 
flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
##########
@@ -308,6 +308,19 @@
                        .defaultValue(Integer.MAX_VALUE)
                        .withDescription("The max number of completed jobs that can be kept in the job store.");
 
+       /**
+        * Flag indicating whether JobManager would retrieve canonical host name of TaskManager during registration.
+        */
+       @Documentation.Section(Documentation.Sections.EXPERT_SCHEDULING)

Review comment:
       I don't think this config option is related to scheduling.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerLocation.java
##########
@@ -150,28 +166,48 @@ public String addressString() {
 
        /**
         * Returns the fully-qualified domain name the TaskManager. If the name could not be
-        * determined, the return value will be a textual representation of the TaskManager's IP address.
+        * determined or {@link JobManagerOptions.RETRIEVE_TASK_MANAGER_HOSTNAME} is set to false,
+        * the return value will be a textual representation of the TaskManager's IP address.
         * 
         * @return The fully-qualified domain name of the TaskManager.
         */
        public String getFQDNHostname() {
+               // Lazily retrieve FQDN host name of this TaskManager
+               if (fqdnHostName == null) {
+                       if (!retrieveHostName) {
+                               fqdnHostName = inetAddress.getHostAddress();
+                       } else {
+                               fqdnHostName = getFqdnHostName(inetAddress);
+                       }
+               }
+
                return fqdnHostName;
        }
 
        /**
-        * Gets the hostname of the TaskManager. The hostname derives from the fully qualified
-        * domain name (FQDN, see {@link #getFQDNHostname()}):
+        * Gets the hostname of the TaskManager.
+        * If {@link JobManagerOptions.RETRIEVE_TASK_MANAGER_HOSTNAME} is true,

Review comment:
       Again, this class is exposing an implementation detail of `JobMaster`.
   
   (I'm a bit uncertain about this review item; maybe it is okay as is. But I 
believe it is risky to mention the option here, because the implementation could change.)

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
##########
@@ -606,7 +608,12 @@ private void releaseEmptyTaskManager(ResourceID resourceId) {
 
                final TaskManagerLocation taskManagerLocation;
                try {
-                       taskManagerLocation = TaskManagerLocation.fromUnresolvedLocation(unresolvedTaskManagerLocation);
+                       final boolean retrieveTaskManagerHostName = this.jobMasterConfiguration.getConfiguration()
+                                       .getBoolean(JobManagerOptions.RETRIEVE_TASK_MANAGER_HOSTNAME);
+                       if (!retrieveTaskManagerHostName) {
+                               log.info("JobManager would not retrieve TaskManager's canonical hostname due to configuration.");

Review comment:
       I don't think it is a good idea to log this message every time we 
register a TaskManager.
   
   On startup, we already log all configuration parameters. I think that is 
enough in this case. Alternatively, we could log at debug level in the 
constructor when looking up the config key.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerLocation.java
##########
@@ -150,28 +166,48 @@ public String addressString() {
 
        /**
         * Returns the fully-qualified domain name the TaskManager. If the name could not be
-        * determined, the return value will be a textual representation of the TaskManager's IP address.
+        * determined or {@link JobManagerOptions.RETRIEVE_TASK_MANAGER_HOSTNAME} is set to false,

Review comment:
       Isn't the use of this configuration option an implementation detail of the 
`JobMaster`, not visible to this class?
   I would only mention the `retrieveHostName` field here.
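
   A minimal sketch of what that could look like, assuming a simplified stand-in class. `LocationSketch` is hypothetical and is not the actual `TaskManagerLocation`; the point is only that the Javadoc refers to the class's own constructor flag rather than to a `JobManagerOptions` key:

```java
import java.net.InetAddress;

/** Hypothetical, simplified stand-in for TaskManagerLocation; not Flink code. */
final class LocationSketch {
    private final InetAddress inetAddress;
    private final boolean retrieveHostName;
    private String fqdnHostName; // resolved lazily on first access

    /**
     * @param inetAddress address of the TaskManager
     * @param retrieveHostName whether the canonical host name should be looked up;
     *     if false, the textual IP address is used instead
     */
    LocationSketch(InetAddress inetAddress, boolean retrieveHostName) {
        this.inetAddress = inetAddress;
        this.retrieveHostName = retrieveHostName;
    }

    /**
     * Returns the fully-qualified domain name. If the name could not be determined,
     * or if this location was created with {@code retrieveHostName == false}, the
     * return value is a textual representation of the IP address.
     */
    String getFQDNHostname() {
        if (fqdnHostName == null) {
            fqdnHostName = retrieveHostName
                    ? inetAddress.getCanonicalHostName() // may trigger a reverse DNS lookup
                    : inetAddress.getHostAddress();      // no lookup, IP string only
        }
        return fqdnHostName;
    }
}
```

   With this wording, the class documents only the behavior it controls itself; whoever constructs it decides where the flag comes from.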

##########
File path: 
flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
##########
@@ -308,6 +308,19 @@
                        .defaultValue(Integer.MAX_VALUE)
                        .withDescription("The max number of completed jobs that can be kept in the job store.");
 
+       /**
+        * Flag indicating whether JobManager would retrieve canonical host name of TaskManager during registration.
+        */
+       @Documentation.Section(Documentation.Sections.EXPERT_SCHEDULING)

Review comment:
       ```suggestion
        @Documentation.Section(Documentation.Sections.ALL_JOB_MANAGER)
   ```

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerLocationTest.java
##########
@@ -198,4 +199,24 @@ public void testGetHostname2() {
                        fail(e.getMessage());
                }
        }
+
+       @Test
+       public void testNotRetrieveHostName() {
+               try {
+                       InetAddress address = mock(InetAddress.class);
+                       when(address.getCanonicalHostName()).thenReturn("worker10");
+                       when(address.getHostName()).thenReturn("worker10");
+                       when(address.getHostAddress()).thenReturn("127.0.0.1");
+
+                       TaskManagerLocation info = new TaskManagerLocation(ResourceID.generate(), address, 19871, false);
+                       assertNotEquals("worker10", info.getHostname());
+                       assertNotEquals("worker10", info.getFQDNHostname());
+                       assertEquals("127.0.0.1", info.getHostname());
+                       assertEquals("127.0.0.1", info.getFQDNHostname());
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());

Review comment:
       You don't need to manually fail the test. If an exception is thrown 
out of the test, it will fail.
   The other code in this file is very old and uses an outdated pattern 😄 

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
##########
@@ -606,7 +608,12 @@ private void releaseEmptyTaskManager(ResourceID resourceId) {
 
                final TaskManagerLocation taskManagerLocation;
                try {
-                       taskManagerLocation = TaskManagerLocation.fromUnresolvedLocation(unresolvedTaskManagerLocation);
+                       final boolean retrieveTaskManagerHostName = this.jobMasterConfiguration.getConfiguration()
+                                       .getBoolean(JobManagerOptions.RETRIEVE_TASK_MANAGER_HOSTNAME);

Review comment:
       I was wondering whether it makes sense to move this local variable to a 
field, and do this lookup in the constructor only once?
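
   Roughly what I have in mind, as a sketch only. `CountingConfig`, `JobMasterSketch`, and the config key below are made up for illustration and are not Flink classes; `java.util.logging` stands in for the real logger. The flag is read once in the constructor, logged once at debug (`fine`) level, and then reused for every registration:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Logger;

/** Hypothetical configuration that counts lookups; for illustration only. */
final class CountingConfig {
    private final Map<String, Boolean> values = new HashMap<>();
    private int lookups;

    CountingConfig set(String key, boolean value) {
        values.put(key, value);
        return this;
    }

    boolean getBoolean(String key, boolean defaultValue) {
        lookups++; // track how often the configuration is consulted
        return values.getOrDefault(key, defaultValue);
    }

    int lookups() {
        return lookups;
    }
}

/** Hypothetical stand-in for JobMaster; not Flink code. */
final class JobMasterSketch {
    private static final Logger LOG = Logger.getLogger(JobMasterSketch.class.getName());

    // Made-up key, used only by this sketch.
    static final String RETRIEVE_HOSTNAME_KEY = "sketch.retrieve-taskmanager-hostname";

    // Looked up once in the constructor instead of on every registration.
    private final boolean retrieveTaskManagerHostName;

    JobMasterSketch(CountingConfig configuration) {
        this.retrieveTaskManagerHostName =
                configuration.getBoolean(RETRIEVE_HOSTNAME_KEY, true);
        if (!retrieveTaskManagerHostName) {
            // Logged once, at debug level, rather than per TaskManager registration.
            LOG.fine("Canonical hostname lookup for TaskManagers is disabled by configuration.");
        }
    }

    /** Returns a label describing how the given address would be handled. */
    String registerTaskManager(String ipAddress) {
        return retrieveTaskManagerHostName
                ? "resolve:" + ipAddress
                : "ip-only:" + ipAddress;
    }
}
```

   Registering several TaskManagers against one `JobMasterSketch` leaves the lookup count at one, and the log message appears at most once per instance.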




