[ 
https://issues.apache.org/jira/browse/FLINK-4363?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15430436#comment-15430436
 ] 

ASF GitHub Bot commented on FLINK-4363:
---------------------------------------

Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2400#discussion_r75647172
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java
 ---
    @@ -35,27 +79,634 @@
      */
     public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
     
    -   /** The unique resource ID of this TaskExecutor */
    +   private static final Logger LOG = 
LoggerFactory.getLogger(TaskExecutor.class);
    +
    +   /** Return code for critical errors during the runtime */
    +   private static final int RUNTIME_FAILURE_RETURN_CODE = 2;
    +
    +   /** The name of the TaskManager actor */
    +   private static final String TASK_MANAGER_NAME = "taskmanager";
    +
    +   /** The unique resource ID of this TaskManager */
        private final ResourceID resourceID;
     
        /** The access to the leader election and metadata storage services */
        private final HighAvailabilityServices haServices;
     
    -   // --------- resource manager --------
    +   /** The task manager configuration */
    +   private final TaskManagerConfiguration taskManagerConfig;
    +
    +   /** The connection information of the task manager */
    +   private final InstanceConnectionInfo connectionInfo;
    +
    +   /** The I/O manager component in the task manager */
    +   private final IOManager ioManager;
    +
    +   /** The memory manager component in the task manager */
    +   private final MemoryManager memoryManager;
    +
    +   /** The network component in the task manager */
    +   private final NetworkEnvironment networkEnvironment;
    +
    +   /** The number of slots in the task manager, should be 1 for YARN */
    +   private final int numberOfSlots;
     
    +   // --------- resource manager --------
        private TaskExecutorToResourceManagerConnection 
resourceManagerConnection;
     
        // 
------------------------------------------------------------------------
     
        public TaskExecutor(
    +                   TaskManagerConfiguration taskManagerConfig,
    +                   ResourceID resourceID,
    +                   InstanceConnectionInfo connectionInfo,
    +                   MemoryManager memoryManager,
    +                   IOManager ioManager,
    +                   NetworkEnvironment networkEnvironment,
    +                   int numberOfSlots,
                        RpcService rpcService,
    -                   HighAvailabilityServices haServices,
    -                   ResourceID resourceID) {
    +                   HighAvailabilityServices haServices) {
     
                super(rpcService);
     
    -           this.haServices = checkNotNull(haServices);
    +           this.taskManagerConfig = checkNotNull(taskManagerConfig);
                this.resourceID = checkNotNull(resourceID);
    +           this.connectionInfo = checkNotNull(connectionInfo);
    +           this.memoryManager = checkNotNull(memoryManager);
    +           this.ioManager = checkNotNull(ioManager);
    +           this.networkEnvironment = checkNotNull(networkEnvironment);
    +           this.numberOfSlots = checkNotNull(numberOfSlots);
    +           this.haServices = checkNotNull(haServices);
    +   }
    +
    +   /**
    +    * Starts and runs the TaskManager.
    +    * <p/>
    +    * This method first tries to select the network interface to use for 
the TaskManager
    +    * communication. The network interface is used both for the actor 
communication
    +    * (coordination) as well as for the data exchange between task 
managers. Unless
    +    * the hostname/interface is explicitly configured in the 
configuration, this
    +    * method will try out various interfaces and methods to connect to the 
JobManager
    +    * and select the one where the connection attempt is successful.
    +    * <p/>
    +    * After selecting the network interface, this method brings up an 
actor system
    +    * for the TaskManager and its actors, starts the TaskManager's services
    +    * (library cache, shuffle network stack, ...), and starts the 
TaskManager itself.
    +    *
    +    * @param configuration    The configuration for the TaskManager.
    +    * @param taskManagerClass The actor class to instantiate.
    +    *                         Allows to use TaskManager subclasses for 
example for YARN.
    +    */
    +   public static void selectNetworkInterfaceAndRunTaskManager(
    +           Configuration configuration,
    +           ResourceID resourceID,
    +           Class<? extends TaskManager> taskManagerClass) throws Exception 
{
    +
    +           Tuple2<String, Integer> tuple2 = 
selectNetworkInterfaceAndPort(configuration);
    +
    +           runTaskManager(tuple2._1(), resourceID, tuple2._2(), 
configuration, taskManagerClass);
    +   }
    +
    +   private static Tuple2<String, Integer> 
selectNetworkInterfaceAndPort(Configuration configuration)
    +           throws Exception {
    +           String taskManagerHostname = 
configuration.getString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, null);
    +           if (taskManagerHostname != null) {
    +                   LOG.info("Using configured hostname/address for 
TaskManager: " + taskManagerHostname);
    +           } else {
    +                   LeaderRetrievalService leaderRetrievalService = 
LeaderRetrievalUtils.createLeaderRetrievalService(configuration);
    +                   FiniteDuration lookupTimeout = 
AkkaUtils.getLookupTimeout(configuration);
    +
    +                   InetAddress taskManagerAddress = 
LeaderRetrievalUtils.findConnectingAddress(leaderRetrievalService, 
lookupTimeout);
    +                   taskManagerHostname = taskManagerAddress.getHostName();
    +                   LOG.info("TaskManager will use hostname/address '{}' 
({}) for communication.",
    +                           taskManagerHostname, 
taskManagerAddress.getHostAddress());
    +           }
    +
    +           // if no task manager port has been configured, use 0 (system 
will pick any free port)
    +           int actorSystemPort = 
configuration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, 0);
    +           if (actorSystemPort < 0 || actorSystemPort > 65535) {
    +                   throw new IllegalConfigurationException("Invalid value 
for '" +
    +                           ConfigConstants.TASK_MANAGER_IPC_PORT_KEY +
    +                           "' (port for the TaskManager actor system) : " 
+ actorSystemPort +
    +                           " - Leave config parameter empty or use 0 to 
let the system choose a port automatically.");
    +           }
    +
    +           return new Tuple2<>(taskManagerHostname, actorSystemPort);
    +   }
    +
    +   /**
    +    * Starts and runs the TaskManager. Brings up an actor system for the 
TaskManager and its
    +    * actors, starts the TaskManager's services (library cache, shuffle 
network stack, ...),
    +    * and starts the TaskManager itself.
    +    * <p/>
    +    * This method will also spawn a process reaper for the TaskManager 
(kill the process if
    +    * the actor fails) and optionally start the JVM memory logging thread.
    +    *
    +    * @param taskManagerHostname The hostname/address of the interface 
where the actor system
    +    *                            will communicate.
    +    * @param resourceID          The id of the resource which the task 
manager will run on.
    +    * @param actorSystemPort     The port at which the actor system will 
communicate.
    +    * @param configuration       The configuration for the TaskManager.
    +    * @param taskManagerClass    The actor class to instantiate. Allows 
the use of TaskManager
    +    *                            subclasses for example for YARN.
    +    */
    +   private static void runTaskManager(
    +           String taskManagerHostname,
    +           ResourceID resourceID,
    +           int actorSystemPort,
    +           Configuration configuration,
    +           Class<? extends TaskManager> taskManagerClass) throws Exception 
{
    +
    +           LOG.info("Starting TaskManager");
    +
    +           // Bring up the TaskManager actor system first, bind it to the 
given address.
    +
    +           LOG.info("Starting TaskManager actor system at " +
    +                   NetUtils.hostAndPortToUrlString(taskManagerHostname, 
actorSystemPort));
    +
    +           ActorSystem taskManagerSystem;
    +           try {
    +                   Tuple2<String, Object> address = new Tuple2<String, 
Object>(taskManagerHostname, actorSystemPort);
    +                   Config akkaConfig = 
AkkaUtils.getAkkaConfig(configuration, new Some<>(address));
    +                   LOG.debug("Using akka configuration\n " + akkaConfig);
    +                   taskManagerSystem = 
AkkaUtils.createActorSystem(akkaConfig);
    +           } catch (Throwable t) {
    +                   if (t instanceof 
org.jboss.netty.channel.ChannelException) {
    +                           Throwable cause = t.getCause();
    +                           if (cause != null && t.getCause() instanceof 
java.net.BindException) {
    +                                   String address = 
NetUtils.hostAndPortToUrlString(taskManagerHostname, actorSystemPort);
    +                                   throw new IOException("Unable to bind 
TaskManager actor system to address " +
    +                                           address + " - " + 
cause.getMessage(), t);
    +                           }
    +                   }
    +                   throw new Exception("Could not create TaskManager actor 
system", t);
    +           }
    +
    +           // start all the TaskManager services (network stack,  library 
cache, ...)
    +           // and the TaskManager actor
    +           try {
    +                   LOG.info("Starting TaskManager actor");
    +                   ActorRef taskManagerActor = 
startTaskManagerComponentsAndActor(
    +                           configuration,
    +                           resourceID,
    +                           taskManagerSystem,
    +                           taskManagerHostname,
    +                           TASK_MANAGER_NAME,
    +                           null,
    +                           false,
    +                           taskManagerClass);
    +
    +                   // start a process reaper that watches the JobManager. 
If the TaskManager actor dies,
    +                   // the process reaper will kill the JVM process (to 
ensure easy failure detection)
    +                   LOG.debug("Starting TaskManager process reaper");
    +
    +                   taskManagerSystem.actorOf(
    +                           Props.create(ProcessReaper.class, 
taskManagerActor, LOG, RUNTIME_FAILURE_RETURN_CODE),
    +                           "TaskManager_Process_Reaper");
    +
    +                   // if desired, start the logging daemon that 
periodically logs the memory usage information
    +                   if (LOG.isInfoEnabled() && configuration.getBoolean(
    +                           
ConfigConstants.TASK_MANAGER_DEBUG_MEMORY_USAGE_START_LOG_THREAD,
    +                           
ConfigConstants.DEFAULT_TASK_MANAGER_DEBUG_MEMORY_USAGE_START_LOG_THREAD)) {
    +                           LOG.info("Starting periodic memory usage 
logger");
    +
    +                           long interval = configuration.getLong(
    +                                   
ConfigConstants.TASK_MANAGER_DEBUG_MEMORY_USAGE_LOG_INTERVAL_MS,
    +                                   
ConfigConstants.DEFAULT_TASK_MANAGER_DEBUG_MEMORY_USAGE_LOG_INTERVAL_MS);
    +
    +                           MemoryLogger logger = new MemoryLogger(LOG, 
interval, taskManagerSystem);
    +                           logger.start();
    +                   }
    +
    +                   // block until everything is done
    +                   taskManagerSystem.awaitTermination();
    +           } catch (Throwable t) {
    +                   LOG.error("Error while starting up taskManager", t);
    --- End diff --
    
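    This message can be misleading because an exception can also be thrown 
after startup: the try block also covers taskManagerSystem.awaitTermination(), 
so failures that occur while the TaskManager is already running end up in 
this same catch block and are logged as startup errors.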


> Implement TaskManager basic startup of all components in java
> -------------------------------------------------------------
>
>                 Key: FLINK-4363
>                 URL: https://issues.apache.org/jira/browse/FLINK-4363
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Cluster Management
>            Reporter: Zhijiang Wang
>            Assignee: Zhijiang Wang
>
> Similar to the current {{TaskManager}}, but implement the initialization and 
> startup of all components in Java instead of Scala.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to