This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new d251ce5  [FLINK-19677][runtime] Make JobManager lazily resolve hostname of TaskManager and provide an option to turn off reverse resolution entirely
d251ce5 is described below

commit d251ce5ab43d89d3d09a3f09ecf9a33435e5d901
Author: Weike DONG <[email protected]>
AuthorDate: Sun Oct 18 21:40:04 2020 +0800

    [FLINK-19677][runtime] Make JobManager lazily resolve hostname of TaskManager and provide an option to turn off reverse resolution entirely
    
    [FLINK-19677][runtime][tests] Added test cases
    
    More test cases and formatting
    
    Minor changes after review
    
    Re-generated docs
    
    Improvements after review
    
    Remove spaces
    
    This closes #13774.
---
 .../generated/all_jobmanager_section.html          |   6 +
 .../generated/job_manager_configuration.html       |   6 +
 .../flink/configuration/JobManagerOptions.java     |  13 ++
 .../apache/flink/runtime/jobmaster/JobMaster.java  |  16 +-
 .../runtime/taskmanager/TaskManagerLocation.java   | 214 ++++++++++++++++-----
 .../taskmanager/TaskManagerLocationTest.java       |  17 ++
 6 files changed, 226 insertions(+), 46 deletions(-)

diff --git a/docs/_includes/generated/all_jobmanager_section.html 
b/docs/_includes/generated/all_jobmanager_section.html
index e280984..3cf1e61 100644
--- a/docs/_includes/generated/all_jobmanager_section.html
+++ b/docs/_includes/generated/all_jobmanager_section.html
@@ -27,6 +27,12 @@
             <td>This option specifies how the job computation recovers from 
task failures. Accepted values are:<ul><li>'full': Restarts all tasks to 
recover the job.</li><li>'region': Restarts all tasks that could be affected by 
the task failure. More details can be found <a 
href="../dev/task_failure_recovery.html#restart-pipelined-region-failover-strategy">here</a>.</li></ul></td>
         </tr>
         <tr>
+            <td><h5>jobmanager.retrieve-taskmanager-hostname</h5></td>
+            <td style="word-wrap: break-word;">true</td>
+            <td>Boolean</td>
+            <td>Flag indicating whether JobManager would retrieve canonical 
host name of TaskManager during registration. If the option is set to "false", 
TaskManager registration with JobManager could be faster, since no reverse DNS 
lookup is performed. However, local input split assignment (such as for HDFS 
files) may be impacted.</td>
+        </tr>
+        <tr>
             <td><h5>jobmanager.rpc.address</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
             <td>String</td>
diff --git a/docs/_includes/generated/job_manager_configuration.html 
b/docs/_includes/generated/job_manager_configuration.html
index 988bcc6..793616b 100644
--- a/docs/_includes/generated/job_manager_configuration.html
+++ b/docs/_includes/generated/job_manager_configuration.html
@@ -87,6 +87,12 @@
             <td>Total Process Memory size for the JobManager. This includes 
all the memory that a JobManager JVM process consumes, consisting of Total 
Flink Memory, JVM Metaspace, and JVM Overhead. In containerized setups, this 
should be set to the container memory. See also 'jobmanager.memory.flink.size' 
for Total Flink Memory size configuration.</td>
         </tr>
         <tr>
