GitHub user wangzhijiang999 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2400#discussion_r75651382
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java
 ---
    @@ -35,27 +79,634 @@
      */
     public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
     
    -   /** The unique resource ID of this TaskExecutor */
    +   private static final Logger LOG = 
LoggerFactory.getLogger(TaskExecutor.class);
    +
    +   /** Return code for critical errors during the runtime */
    +   private static final int RUNTIME_FAILURE_RETURN_CODE = 2;
    +
    +   /** The name of the TaskManager actor */
    +   private static final String TASK_MANAGER_NAME = "taskmanager";
    +
    +   /** The unique resource ID of this TaskManager */
        private final ResourceID resourceID;
     
        /** The access to the leader election and metadata storage services */
        private final HighAvailabilityServices haServices;
     
    -   // --------- resource manager --------
    +   /** The task manager configuration */
    +   private final TaskManagerConfiguration taskManagerConfig;
    +
    +   /** The connection information of the task manager */
    +   private final InstanceConnectionInfo connectionInfo;
    +
    +   /** The I/O manager component in the task manager */
    +   private final IOManager ioManager;
    +
    +   /** The memory manager component in the task manager */
    +   private final MemoryManager memoryManager;
    +
    +   /** The network component in the task manager */
    +   private final NetworkEnvironment networkEnvironment;
    +
    +   /** The number of slots in the task manager, should be 1 for YARN */
    +   private final int numberOfSlots;
     
    +   // --------- resource manager --------
        private TaskExecutorToResourceManagerConnection 
resourceManagerConnection;
     
        // 
------------------------------------------------------------------------
     
        public TaskExecutor(
    +                   TaskManagerConfiguration taskManagerConfig,
    +                   ResourceID resourceID,
    +                   InstanceConnectionInfo connectionInfo,
    +                   MemoryManager memoryManager,
    +                   IOManager ioManager,
    +                   NetworkEnvironment networkEnvironment,
    +                   int numberOfSlots,
                        RpcService rpcService,
    -                   HighAvailabilityServices haServices,
    -                   ResourceID resourceID) {
    +                   HighAvailabilityServices haServices) {
     
                super(rpcService);
     
    -           this.haServices = checkNotNull(haServices);
    +           this.taskManagerConfig = checkNotNull(taskManagerConfig);
                this.resourceID = checkNotNull(resourceID);
    +           this.connectionInfo = checkNotNull(connectionInfo);
    +           this.memoryManager = checkNotNull(memoryManager);
    +           this.ioManager = checkNotNull(ioManager);
    +           this.networkEnvironment = checkNotNull(networkEnvironment);
    +           this.numberOfSlots = checkNotNull(numberOfSlots);
    +           this.haServices = checkNotNull(haServices);
    +   }
    +
    +   /**
    +    * Starts and runs the TaskManager.
    +    * <p/>
    +    * This method first tries to select the network interface to use for 
the TaskManager
    +    * communication. The network interface is used both for the actor 
communication
    +    * (coordination) as well as for the data exchange between task 
managers. Unless
    +    * the hostname/interface is explicitly configured in the 
configuration, this
    +    * method will try out various interfaces and methods to connect to the 
JobManager
    +    * and select the one where the connection attempt is successful.
    +    * <p/>
    +    * After selecting the network interface, this method brings up an 
actor system
    +    * for the TaskManager and its actors, starts the TaskManager's services
    +    * (library cache, shuffle network stack, ...), and starts the 
TaskManager itself.
    +    *
    +    * @param configuration    The configuration for the TaskManager.
    +    * @param taskManagerClass The actor class to instantiate.
    +    *                         Allows to use TaskManager subclasses for 
example for YARN.
    +    */
    +   public static void selectNetworkInterfaceAndRunTaskManager(
    +           Configuration configuration,
    +           ResourceID resourceID,
    +           Class<? extends TaskManager> taskManagerClass) throws Exception 
{
    +
    +           Tuple2<String, Integer> tuple2 = 
selectNetworkInterfaceAndPort(configuration);
    +
    +           runTaskManager(tuple2._1(), resourceID, tuple2._2(), 
configuration, taskManagerClass);
    +   }
    +
    +   private static Tuple2<String, Integer> 
