This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new d251ce5 [FLINK-19677][runtime] Make JobManager lazily resolve
hostname of TaskManager and provide an option to turn off reverse resolution
entirely
d251ce5 is described below
commit d251ce5ab43d89d3d09a3f09ecf9a33435e5d901
Author: Weike DONG <[email protected]>
AuthorDate: Sun Oct 18 21:40:04 2020 +0800
[FLINK-19677][runtime] Make JobManager lazily resolve hostname of
TaskManager and provide an option to turn off reverse resolution entirely
[FLINK-19677][runtime][tests] Added test cases
More test cases and formatting
Minor changes after review
Re-generated docs
Improvements after review
Remove spaces
This closes #13774.
---
.../generated/all_jobmanager_section.html | 6 +
.../generated/job_manager_configuration.html | 6 +
.../flink/configuration/JobManagerOptions.java | 13 ++
.../apache/flink/runtime/jobmaster/JobMaster.java | 16 +-
.../runtime/taskmanager/TaskManagerLocation.java | 214 ++++++++++++++++-----
.../taskmanager/TaskManagerLocationTest.java | 17 ++
6 files changed, 226 insertions(+), 46 deletions(-)
diff --git a/docs/_includes/generated/all_jobmanager_section.html
b/docs/_includes/generated/all_jobmanager_section.html
index e280984..3cf1e61 100644
--- a/docs/_includes/generated/all_jobmanager_section.html
+++ b/docs/_includes/generated/all_jobmanager_section.html
@@ -27,6 +27,12 @@
<td>This option specifies how the job computation recovers from
task failures. Accepted values are:<ul><li>'full': Restarts all tasks to
recover the job.</li><li>'region': Restarts all tasks that could be affected by
the task failure. More details can be found <a
href="../dev/task_failure_recovery.html#restart-pipelined-region-failover-strategy">here</a>.</li></ul></td>
</tr>
<tr>
+ <td><h5>jobmanager.retrieve-taskmanager-hostname</h5></td>
+ <td style="word-wrap: break-word;">true</td>
+ <td>Boolean</td>
+ <td>Flag indicating whether JobManager would retrieve canonical
host name of TaskManager during registration. If the option is set to "false",
TaskManager registration with JobManager could be faster, since no reverse DNS
lookup is performed. However, local input split assignment (such as for HDFS
files) may be impacted.</td>
+ </tr>
+ <tr>
<td><h5>jobmanager.rpc.address</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
diff --git a/docs/_includes/generated/job_manager_configuration.html
b/docs/_includes/generated/job_manager_configuration.html
index 988bcc6..793616b 100644
--- a/docs/_includes/generated/job_manager_configuration.html
+++ b/docs/_includes/generated/job_manager_configuration.html
@@ -87,6 +87,12 @@
<td>Total Process Memory size for the JobManager. This includes
all the memory that a JobManager JVM process consumes, consisting of Total
Flink Memory, JVM Metaspace, and JVM Overhead. In containerized setups, this
should be set to the container memory. See also 'jobmanager.memory.flink.size'
for Total Flink Memory size configuration.</td>
</tr>
<tr>
+ <td><h5>jobmanager.retrieve-taskmanager-hostname</h5></td>
+ <td style="word-wrap: break-word;">true</td>
+ <td>Boolean</td>
+ <td>Flag indicating whether JobManager would retrieve canonical
host name of TaskManager during registration. If the option is set to "false",
TaskManager registration with JobManager could be faster, since no reverse DNS
lookup is performed. However, local input split assignment (such as for HDFS
files) may be impacted.</td>
+ </tr>
+ <tr>
<td><h5>jobmanager.rpc.address</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
diff --git
a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
index aa3dc9a..3b774d2 100644
---
a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
+++
b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
@@ -309,6 +309,19 @@ public class JobManagerOptions {
.withDescription("The max number of completed jobs that
can be kept in the job store.");
/**
+ * Whether the JobManager obtains the canonical host name of a TaskManager that
+ * registers with it (via reverse DNS lookup, performed lazily when the name is first
+ * needed). If set to {@code false}, the textual IP address of the TaskManager is used
+ * as both its host name and FQDN, so no reverse DNS lookup is done. Defaults to
+ * {@code true}.
+ */
+ @Documentation.Section(Documentation.Sections.ALL_JOB_MANAGER)
+ public static final ConfigOption<Boolean>
RETRIEVE_TASK_MANAGER_HOSTNAME =
+ key("jobmanager.retrieve-taskmanager-hostname")
+ .defaultValue(true)
+ .withDescription("Flag indicating whether JobManager
would retrieve canonical "
+ + "host name of
TaskManager during registration. "
+ + "If the option is set
to \"false\", TaskManager registration with "
+ + "JobManager could be
faster, since no reverse DNS lookup is performed. "
+ + "However, local input
split assignment (such as for HDFS files) may be impacted.");
+
+ /**
* The timeout in milliseconds for requesting a slot from Slot Pool.
*/
@Documentation.Section(Documentation.Sections.EXPERT_SCHEDULING)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index d5087cb..12d65fc 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.queryablestate.KvStateID;
import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
import org.apache.flink.runtime.blob.BlobWriter;
@@ -89,6 +90,7 @@ import
org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation.ResolutionMode;
import org.apache.flink.runtime.taskmanager.UnresolvedTaskManagerLocation;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
@@ -165,6 +167,8 @@ public class JobMaster extends
FencedRpcEndpoint<JobMasterId> implements JobMast
private final SchedulerNGFactory schedulerNGFactory;
+ private final boolean retrieveTaskManagerHostName;
+
// --------- BackPressure --------
private final BackPressureStatsTracker backPressureStatsTracker;
@@ -241,6 +245,8 @@ public class JobMaster extends
FencedRpcEndpoint<JobMasterId> implements JobMast
this.schedulerNGFactory = checkNotNull(schedulerNGFactory);
this.heartbeatServices = checkNotNull(heartbeatServices);
this.jobMetricGroupFactory =
checkNotNull(jobMetricGroupFactory);
+ this.retrieveTaskManagerHostName =
jobMasterConfiguration.getConfiguration()
+
.getBoolean(JobManagerOptions.RETRIEVE_TASK_MANAGER_HOSTNAME);
final String jobName = jobGraph.getName();
final JobID jid = jobGraph.getJobID();
@@ -579,7 +585,15 @@ public class JobMaster extends
FencedRpcEndpoint<JobMasterId> implements JobMast
final TaskManagerLocation taskManagerLocation;
try {
- taskManagerLocation =
TaskManagerLocation.fromUnresolvedLocation(unresolvedTaskManagerLocation);
+ if (retrieveTaskManagerHostName) {
+ taskManagerLocation =
TaskManagerLocation.fromUnresolvedLocation(
+ unresolvedTaskManagerLocation,
+
ResolutionMode.RETRIEVE_HOST_NAME);
+ } else {
+ taskManagerLocation =
TaskManagerLocation.fromUnresolvedLocation(
+ unresolvedTaskManagerLocation,
+ ResolutionMode.USE_IP_ONLY);
+ }
} catch (Throwable throwable) {
final String errMsg = String.format(
"Could not accept TaskManager registration.
TaskManager address %s cannot be resolved. %s",
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerLocation.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerLocation.java
index fd95712..9d7fb3a 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerLocation.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerLocation.java
@@ -27,6 +27,7 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nonnull;
+import java.io.Serializable;
import java.net.InetAddress;
import java.net.UnknownHostException;
@@ -51,54 +52,81 @@ public class TaskManagerLocation implements
Comparable<TaskManagerLocation>, jav
* the YARN container ID, Mesos container ID, or any other unique
identifier. */
private final ResourceID resourceID;
- /** The network address that the TaskManager binds its sockets to */
+ /** The network address that the TaskManager binds its sockets to. */
private final InetAddress inetAddress;
- /** The fully qualified host name of the TaskManager */
- private final String fqdnHostName;
+ /** The supplier for fully qualified host name and pure hostname. */
+ private final HostNameSupplier hostNameSupplier;
- /** The pure hostname, derived from the fully qualified host name. */
- private final String hostName;
-
- /** The port that the TaskManager receive data transport connection
requests at */
+ /** The port that the TaskManager receive data transport connection
requests at. */
private final int dataPort;
- /** The toString representation, eagerly constructed and cached to
avoid repeated string building */
- private final String stringRepresentation;
+ /**
+ * The toString representation, lazily constructed on the first call to
+ * {@link #toString()} and cached to avoid repeated string building. Building it
+ * lazily means the FQDN is not requested from the host name supplier at
+ * construction time. NOTE(review): not final and written without synchronization,
+ * so concurrent first calls to toString() may each build it.
+ */
+ private String stringRepresentation;
/**
* Constructs a new instance connection info object. The constructor
will attempt to retrieve the instance's
* host name and domain name through the operating system's lookup
mechanisms.
- *
+ *
* @param inetAddress
* the network address the instance's task manager binds its
sockets to
* @param dataPort
* the port instance's task manager expects to receive transfer
envelopes on
+ * @param hostNameSupplier
+ * the supplier for obtaining fully-qualified domain name and
pure hostname of the task manager
*/
@VisibleForTesting
- public TaskManagerLocation(ResourceID resourceID, InetAddress
inetAddress, int dataPort) {
+ public TaskManagerLocation(
+ ResourceID resourceID,
+ InetAddress inetAddress,
+ int dataPort,
+ HostNameSupplier hostNameSupplier) {
// -1 indicates a local instance connection info
checkArgument(dataPort > 0 || dataPort == -1, "dataPort must be
> 0, or -1 (local)");
this.resourceID = checkNotNull(resourceID);
this.inetAddress = checkNotNull(inetAddress);
this.dataPort = dataPort;
+ this.hostNameSupplier = checkNotNull(hostNameSupplier);
+ }
- // get FQDN hostname on this TaskManager.
- this.fqdnHostName = getFqdnHostName(inetAddress);
-
- this.hostName = getHostName(inetAddress);
-
- this.stringRepresentation = String.format(
- "%s @ %s (dataPort=%d)", resourceID,
fqdnHostName, dataPort);
+ /**
+ * Constructs a new instance connection info object whose host names are provided by
+ * a {@link DefaultHostNameSupplier}. No host name lookup happens in this constructor;
+ * the supplier obtains the host name and fully qualified domain name on first access
+ * and caches them.
+ *
+ * @param resourceID
+ * the ID of the resource in which the TaskManager is started (must not be null)
+ * @param inetAddress
+ * the network address the instance's task manager binds its sockets to
+ * @param dataPort
+ * the port instance's task manager expects to receive transfer envelopes on;
+ * must be > 0, or -1 for a local instance
+ */
+ @VisibleForTesting
+ public TaskManagerLocation(ResourceID resourceID, InetAddress
inetAddress, int dataPort) {
+ this(resourceID, inetAddress, dataPort, new
DefaultHostNameSupplier(inetAddress));
}
- public static TaskManagerLocation fromUnresolvedLocation(final
UnresolvedTaskManagerLocation unresolvedLocation)
+ public static TaskManagerLocation fromUnresolvedLocation(
+ final UnresolvedTaskManagerLocation unresolvedLocation,
+ final ResolutionMode resolutionMode)
throws UnknownHostException {
- return new TaskManagerLocation(
- unresolvedLocation.getResourceID(),
-
InetAddress.getByName(unresolvedLocation.getExternalAddress()),
- unresolvedLocation.getDataPort());
+
+ InetAddress inetAddress =
InetAddress.getByName(unresolvedLocation.getExternalAddress());
+
+ switch (resolutionMode) {
+ case RETRIEVE_HOST_NAME:
+ return new TaskManagerLocation(
+ unresolvedLocation.getResourceID(),
+ inetAddress,
+ unresolvedLocation.getDataPort(),
+ new
DefaultHostNameSupplier(inetAddress));
+ case USE_IP_ONLY:
+ return new TaskManagerLocation(
+ unresolvedLocation.getResourceID(),
+ inetAddress,
+ unresolvedLocation.getDataPort(),
+ new
IpOnlyHostNameSupplier(inetAddress));
+ default:
+ throw new
UnsupportedOperationException("Unsupported resolution mode provided.");
+ }
}
//
------------------------------------------------------------------------
@@ -114,7 +142,7 @@ public class TaskManagerLocation implements
Comparable<TaskManagerLocation>, jav
* <li>If the TaskManager is started in standalone mode, or via a
MiniCluster, this is a random ID.</li>
* <li>Other deployment modes can set the resource ID in other
ways.</li>
* </ul>
- *
+ *
* @return The ID of the resource in which the TaskManager is started
*/
public ResourceID getResourceID() {
@@ -123,7 +151,7 @@ public class TaskManagerLocation implements
Comparable<TaskManagerLocation>, jav
/**
* Returns the port instance's task manager expects to receive transfer
envelopes on.
- *
+ *
* @return the port instance's task manager expects to receive transfer
envelopes on
*/
public int dataPort() {
@@ -132,7 +160,7 @@ public class TaskManagerLocation implements
Comparable<TaskManagerLocation>, jav
/**
* Returns the network address the instance's task manager binds its
sockets to.
- *
+ *
* @return the network address the instance's task manager binds its
sockets to
*/
public InetAddress address() {
@@ -149,32 +177,27 @@ public class TaskManagerLocation implements
Comparable<TaskManagerLocation>, jav
}
/**
- * Returns the fully-qualified domain name the TaskManager. If the name
could not be
- * determined, the return value will be a textual representation of the
TaskManager's IP address.
- *
+ * Returns the fully-qualified domain name of the TaskManager provided
by {@link #hostNameSupplier}.
+ *
* @return The fully-qualified domain name of the TaskManager.
*/
public String getFQDNHostname() {
- return fqdnHostName;
+ return hostNameSupplier.getFqdnHostName();
}
/**
- * Gets the hostname of the TaskManager. The hostname derives from the
fully qualified
- * domain name (FQDN, see {@link #getFQDNHostname()}):
- * <ul>
- * <li>If the FQDN is the textual IP address, then the hostname is
also the IP address</li>
- * <li>If the FQDN has only one segment (such as "localhost", or
"host17"), then this is
- * used as the hostname.</li>
- * <li>If the FQDN has multiple segments (such as
"worker3.subgroup.company.net"), then the first
- * segment (here "worker3") will be used as the hostname.</li>
- * </ul>
+ * Gets the hostname of the TaskManager from {@link #hostNameSupplier}.
*
* @return The hostname of the TaskManager.
*/
public String getHostname() {
- return hostName;
+ return hostNameSupplier.getHostName();
}
+ //
--------------------------------------------------------------------------------------------
+ // Utilities
+ //
--------------------------------------------------------------------------------------------
+
/**
* Gets the fully qualified hostname of the TaskManager based on the
network address.
*
@@ -219,12 +242,12 @@ public class TaskManagerLocation implements
Comparable<TaskManagerLocation>, jav
return hostName;
}
- //
--------------------------------------------------------------------------------------------
- // Utilities
- //
--------------------------------------------------------------------------------------------
-
@Override
public String toString() {
+ if (stringRepresentation == null) {
+ this.stringRepresentation = String.format(
+ "%s @ %s (dataPort=%d)", resourceID,
getFQDNHostname(), dataPort);
+ }
return stringRepresentation;
}
@@ -246,7 +269,7 @@ public class TaskManagerLocation implements
Comparable<TaskManagerLocation>, jav
@Override
public int hashCode() {
- return resourceID.hashCode() +
+ return resourceID.hashCode() +
17 * inetAddress.hashCode() +
129 * dataPort;
}
@@ -288,4 +311,105 @@ public class TaskManagerLocation implements
Comparable<TaskManagerLocation>, jav
return 0;
}
}
+
+ //
--------------------------------------------------------------------------------------------
+ // Hostname Resolution Suppliers
+ //
--------------------------------------------------------------------------------------------
+
+ /**
+ * Supplies the host name and the fully qualified domain name (FQDN) reported by a
+ * {@link TaskManagerLocation}. Implementations decide whether these are obtained via
+ * a lookup ({@link DefaultHostNameSupplier}) or taken directly from the IP address
+ * ({@link IpOnlyHostNameSupplier}).
+ *
+ * <p>Extends {@link Serializable} because TaskManagerLocation keeps the supplier in a
+ * field. NOTE(review): presumably so that TaskManagerLocation itself stays
+ * serializable — confirm.
+ */
+ public interface HostNameSupplier extends Serializable {
+ /** Returns the pure host name of the TaskManager. */
+ String getHostName();
+
+ /** Returns the fully qualified domain name of the TaskManager. */
+ String getFqdnHostName();
+ }
+
+ /**
+ * A {@link HostNameSupplier} that obtains the FQDN host name and the pure host name
+ * of the given InetAddress on demand and caches both results for later use.
+ *
+ * <p>Each value is computed independently, via the static helpers
+ * {@code TaskManagerLocation.getFqdnHostName} and {@code TaskManagerLocation.getHostName},
+ * the first time it is requested. NOTE(review): the cache fields are neither volatile
+ * nor lock-protected, so concurrent first calls may each perform the lookup.
+ *
+ * <p>NOTE(review): annotated {@code @VisibleForTesting}, but it is also used by
+ * production code ({@code fromUnresolvedLocation} and the three-argument constructor).
+ */
+ @VisibleForTesting
+ public static class DefaultHostNameSupplier implements HostNameSupplier
{
+ private final InetAddress inetAddress;
+ // Cached result of getHostName(); null until first requested.
+ private String hostName;
+ // Cached result of getFqdnHostName(); null until first requested.
+ private String fqdnHostName;
+
+ public DefaultHostNameSupplier(InetAddress inetAddress) {
+ this.inetAddress = inetAddress;
+ }
+
+ /**
+ * Gets the hostname of the TaskManager. The hostname derives
+ * from the fully qualified domain name (FQDN, see {@link #getFqdnHostName()}):
+ * <ul>
+ * <li>If the FQDN is the textual IP address, then the
+ * hostname is also the IP address</li>
+ * <li>If the FQDN has only one segment (such as
+ * "localhost", or "host17"), then this is used as the hostname.</li>
+ * <li>If the FQDN has multiple segments (such as
+ * "worker3.subgroup.company.net"), then the first
+ * segment (here "worker3") will be used as the hostname.</li>
+ * </ul>
+ *
+ * <p>The value is computed on the first call and cached afterwards.
+ *
+ * @return The hostname of the TaskManager.
+ */
+ @Override
+ public String getHostName() {
+ if (hostName == null) {
+ hostName =
TaskManagerLocation.getHostName(inetAddress);
+ }
+ return hostName;
+ }
+
+ /**
+ * Returns the fully-qualified domain name of the TaskManager. If
+ * the name could not be determined, the return value will be a textual
+ * representation of the TaskManager's IP address.
+ *
+ * <p>The value is computed on the first call and cached afterwards.
+ *
+ * @return The fully-qualified domain name of the TaskManager.
+ */
+ @Override
+ public String getFqdnHostName() {
+ if (fqdnHostName == null) {
+ fqdnHostName =
TaskManagerLocation.getFqdnHostName(inetAddress);
+ }
+ return fqdnHostName;
+ }
+ }
+
+ /**
+ * A {@link HostNameSupplier} that returns the textual IP address of the given
+ * InetAddress (via {@link InetAddress#getHostAddress()}) as both the host name and
+ * the FQDN host name, so no reverse DNS lookup is required.
+ *
+ * <p>NOTE(review): annotated {@code @VisibleForTesting}, but it is also used by
+ * production code ({@code fromUnresolvedLocation} with {@code USE_IP_ONLY}).
+ */
+ @VisibleForTesting
+ public static class IpOnlyHostNameSupplier implements HostNameSupplier {
+ private final InetAddress inetAddress;
+
+ public IpOnlyHostNameSupplier(InetAddress inetAddress) {
+ this.inetAddress = inetAddress;
+ }
+
+ /**
+ * Returns the textual representation of the TaskManager's IP
+ * address as host name.
+ *
+ * @return The textual representation of the TaskManager's IP
+ * address.
+ */
+ @Override
+ public String getHostName() {
+ return inetAddress.getHostAddress();
+ }
+
+ /**
+ * Returns the textual representation of the TaskManager's IP
+ * address as FQDN host name.
+ *
+ * @return The textual representation of the TaskManager's IP
+ * address.
+ */
+ @Override
+ public String getFqdnHostName() {
+ return inetAddress.getHostAddress();
+ }
+ }
+
+ /**
+ * The DNS resolution mode for a TaskManager's IP address. It selects which
+ * {@link HostNameSupplier} {@code TaskManagerLocation.fromUnresolvedLocation} attaches
+ * to the created location.
+ *
+ * <p>NOTE(review): in both modes fromUnresolvedLocation still calls
+ * {@code InetAddress.getByName} on the external address, so a forward lookup may
+ * happen regardless of the mode.
+ */
+ public enum ResolutionMode {
+ /** Obtain host names on demand (reverse lookup) via {@link DefaultHostNameSupplier}. */
+ RETRIEVE_HOST_NAME,
+ /** Use the textual IP address as host name via {@link IpOnlyHostNameSupplier}. */
+ USE_IP_ONLY
+ }
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerLocationTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerLocationTest.java
index 9452b20..62ff960 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerLocationTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerLocationTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.taskmanager;
+import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -198,4 +199,20 @@ public class TaskManagerLocationTest {
fail(e.getMessage());
}
}
+
+ /**
+ * Verifies that a TaskManagerLocation backed by
+ * {@link TaskManagerLocation.IpOnlyHostNameSupplier} reports the textual IP address
+ * as both host name and FQDN, even though the mocked InetAddress would report
+ * "worker10" from its name lookups.
+ */
+ @Test
+ public void testNotRetrieveHostName() {
+ // Stub the name lookups with "worker10" so the assertions fail if they are consulted.
+ InetAddress address = mock(InetAddress.class);
+ when(address.getCanonicalHostName()).thenReturn("worker10");
+ when(address.getHostName()).thenReturn("worker10");
+ when(address.getHostAddress()).thenReturn("127.0.0.1");
+
+ TaskManagerLocation info = new
TaskManagerLocation(ResourceID.generate(), address, 19871,
+ new
TaskManagerLocation.IpOnlyHostNameSupplier(address));
+
+ assertNotEquals("worker10", info.getHostname());
+ assertNotEquals("worker10", info.getFQDNHostname());
+ assertEquals("127.0.0.1", info.getHostname());
+ assertEquals("127.0.0.1", info.getFQDNHostname());
+ }
}