+            <td><h5>jobmanager.retrieve-taskmanager-hostname</h5></td>
+            <td style="word-wrap: break-word;">true</td>
+            <td>Boolean</td>
+            <td>Flag indicating whether JobManager would retrieve canonical 
host name of TaskManager during registration. If the option is set to "false", 
TaskManager registration with JobManager could be faster, since no reverse DNS 
lookup is performed. However, local input split assignment (such as for HDFS 
files) may be impacted.</td>
+        </tr>
+        <tr>
             <td><h5>jobmanager.rpc.address</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
             <td>String</td>
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
index aa3dc9a..3b774d2 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
@@ -309,6 +309,19 @@ public class JobManagerOptions {
                        .withDescription("The max number of completed jobs that 
can be kept in the job store.");
 
        /**
+        * Flag indicating whether JobManager would retrieve canonical host 
name of TaskManager during registration.
+        */
+       @Documentation.Section(Documentation.Sections.ALL_JOB_MANAGER)
+       public static final ConfigOption<Boolean> 
RETRIEVE_TASK_MANAGER_HOSTNAME =
+               key("jobmanager.retrieve-taskmanager-hostname")
+                       .defaultValue(true)
+                       .withDescription("Flag indicating whether JobManager 
would retrieve canonical "
+                                                       + "host name of 
TaskManager during registration. "
+                                                       + "If the option is set 
to \"false\", TaskManager registration with "
+                                                       + "JobManager could be 
faster, since no reverse DNS lookup is performed. "
+                                                       + "However, local input 
split assignment (such as for HDFS files) may be impacted.");
+
+       /**
         * The timeout in milliseconds for requesting a slot from Slot Pool.
         */
        @Documentation.Section(Documentation.Sections.EXPERT_SCHEDULING)
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index d5087cb..12d65fc 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.functions.AggregateFunction;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.queryablestate.KvStateID;
 import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
 import org.apache.flink.runtime.blob.BlobWriter;
@@ -89,6 +90,7 @@ import 
org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation.ResolutionMode;
 import org.apache.flink.runtime.taskmanager.UnresolvedTaskManagerLocation;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
@@ -165,6 +167,8 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId> implements JobMast
 
        private final SchedulerNGFactory schedulerNGFactory;
 
+       private final boolean retrieveTaskManagerHostName;
+
        // --------- BackPressure --------
 
        private final BackPressureStatsTracker backPressureStatsTracker;
@@ -241,6 +245,8 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId> implements JobMast
                this.schedulerNGFactory = checkNotNull(schedulerNGFactory);
                this.heartbeatServices = checkNotNull(heartbeatServices);
                this.jobMetricGroupFactory = 
checkNotNull(jobMetricGroupFactory);
+               this.retrieveTaskManagerHostName = 
jobMasterConfiguration.getConfiguration()
+                               
.getBoolean(JobManagerOptions.RETRIEVE_TASK_MANAGER_HOSTNAME);
 
                final String jobName = jobGraph.getName();
                final JobID jid = jobGraph.getJobID();
@@ -579,7 +585,15 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId> implements JobMast
 
                final TaskManagerLocation taskManagerLocation;
                try {
-                       taskManagerLocation = 
TaskManagerLocation.fromUnresolvedLocation(unresolvedTaskManagerLocation);
+                       if (retrieveTaskManagerHostName) {
+                               taskManagerLocation = 
TaskManagerLocation.fromUnresolvedLocation(
+                                               unresolvedTaskManagerLocation,
+                                               
ResolutionMode.RETRIEVE_HOST_NAME);
+                       } else {
+                               taskManagerLocation = 
TaskManagerLocation.fromUnresolvedLocation(
+                                               unresolvedTaskManagerLocation,
+                                               ResolutionMode.USE_IP_ONLY);
+                       }
                } catch (Throwable throwable) {
                        final String errMsg = String.format(
                                "Could not accept TaskManager registration. 
TaskManager address %s cannot be resolved. %s",
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerLocation.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerLocation.java
index fd95712..9d7fb3a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerLocation.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerLocation.java
@@ -27,6 +27,7 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nonnull;
 
+import java.io.Serializable;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 
@@ -51,54 +52,81 @@ public class TaskManagerLocation implements 
Comparable<TaskManagerLocation>, jav
         * the YARN container ID, Mesos container ID, or any other unique 
identifier. */
        private final ResourceID resourceID;
 
-       /** The network address that the TaskManager binds its sockets to */
+       /** The network address that the TaskManager binds its sockets to. */
        private final InetAddress inetAddress;
 
-       /** The fully qualified host name of the TaskManager */
-       private final String fqdnHostName;
+       /** The supplier for fully qualified host name and pure hostname. */
+       private final HostNameSupplier hostNameSupplier;
 
-       /** The pure hostname, derived from the fully qualified host name. */
-       private final String hostName;
-       
-       /** The port that the TaskManager receive data transport connection 
requests at */
+       /** The port that the TaskManager receive data transport connection 
requests at. */
        private final int dataPort;
 
-       /** The toString representation, eagerly constructed and cached to 
avoid repeated string building */  
-       private final String stringRepresentation;
+       /** The toString representation, lazily constructed and cached to 
avoid repeated string building. */
+       private String stringRepresentation;
 
        /**
         * Constructs a new instance connection info object. The constructor 
will attempt to retrieve the instance's
         * host name and domain name through the operating system's lookup 
mechanisms.
-        * 
+        *
         * @param inetAddress
         *        the network address the instance's task manager binds its 
sockets to
         * @param dataPort
         *        the port instance's task manager expects to receive transfer 
envelopes on
+        * @param hostNameSupplier
+        *        the supplier for obtaining fully-qualified domain name and 
pure hostname of the task manager
         */
        @VisibleForTesting
-       public TaskManagerLocation(ResourceID resourceID, InetAddress 
inetAddress, int dataPort) {
+       public TaskManagerLocation(
+                       ResourceID resourceID,
+                       InetAddress inetAddress,
+                       int dataPort,
+                       HostNameSupplier hostNameSupplier) {
                // -1 indicates a local instance connection info
                checkArgument(dataPort > 0 || dataPort == -1, "dataPort must be 
> 0, or -1 (local)");
 
                this.resourceID = checkNotNull(resourceID);
                this.inetAddress = checkNotNull(inetAddress);
                this.dataPort = dataPort;
+               this.hostNameSupplier = checkNotNull(hostNameSupplier);
+       }
 
-               // get FQDN hostname on this TaskManager.
-               this.fqdnHostName = getFqdnHostName(inetAddress);
-
-               this.hostName = getHostName(inetAddress);
-
-               this.stringRepresentation = String.format(
-                               "%s @ %s (dataPort=%d)", resourceID, 
fqdnHostName, dataPort);
+       /**
+        * Constructs a new instance connection info object. The constructor 
will attempt to retrieve the instance's
+        * host name and domain name through the operating system's lookup 
mechanisms.
+        *
+        * @param inetAddress
+        *        the network address the instance's task manager binds its 
sockets to
+        * @param dataPort
+        *        the port instance's task manager expects to receive transfer 
envelopes on
+        */
+       @VisibleForTesting
+       public TaskManagerLocation(ResourceID resourceID, InetAddress 
inetAddress, int dataPort) {
+               this(resourceID, inetAddress, dataPort, new 
DefaultHostNameSupplier(inetAddress));
        }
 
-       public static TaskManagerLocation fromUnresolvedLocation(final 
UnresolvedTaskManagerLocation unresolvedLocation)
+       public static TaskManagerLocation fromUnresolvedLocation(
+                       final UnresolvedTaskManagerLocation unresolvedLocation,
+                       final ResolutionMode resolutionMode)
                throws UnknownHostException {
-               return new TaskManagerLocation(
-                       unresolvedLocation.getResourceID(),
-                       
InetAddress.getByName(unresolvedLocation.getExternalAddress()),
-                       unresolvedLocation.getDataPort());
+
+               InetAddress inetAddress = 
InetAddress.getByName(unresolvedLocation.getExternalAddress());
+
+               switch (resolutionMode) {
+                       case RETRIEVE_HOST_NAME:
+                               return new TaskManagerLocation(
+                                       unresolvedLocation.getResourceID(),
+                                       inetAddress,
+                                       unresolvedLocation.getDataPort(),
+                                       new 
DefaultHostNameSupplier(inetAddress));
+                       case USE_IP_ONLY:
+                               return new TaskManagerLocation(
+                                       unresolvedLocation.getResourceID(),
+                                       inetAddress,
+                                       unresolvedLocation.getDataPort(),
+                                       new 
IpOnlyHostNameSupplier(inetAddress));
+                       default:
+                               throw new 
UnsupportedOperationException("Unsupported resolution mode provided.");
+               }
        }
 
        // 
------------------------------------------------------------------------
@@ -114,7 +142,7 @@ public class TaskManagerLocation implements 
Comparable<TaskManagerLocation>, jav
         *     <li>If the TaskManager is started in standalone mode, or via a 
MiniCluster, this is a random ID.</li>
         *     <li>Other deployment modes can set the resource ID in other 
ways.</li>
         * </ul>
-        * 
+        *
         * @return The ID of the resource in which the TaskManager is started
         */
        public ResourceID getResourceID() {
@@ -123,7 +151,7 @@ public class TaskManagerLocation implements 
Comparable<TaskManagerLocation>, jav
 
        /**
         * Returns the port instance's task manager expects to receive transfer 
envelopes on.
-        * 
+        *
         * @return the port instance's task manager expects to receive transfer 
envelopes on
         */
        public int dataPort() {
@@ -132,7 +160,7 @@ public class TaskManagerLocation implements 
Comparable<TaskManagerLocation>, jav
 
        /**
         * Returns the network address the instance's task manager binds its 
sockets to.
-        * 
+        *
         * @return the network address the instance's task manager binds its 
sockets to
         */
        public InetAddress address() {
@@ -149,32 +177,27 @@ public class TaskManagerLocation implements 
Comparable<TaskManagerLocation>, jav
        }
 
        /**
-        * Returns the fully-qualified domain name the TaskManager. If the name 
could not be
-        * determined, the return value will be a textual representation of the 
TaskManager's IP address.
-        * 
+        * Returns the fully-qualified domain name of the TaskManager provided 
by {@link #hostNameSupplier}.
+        *
         * @return The fully-qualified domain name of the TaskManager.
         */
        public String getFQDNHostname() {
-               return fqdnHostName;
+               return hostNameSupplier.getFqdnHostName();
        }
 
        /**
-        * Gets the hostname of the TaskManager. The hostname derives from the 
fully qualified
-        * domain name (FQDN, see {@link #getFQDNHostname()}):
-        * <ul>
-        *     <li>If the FQDN is the textual IP address, then the hostname is 
also the IP address</li>
-        *     <li>If the FQDN has only one segment (such as "localhost", or 
"host17"), then this is
-        *         used as the hostname.</li>
-        *     <li>If the FQDN has multiple segments (such as 
"worker3.subgroup.company.net"), then the first
-        *         segment (here "worker3") will be used as the hostname.</li>
-        * </ul>
+        * Gets the hostname of the TaskManager from {@link #hostNameSupplier}.
         *
         * @return The hostname of the TaskManager.
         */
        public String getHostname() {
-               return hostName;
+               return hostNameSupplier.getHostName();
        }
 
+       // 
--------------------------------------------------------------------------------------------
+       // Utilities
+       // 
--------------------------------------------------------------------------------------------
+
        /**
         * Gets the fully qualified hostname of the TaskManager based on the 
network address.
         *
@@ -219,12 +242,12 @@ public class TaskManagerLocation implements 
Comparable<TaskManagerLocation>, jav
                return hostName;
        }
 
-       // 
--------------------------------------------------------------------------------------------
-       // Utilities
-       // 
--------------------------------------------------------------------------------------------
-
        @Override
        public String toString() {
+               if (stringRepresentation == null) {
+                       this.stringRepresentation = String.format(
+                                       "%s @ %s (dataPort=%d)", resourceID, 
getFQDNHostname(), dataPort);
+               }
                return stringRepresentation;
        }
 
@@ -246,7 +269,7 @@ public class TaskManagerLocation implements 
Comparable<TaskManagerLocation>, jav
 
        @Override
        public int hashCode() {
-               return resourceID.hashCode() + 
+               return resourceID.hashCode() +
                                17 * inetAddress.hashCode() +
                                129 * dataPort;
        }
@@ -288,4 +311,105 @@ public class TaskManagerLocation implements 
Comparable<TaskManagerLocation>, jav
                        return 0;
                }
        }
+
+       // 
--------------------------------------------------------------------------------------------
+       // Hostname Resolution Suppliers
+       // 
--------------------------------------------------------------------------------------------
+
+       public interface HostNameSupplier extends Serializable {
+               String getHostName();
+
+               String getFqdnHostName();
+       }
+
+       /**
+        * This Supplier class could retrieve the FQDN host name of the given 
InetAddress on demand,
+        * extract the pure host name and cache the results for later use.
+        */
+       @VisibleForTesting
+       public static class DefaultHostNameSupplier implements HostNameSupplier 
{
+               private final InetAddress inetAddress;
+               private String hostName;
+               private String fqdnHostName;
+
+               public DefaultHostNameSupplier(InetAddress inetAddress) {
+                       this.inetAddress = inetAddress;
+               }
+
+               /**
+                * Gets the hostname of the TaskManager. The hostname derives 
from the fully qualified
+                * domain name (FQDN, see {@link #getFQDNHostname()}):
+                * <ul>
+                *     <li>If the FQDN is the textual IP address, then the 
hostname is also the IP address</li>
+                *     <li>If the FQDN has only one segment (such as 
"localhost", or "host17"), then this is
+                *         used as the hostname.</li>
+                *     <li>If the FQDN has multiple segments (such as 
"worker3.subgroup.company.net"), then the first
+                *         segment (here "worker3") will be used as the 
hostname.</li>
+                * </ul>
+                *
+                * @return The hostname of the TaskManager.
+                */
+               @Override
+               public String getHostName() {
+                       if (hostName == null) {
+                               hostName = 
TaskManagerLocation.getHostName(inetAddress);
+                       }
+                       return hostName;
+               }
+
+               /**
+                * Returns the fully-qualified domain name the TaskManager. If 
the name could not be
+                * determined, the return value will be a textual 
representation of the TaskManager's IP address.
+                *
+                * @return The fully-qualified domain name of the TaskManager.
+                */
+               @Override
+               public String getFqdnHostName() {
+                       if (fqdnHostName == null) {
+                               fqdnHostName = 
TaskManagerLocation.getFqdnHostName(inetAddress);
+                       }
+                       return fqdnHostName;
+               }
+       }
+
+       /**
+        * This Supplier class returns the IP address of the given InetAddress 
directly,
+        * therefore no reverse DNS lookup is required.
+        */
+       @VisibleForTesting
+       public static class IpOnlyHostNameSupplier implements HostNameSupplier {
+               private final InetAddress inetAddress;
+
+               public IpOnlyHostNameSupplier(InetAddress inetAddress) {
+                       this.inetAddress = inetAddress;
+               }
+
+               /**
+                * Returns the textual representation of the TaskManager's IP 
address as host name.
+                *
+                * @return The textual representation of the TaskManager's IP 
address.
+                */
+               @Override
+               public String getHostName() {
+                       return inetAddress.getHostAddress();
+               }
+
+               /**
+                * Returns the textual representation of the TaskManager's IP 
address as FQDN host name.
+                *
+                * @return The textual representation of the TaskManager's IP 
address.
+                */
+               @Override
+               public String getFqdnHostName() {
+                       return inetAddress.getHostAddress();
+               }
+       }
+
+       /**
+        * The DNS resolution mode for TaskManager's IP address.
+        */
+       public enum ResolutionMode {
+               RETRIEVE_HOST_NAME,
+               USE_IP_ONLY
+       }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerLocationTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerLocationTest.java
index 9452b20..62ff960 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerLocationTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerLocationTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.taskmanager;
 
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -198,4 +199,20 @@ public class TaskManagerLocationTest {
                        fail(e.getMessage());
                }
        }
+
+       @Test
+       public void testNotRetrieveHostName() {
+               InetAddress address = mock(InetAddress.class);
+               when(address.getCanonicalHostName()).thenReturn("worker10");
+               when(address.getHostName()).thenReturn("worker10");
+               when(address.getHostAddress()).thenReturn("127.0.0.1");
+
+               TaskManagerLocation info = new 
TaskManagerLocation(ResourceID.generate(), address, 19871,
+                               new 
TaskManagerLocation.IpOnlyHostNameSupplier(address));
+
+               assertNotEquals("worker10", info.getHostname());
+               assertNotEquals("worker10", info.getFQDNHostname());
+               assertEquals("127.0.0.1", info.getHostname());
+               assertEquals("127.0.0.1", info.getFQDNHostname());
+       }
 }

Reply via email to