selectNetworkInterfaceAndPort(Configuration configuration)
    +           throws Exception {
    +           String taskManagerHostname = 
configuration.getString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, null);
    +           if (taskManagerHostname != null) {
    +                   LOG.info("Using configured hostname/address for 
TaskManager: " + taskManagerHostname);
    +           } else {
    +                   LeaderRetrievalService leaderRetrievalService = 
LeaderRetrievalUtils.createLeaderRetrievalService(configuration);
    +                   FiniteDuration lookupTimeout = 
AkkaUtils.getLookupTimeout(configuration);
    +
    +                   InetAddress taskManagerAddress = 
LeaderRetrievalUtils.findConnectingAddress(leaderRetrievalService, 
lookupTimeout);
    +                   taskManagerHostname = taskManagerAddress.getHostName();
    +                   LOG.info("TaskManager will use hostname/address '{}' 
({}) for communication.",
    +                           taskManagerHostname, 
taskManagerAddress.getHostAddress());
    +           }
    +
    +           // if no task manager port has been configured, use 0 (system 
will pick any free port)
    +           int actorSystemPort = 
configuration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, 0);
    +           if (actorSystemPort < 0 || actorSystemPort > 65535) {
    +                   throw new IllegalConfigurationException("Invalid value 
for '" +
    +                           ConfigConstants.TASK_MANAGER_IPC_PORT_KEY +
    +                           "' (port for the TaskManager actor system) : " 
+ actorSystemPort +
    +                           " - Leave config parameter empty or use 0 to 
let the system choose a port automatically.");
    +           }
    +
    +           return new Tuple2<>(taskManagerHostname, actorSystemPort);
    +   }
    +
    +   /**
    +    * Starts and runs the TaskManager. Brings up an actor system for the 
TaskManager and its
    +    * actors, starts the TaskManager's services (library cache, shuffle 
network stack, ...),
    +    * and starts the TaskManager itself.
    +    * <p/>
    +    * This method will also spawn a process reaper for the TaskManager 
(kill the process if
    +    * the actor fails) and optionally start the JVM memory logging thread.
    +    *
    +    * @param taskManagerHostname The hostname/address of the interface 
where the actor system
    +    *                            will communicate.
    +    * @param resourceID          The id of the resource which the task 
manager will run on.
    +    * @param actorSystemPort     The port at which the actor system will 
communicate.
    +    * @param configuration       The configuration for the TaskManager.
    +    * @param taskManagerClass    The actor class to instantiate. Allows 
the use of TaskManager
    +    *                            subclasses for example for YARN.
    +    */
    +   private static void runTaskManager(
    +           String taskManagerHostname,
    +           ResourceID resourceID,
    +           int actorSystemPort,
    +           Configuration configuration,
    +           Class<? extends TaskManager> taskManagerClass) throws Exception 
{
    +
    +           LOG.info("Starting TaskManager");
    +
    +           // Bring up the TaskManager actor system first, bind it to the 
given address.
    +
    +           LOG.info("Starting TaskManager actor system at " +
    +                   NetUtils.hostAndPortToUrlString(taskManagerHostname, 
actorSystemPort));
    +
    +           ActorSystem taskManagerSystem;
    +           try {
    +                   Tuple2<String, Object> address = new Tuple2<String, 
Object>(taskManagerHostname, actorSystemPort);
    +                   Config akkaConfig = 
AkkaUtils.getAkkaConfig(configuration, new Some<>(address));
    +                   LOG.debug("Using akka configuration\n " + akkaConfig);
    +                   taskManagerSystem = 
AkkaUtils.createActorSystem(akkaConfig);
    +           } catch (Throwable t) {
    +                   if (t instanceof 
org.jboss.netty.channel.ChannelException) {
    +                           Throwable cause = t.getCause();
    +                           if (cause != null && t.getCause() instanceof 
java.net.BindException) {
    +                                   String address = 
NetUtils.hostAndPortToUrlString(taskManagerHostname, actorSystemPort);
    +                                   throw new IOException("Unable to bind 
TaskManager actor system to address " +
    +                                           address + " - " + 
cause.getMessage(), t);
    +                           }
    +                   }
    +                   throw new Exception("Could not create TaskManager actor 
system", t);
    +           }
    +
    +           // start all the TaskManager services (network stack,  library 
cache, ...)
    +           // and the TaskManager actor
    +           try {
    +                   LOG.info("Starting TaskManager actor");
    +                   ActorRef taskManagerActor = 
startTaskManagerComponentsAndActor(
    +                           configuration,
    +                           resourceID,
    +                           taskManagerSystem,
    +                           taskManagerHostname,
    +                           TASK_MANAGER_NAME,
    +                           null,
    +                           false,
    +                           taskManagerClass);
    +
    +                   // start a process reaper that watches the JobManager. 
If the TaskManager actor dies,
    +                   // the process reaper will kill the JVM process (to 
ensure easy failure detection)
    +                   LOG.debug("Starting TaskManager process reaper");
    +
    +                   taskManagerSystem.actorOf(
    +                           Props.create(ProcessReaper.class, 
taskManagerActor, LOG, RUNTIME_FAILURE_RETURN_CODE),
    +                           "TaskManager_Process_Reaper");
    +
    +                   // if desired, start the logging daemon that 
periodically logs the memory usage information
    +                   if (LOG.isInfoEnabled() && configuration.getBoolean(
    +                           
ConfigConstants.TASK_MANAGER_DEBUG_MEMORY_USAGE_START_LOG_THREAD,
    +                           
ConfigConstants.DEFAULT_TASK_MANAGER_DEBUG_MEMORY_USAGE_START_LOG_THREAD)) {
    +                           LOG.info("Starting periodic memory usage 
logger");
    +
    +                           long interval = configuration.getLong(
    +                                   
ConfigConstants.TASK_MANAGER_DEBUG_MEMORY_USAGE_LOG_INTERVAL_MS,
    +                                   
ConfigConstants.DEFAULT_TASK_MANAGER_DEBUG_MEMORY_USAGE_LOG_INTERVAL_MS);
    +
    +                           MemoryLogger logger = new MemoryLogger(LOG, 
interval, taskManagerSystem);
    +                           logger.start();
    +                   }
    +
    +                   // block until everything is done
    +                   taskManagerSystem.awaitTermination();
    +           } catch (Throwable t) {
    +                   LOG.error("Error while starting up taskManager", t);
    +                   try {
    +                           taskManagerSystem.shutdown();
    +                   } catch (Throwable tt) {
    +                           LOG.warn("Could not cleanly shut down actor 
system", tt);
    +                   }
    +                   throw t;
    +           }
    +   }
    +
    +   // 
--------------------------------------------------------------------------
    +   //  Starting and running the TaskManager
    +   // 
--------------------------------------------------------------------------
    +
    +   /**
    +    * @param configuration                 The configuration for the 
TaskManager.
    +    * @param resourceID                    The id of the resource which 
the task manager will run on.
    +    * @param actorSystem                  The actor system that should run 
the TaskManager actor.
    +    * @param taskManagerHostname       The hostname/address that describes 
the TaskManager's data location.
    +    * @param taskManagerActorName      Optionally the name of the 
TaskManager actor. If none is given,
    +    *                                      the actor will use a random 
name.
    +    * @param leaderRetrievalService        Optionally, a leader retrieval 
service can be provided. If none is given,
    +    *                                      then a LeaderRetrievalService 
is constructed from the configuration.
    +    * @param localTaskManagerCommunication If true, the TaskManager will 
not initiate the TCP network stack.
    +    * @param taskManagerClass      The class of the TaskManager actor. May 
be used to give
    +    *                                      subclasses that understand 
additional actor messages.
    +    * @return An ActorRef to the TaskManager actor.
    +    * @throws org.apache.flink.configuration.IllegalConfigurationException 
    Thrown, if the given config contains illegal values.
    +    * @throws java.io.IOException      Thrown, if any of the I/O 
components (such as buffer pools,
    +    *                                       I/O manager, ...) cannot be 
properly started.
    +    * @throws java.lang.Exception      Thrown is some other error occurs 
while parsing the configuration
    +    *                                      or starting the TaskManager 
components.
    +    */
    +   public static ActorRef startTaskManagerComponentsAndActor(
    +           Configuration configuration,
    +           ResourceID resourceID,
    +           ActorSystem actorSystem,
    +           String taskManagerHostname,
    +           String taskManagerActorName,
    +           LeaderRetrievalService leaderRetrievalService,
    +           boolean localTaskManagerCommunication,
    +           Class<? extends TaskManager> taskManagerClass) throws Exception 
{
    +
    +           Tuple4<TaskManagerConfiguration, 
NetworkEnvironmentConfiguration, InstanceConnectionInfo, MemoryType> tuple4
    +                   = parseTaskManagerConfiguration(configuration, 
taskManagerHostname, localTaskManagerCommunication);
    +
    +           TaskManagerConfiguration taskManagerConfig = tuple4._1();
    +           NetworkEnvironmentConfiguration netConfig = tuple4._2();
    +           InstanceConnectionInfo connectionInfo = tuple4._3();
    +           MemoryType memType = tuple4._4();
    +
    +           // pre-start checks
    +           checkTempDirs(taskManagerConfig.tmpDirPaths());
    +
    +           ExecutionContext executionContext = 
ExecutionContexts$.MODULE$.fromExecutor(new ForkJoinPool());
    +
    +           // we start the network first, to make sure it can allocate its 
buffers first
    +           NetworkEnvironment network = new 
NetworkEnvironment(executionContext, taskManagerConfig.timeout(), netConfig);
    +
    +           // computing the amount of memory to use depends on how much 
memory is available
    +           // it strictly needs to happen AFTER the network stack has been 
initialized
    +
    +           // check if a value has been configured
    +           long configuredMemory = 
configuration.getLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1L);
    +           checkConfigParameter(configuredMemory == -1 || configuredMemory 
> 0, configuredMemory,
    +                   ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY,
    +                   "MemoryManager needs at least one MB of memory. " +
    +                           "If you leave this config parameter empty, the 
system automatically " +
    +                           "pick a fraction of the available memory.");
    +
    +           long memorySize;
    +           boolean preAllocateMemory = configuration.getBoolean(
    +                   ConfigConstants.TASK_MANAGER_MEMORY_PRE_ALLOCATE_KEY,
    +                   
ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_PRE_ALLOCATE);
    +           if (configuredMemory > 0) {
    +                   if (preAllocateMemory) {
    +                           LOG.info("Using {} MB for managed memory." , 
configuredMemory);
    +                   } else {
    +                           LOG.info("Limiting managed memory to {} MB, 
memory will be allocated lazily." , configuredMemory);
    +                   }
    +                   memorySize = configuredMemory << 20; // megabytes to 
bytes
    +           } else {
    +                   float fraction = configuration.getFloat(
    +                           
ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
    +                           
ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION);
    +                   checkConfigParameter(fraction > 0.0f && fraction < 
1.0f, fraction,
    +                           
ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
    +                           "MemoryManager fraction of the free memory must 
be between 0.0 and 1.0");
    +
    +                   if (memType == MemoryType.HEAP) {
    +                           long relativeMemSize = (long) 
(EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() * fraction);
    +                           if (preAllocateMemory) {
    +                                   LOG.info("Using {} of the currently 
free heap space for managed heap memory ({} MB)." ,
    +                                           fraction , relativeMemSize >> 
20);
    +                           } else {
    +                                   LOG.info("Limiting managed memory to {} 
of the currently free heap space ({} MB), " +
    +                                           "memory will be allocated 
lazily." , fraction , relativeMemSize >> 20);
    +                           }
    +                           memorySize = relativeMemSize;
    +                   } else if (memType == MemoryType.OFF_HEAP) {
    +                           // The maximum heap memory has been adjusted 
according to the fraction
    +                           long maxMemory = 
EnvironmentInformation.getMaxJvmHeapMemory();
    +                           long directMemorySize = (long) (maxMemory / 
(1.0 - fraction) * fraction);
    +                           if (preAllocateMemory) {
    +                                   LOG.info("Using {} of the maximum 
memory size for managed off-heap memory ({} MB)." ,
    +                                           fraction, directMemorySize >> 
20);
    +                           } else {
    +                                   LOG.info("Limiting managed memory to {} 
of the maximum memory size ({} MB)," +
    +                                           " memory will be allocated 
lazily.", fraction, directMemorySize >> 20);
    +                           }
    +                           memorySize = directMemorySize;
    +                   } else {
    +                           throw new RuntimeException("No supported memory 
type detected.");
    +                   }
    +           }
    +
    +           // now start the memory manager
    +           MemoryManager memoryManager;
    +           try {
    +                   memoryManager = new MemoryManager(
    +                           memorySize,
    +                           taskManagerConfig.numberOfSlots(),
    +                           netConfig.networkBufferSize(),
    +                           memType,
    +                           preAllocateMemory);
    +           } catch (OutOfMemoryError e) {
    +                   if (memType == MemoryType.HEAP) {
    +                           throw new Exception("OutOfMemory error (" + 
e.getMessage() +
    +                                   ") while allocating the TaskManager 
heap memory (" + memorySize + " bytes).", e);
    +                   } else if (memType == MemoryType.OFF_HEAP) {
    +                           throw new Exception("OutOfMemory error (" + 
e.getMessage() +
    +                                   ") while allocating the TaskManager 
off-heap memory (" + memorySize +
    +                                   " bytes).Try increasing the maximum 
direct memory (-XX:MaxDirectMemorySize)", e);
    +                   } else {
    +                           throw e;
    +                   }
    +           }
    +
    +           // start the I/O manager, it will create some temp directories.
    +           IOManager ioManager = new 
IOManagerAsync(taskManagerConfig.tmpDirPaths());
    +
    +           if (leaderRetrievalService == null){
    +                   leaderRetrievalService = 
LeaderRetrievalUtils.createLeaderRetrievalService(configuration);
    +           }
    +
    +           // create the actor properties (which define the actor 
constructor parameters)
    +           Props tmProps = Props.create(
    +                   taskManagerClass,
    +                   taskManagerConfig,
    +                   resourceID,
    +                   connectionInfo,
    +                   memoryManager,
    +                   ioManager,
    +                   network,
    +                   taskManagerConfig.numberOfSlots(),
    +                   leaderRetrievalService);
    +
    +           ActorRef taskManagerActorRef;
    +           if (taskManagerActorName != null && 
!taskManagerActorName.equals("")) {
    +                   taskManagerActorRef = actorSystem.actorOf(tmProps, 
taskManagerActorName);
    +           } else {
    +                   taskManagerActorRef = actorSystem.actorOf(tmProps);
    +           }
    +
    +           return taskManagerActorRef;
    +   }
    +
    +   // 
--------------------------------------------------------------------------
    +   //  Parsing and checking the TaskManager Configuration
    +   // 
--------------------------------------------------------------------------
    +
    +   /**
    +    * Utility method to extract TaskManager config parameters from the 
configuration and to
    +    * sanity check them.
    +    *
    +    * @param configuration                 The configuration.
    +    * @param taskManagerHostname           The host name under which the 
TaskManager communicates.
    +    * @param localTaskManagerCommunication True, to skip initializing the 
network stack.
    +    *                                      Use only in cases where only 
one task manager runs.
    +    * @return A tuple (TaskManagerConfiguration, network configuration,
    +    * InstanceConnectionInfo, JobManager actor Akka URL).
    +    */
    +   private static Tuple4<TaskManagerConfiguration, 
NetworkEnvironmentConfiguration, InstanceConnectionInfo, MemoryType>
    --- End diff --
    
    Yes, it is better to do that.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact the infrastructure team at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to