Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/2461#discussion_r77327741
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/YarnTaskExecutorFactory.java
---
@@ -0,0 +1,198 @@
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.taskmanager.MemoryLogger;
+import org.apache.flink.runtime.util.LeaderRetrievalUtils;
+import org.apache.flink.util.NetUtils;
+
+import akka.actor.ActorSystem;
+import akka.util.Timeout;
+
+import scala.Some;
+import scala.Tuple2;
+import scala.concurrent.duration.FiniteDuration;
+
+import com.typesafe.config.Config;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * An factory for creating {@link TaskExecutor} and starting it in yarn
mode.
+ */
+public class YarnTaskExecutorFactory extends TaskExecutorFactory {
+
+ public YarnTaskExecutorFactory(Configuration configuration, ResourceID
resourceID) {
+ super(configuration, resourceID);
+ }
+
+ @Override
+ public TaskExecutor createAndStartTaskExecutor() throws Exception {
+ return selectNetworkInterfaceAndRunTaskManager(configuration,
resourceID);
+ }
+
+ /**
+ * Starts and runs the TaskManager.
+ * <p/>
+ * This method first tries to select the network interface to use for
the TaskManager
+ * communication. The network interface is used both for the actor
communication
+ * (coordination) as well as for the data exchange between task
managers. Unless
+ * the hostname/interface is explicitly configured in the
configuration, this
+ * method will try out various interfaces and methods to connect to the
JobManager
+ * and select the one where the connection attempt is successful.
+ * <p/>
+ * After selecting the network interface, this method brings up an
actor system
+ * for the TaskManager and its actors, starts the TaskManager's services
+ * (library cache, shuffle network stack, ...), and starts the
TaskManager itself.
+ *
+ * @param configuration The configuration for the TaskManager.
+ * @param resourceID The id of the resource which the task
manager will run on.
+ */
+ private TaskExecutor selectNetworkInterfaceAndRunTaskManager(
--- End diff --
Why does only the Yarn factory selects the network interface and not the
standalone implementation?
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---