[
https://issues.apache.org/jira/browse/FLINK-4505?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15458217#comment-15458217
]
ASF GitHub Bot commented on FLINK-4505:
---------------------------------------
Github user wangzhijiang999 commented on a diff in the pull request:
https://github.com/apache/flink/pull/2461#discussion_r77329068
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/YarnTaskExecutorFactory.java
---
@@ -0,0 +1,198 @@
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.taskmanager.MemoryLogger;
+import org.apache.flink.runtime.util.LeaderRetrievalUtils;
+import org.apache.flink.util.NetUtils;
+
+import akka.actor.ActorSystem;
+import akka.util.Timeout;
+
+import scala.Some;
+import scala.Tuple2;
+import scala.concurrent.duration.FiniteDuration;
+
+import com.typesafe.config.Config;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A factory for creating {@link TaskExecutor} and starting it in yarn mode.
+ */
+public class YarnTaskExecutorFactory extends TaskExecutorFactory {
+
+ public YarnTaskExecutorFactory(Configuration configuration, ResourceID resourceID) {
+ super(configuration, resourceID);
+ }
+
+ @Override
+ public TaskExecutor createAndStartTaskExecutor() throws Exception {
+ return selectNetworkInterfaceAndRunTaskManager(configuration, resourceID);
+ }
+
+ /**
+ * Starts and runs the TaskManager.
+ * <p/>
+ * This method first tries to select the network interface to use for the TaskManager
+ * communication. The network interface is used both for the actor communication
+ * (coordination) as well as for the data exchange between task managers. Unless
+ * the hostname/interface is explicitly configured in the configuration, this
+ * method will try out various interfaces and methods to connect to the JobManager
+ * and select the one where the connection attempt is successful.
+ * <p/>
+ * After selecting the network interface, this method brings up an actor system
+ * for the TaskManager and its actors, starts the TaskManager's services
+ * (library cache, shuffle network stack, ...), and starts the TaskManager itself.
+ *
+ * @param configuration The configuration for the TaskManager.
+ * @param resourceID The id of the resource which the task manager will run on.
+ */
+ private TaskExecutor selectNetworkInterfaceAndRunTaskManager(
--- End diff --
The parameters differ between modes because I followed the previous
**TaskManager** implementation. For example, **YarnTaskManagerRunner**
previously invoked the method "selectNetworkInterfaceAndRunTaskManager",
while **ForkableFlinkMiniCluster** invoked the method
"startTaskManagerComponentsAndActor" to start the **TaskManager**. So I kept
the existing startup path for each mode.
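To illustrate the point, here is a minimal sketch of how one abstract factory can keep a separate startup path per mode. It is not the code from the pull request: every class name below is hypothetical, plain strings stand in for Flink's Configuration and ResourceID, and the two entry points are only named, not called. In the real code they take different parameters, which the sketch does not model.

```java
import java.util.Objects;

// Hypothetical stand-in for the abstract factory; not the real Flink class.
abstract class SketchTaskExecutorFactory {
    protected final String configuration; // stand-in for Flink's Configuration
    protected final String resourceId;    // stand-in for Flink's ResourceID

    protected SketchTaskExecutorFactory(String configuration, String resourceId) {
        this.configuration = Objects.requireNonNull(configuration);
        this.resourceId = Objects.requireNonNull(resourceId);
    }

    // Each mode decides which startup path it uses.
    abstract String createAndStartTaskExecutor();

    // Shared helper: records which entry point a mode would invoke.
    protected String describe(String entryPoint) {
        return entryPoint + "(" + configuration + ", " + resourceId + ")";
    }
}

// Yarn mode keeps the path that first selects a network interface.
final class SketchYarnFactory extends SketchTaskExecutorFactory {
    SketchYarnFactory(String configuration, String resourceId) {
        super(configuration, resourceId);
    }

    @Override
    String createAndStartTaskExecutor() {
        return describe("selectNetworkInterfaceAndRunTaskManager");
    }
}

// Testing mode keeps the path that starts the components directly.
final class SketchTestingFactory extends SketchTaskExecutorFactory {
    SketchTestingFactory(String configuration, String resourceId) {
        super(configuration, resourceId);
    }

    @Override
    String createAndStartTaskExecutor() {
        return describe("startTaskManagerComponentsAndActor");
    }
}

class FactorySketch {
    public static void main(String[] args) {
        SketchTaskExecutorFactory[] factories = {
            new SketchYarnFactory("conf", "res-1"),
            new SketchTestingFactory("conf", "res-2"),
        };
        for (SketchTaskExecutorFactory factory : factories) {
            System.out.println(factory.createAndStartTaskExecutor());
        }
    }
}
```

Callers depend only on the abstract type, so each mode can retain its own entry point without the caller knowing which one is used.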
> Implement TaskManagerFactory to bring up TaskManager for different modes
> ------------------------------------------------------------------------
>
> Key: FLINK-4505
> URL: https://issues.apache.org/jira/browse/FLINK-4505
> Project: Flink
> Issue Type: Sub-task
> Components: Cluster Management
> Reporter: Zhijiang Wang
> Assignee: Zhijiang Wang
> Priority: Minor
>
> Implement {{TaskExecutorFactory}} that should be an abstract class with the
> helper methods to bring up the {{TaskManager}}. The factory can be
> implemented by some classes to start a {{TaskManager}} in different modes
> (testing, standalone, yarn).