[
https://issues.apache.org/jira/browse/FLINK-4363?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15430472#comment-15430472
]
ASF GitHub Bot commented on FLINK-4363:
---------------------------------------
Github user mxm commented on a diff in the pull request:
https://github.com/apache/flink/pull/2400#discussion_r75649283
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java
---
@@ -35,27 +79,634 @@
*/
public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
- /** The unique resource ID of this TaskExecutor */
+ private static final Logger LOG =
LoggerFactory.getLogger(TaskExecutor.class);
+
+ /** Return code for critical errors during the runtime */
+ private static final int RUNTIME_FAILURE_RETURN_CODE = 2;
+
+ /** The name of the TaskManager actor */
+ private static final String TASK_MANAGER_NAME = "taskmanager";
+
+ /** The unique resource ID of this TaskManager */
private final ResourceID resourceID;
/** The access to the leader election and metadata storage services */
private final HighAvailabilityServices haServices;
- // --------- resource manager --------
+ /** The task manager configuration */
+ private final TaskManagerConfiguration taskManagerConfig;
+
+ /** The connection information of the task manager */
+ private final InstanceConnectionInfo connectionInfo;
+
+ /** The I/O manager component in the task manager */
+ private final IOManager ioManager;
+
+ /** The memory manager component in the task manager */
+ private final MemoryManager memoryManager;
+
+ /** The network component in the task manager */
+ private final NetworkEnvironment networkEnvironment;
+
+ /** The number of slots in the task manager, should be 1 for YARN */
+ private final int numberOfSlots;
+ // --------- resource manager --------
private TaskExecutorToResourceManagerConnection
resourceManagerConnection;
//
------------------------------------------------------------------------
public TaskExecutor(
+ TaskManagerConfiguration taskManagerConfig,
+ ResourceID resourceID,
+ InstanceConnectionInfo connectionInfo,
+ MemoryManager memoryManager,
+ IOManager ioManager,
+ NetworkEnvironment networkEnvironment,
+ int numberOfSlots,
RpcService rpcService,
- HighAvailabilityServices haServices,
- ResourceID resourceID) {
+ HighAvailabilityServices haServices) {
super(rpcService);
- this.haServices = checkNotNull(haServices);
+ this.taskManagerConfig = checkNotNull(taskManagerConfig);
this.resourceID = checkNotNull(resourceID);
+ this.connectionInfo = checkNotNull(connectionInfo);
+ this.memoryManager = checkNotNull(memoryManager);
+ this.ioManager = checkNotNull(ioManager);
+ this.networkEnvironment = checkNotNull(networkEnvironment);
+ this.numberOfSlots = checkNotNull(numberOfSlots);
+ this.haServices = checkNotNull(haServices);
+ }
+
+ /**
+ * Starts and runs the TaskManager.
+ * <p/>
+ * This method first tries to select the network interface to use for
the TaskManager
+ * communication. The network interface is used both for the actor
communication
+ * (coordination) as well as for the data exchange between task
managers. Unless
+ * the hostname/interface is explicitly configured in the
configuration, this
+ * method will try out various interfaces and methods to connect to the
JobManager
+ * and select the one where the connection attempt is successful.
+ * <p/>
+ * After selecting the network interface, this method brings up an
actor system
+ * for the TaskManager and its actors, starts the TaskManager's services
+ * (library cache, shuffle network stack, ...), and starts the
TaskManager itself.
+ *
+ * @param configuration The configuration for the TaskManager.
+ * @param taskManagerClass The actor class to instantiate.
+ * Allows to use TaskManager subclasses for
example for YARN.
+ */
+ public static void selectNetworkInterfaceAndRunTaskManager(
+ Configuration configuration,
+ ResourceID resourceID,
+ Class<? extends TaskManager> taskManagerClass) throws Exception
{
+
+ Tuple2<String, Integer> tuple2 =
selectNetworkInterfaceAndPort(configuration);
+
+ runTaskManager(tuple2._1(), resourceID, tuple2._2(),
configuration, taskManagerClass);
+ }
+
+ private static Tuple2<String, Integer>
selectNetworkInterfaceAndPort(Configuration configuration)
+ throws Exception {
+ String taskManagerHostname =
configuration.getString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, null);
+ if (taskManagerHostname != null) {
+ LOG.info("Using configured hostname/address for
TaskManager: " + taskManagerHostname);
+ } else {
+ LeaderRetrievalService leaderRetrievalService =
LeaderRetrievalUtils.createLeaderRetrievalService(configuration);
+ FiniteDuration lookupTimeout =
AkkaUtils.getLookupTimeout(configuration);
+
+ InetAddress taskManagerAddress =
LeaderRetrievalUtils.findConnectingAddress(leaderRetrievalService,
lookupTimeout);
+ taskManagerHostname = taskManagerAddress.getHostName();
+ LOG.info("TaskManager will use hostname/address '{}'
({}) for communication.",
+ taskManagerHostname,
taskManagerAddress.getHostAddress());
+ }
+
+ // if no task manager port has been configured, use 0 (system
will pick any free port)
+ int actorSystemPort =
configuration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, 0);
+ if (actorSystemPort < 0 || actorSystemPort > 65535) {
+ throw new IllegalConfigurationException("Invalid value
for '" +
+ ConfigConstants.TASK_MANAGER_IPC_PORT_KEY +
+ "' (port for the TaskManager actor system) : "
+ actorSystemPort +
+ " - Leave config parameter empty or use 0 to
let the system choose a port automatically.");
+ }
+
+ return new Tuple2<>(taskManagerHostname, actorSystemPort);
+ }
+
+ /**
+ * Starts and runs the TaskManager. Brings up an actor system for the
TaskManager and its
+ * actors, starts the TaskManager's services (library cache, shuffle
network stack, ...),
+ * and starts the TaskManager itself.
+ * <p/>
+ * This method will also spawn a process reaper for the TaskManager
(kill the process if
+ * the actor fails) and optionally start the JVM memory logging thread.
+ *
+ * @param taskManagerHostname The hostname/address of the interface
where the actor system
+ * will communicate.
+ * @param resourceID The id of the resource which the task
manager will run on.
+ * @param actorSystemPort The port at which the actor system will
communicate.
+ * @param configuration The configuration for the TaskManager.
+ * @param taskManagerClass The actor class to instantiate. Allows
the use of TaskManager
+ * subclasses for example for YARN.
+ */
+ private static void runTaskManager(
+ String taskManagerHostname,
+ ResourceID resourceID,
+ int actorSystemPort,
+ Configuration configuration,
+ Class<? extends TaskManager> taskManagerClass) throws Exception
{
+
+ LOG.info("Starting TaskManager");
+
+ // Bring up the TaskManager actor system first, bind it to the
given address.
+
+ LOG.info("Starting TaskManager actor system at " +
+ NetUtils.hostAndPortToUrlString(taskManagerHostname,
actorSystemPort));
+
+ ActorSystem taskManagerSystem;
+ try {
+ Tuple2<String, Object> address = new Tuple2<String,
Object>(taskManagerHostname, actorSystemPort);
+ Config akkaConfig =
AkkaUtils.getAkkaConfig(configuration, new Some<>(address));
+ LOG.debug("Using akka configuration\n " + akkaConfig);
+ taskManagerSystem =
AkkaUtils.createActorSystem(akkaConfig);
+ } catch (Throwable t) {
+ if (t instanceof
org.jboss.netty.channel.ChannelException) {
+ Throwable cause = t.getCause();
+ if (cause != null && t.getCause() instanceof
java.net.BindException) {
+ String address =
NetUtils.hostAndPortToUrlString(taskManagerHostname, actorSystemPort);
+ throw new IOException("Unable to bind
TaskManager actor system to address " +
+ address + " - " +
cause.getMessage(), t);
+ }
+ }
+ throw new Exception("Could not create TaskManager actor
system", t);
+ }
+
+ // start all the TaskManager services (network stack, library
cache, ...)
+ // and the TaskManager actor
+ try {
+ LOG.info("Starting TaskManager actor");
+ ActorRef taskManagerActor =
startTaskManagerComponentsAndActor(
+ configuration,
+ resourceID,
+ taskManagerSystem,
+ taskManagerHostname,
+ TASK_MANAGER_NAME,
+ null,
+ false,
+ taskManagerClass);
+
+ // start a process reaper that watches the JobManager.
If the TaskManager actor dies,
+ // the process reaper will kill the JVM process (to
ensure easy failure detection)
+ LOG.debug("Starting TaskManager process reaper");
+
+ taskManagerSystem.actorOf(
+ Props.create(ProcessReaper.class,
taskManagerActor, LOG, RUNTIME_FAILURE_RETURN_CODE),
+ "TaskManager_Process_Reaper");
+
+ // if desired, start the logging daemon that
periodically logs the memory usage information
+ if (LOG.isInfoEnabled() && configuration.getBoolean(
+
ConfigConstants.TASK_MANAGER_DEBUG_MEMORY_USAGE_START_LOG_THREAD,
+
ConfigConstants.DEFAULT_TASK_MANAGER_DEBUG_MEMORY_USAGE_START_LOG_THREAD)) {
+ LOG.info("Starting periodic memory usage
logger");
+
+ long interval = configuration.getLong(
+
ConfigConstants.TASK_MANAGER_DEBUG_MEMORY_USAGE_LOG_INTERVAL_MS,
+
ConfigConstants.DEFAULT_TASK_MANAGER_DEBUG_MEMORY_USAGE_LOG_INTERVAL_MS);
+
+ MemoryLogger logger = new MemoryLogger(LOG,
interval, taskManagerSystem);
+ logger.start();
+ }
+
+ // block until everything is done
+ taskManagerSystem.awaitTermination();
+ } catch (Throwable t) {
+ LOG.error("Error while starting up taskManager", t);
+ try {
+ taskManagerSystem.shutdown();
+ } catch (Throwable tt) {
+ LOG.warn("Could not cleanly shut down actor
system", tt);
+ }
+ throw t;
+ }
+ }
+
+ //
--------------------------------------------------------------------------
+ // Starting and running the TaskManager
+ //
--------------------------------------------------------------------------
+
+ /**
+ * @param configuration The configuration for the
TaskManager.
+ * @param resourceID The id of the resource which
the task manager will run on.
+ * @param actorSystem The actor system that should run
the TaskManager actor.
+ * @param taskManagerHostname The hostname/address that describes
the TaskManager's data location.
+ * @param taskManagerActorName Optionally the name of the
TaskManager actor. If none is given,
+ * the actor will use a random
name.
+ * @param leaderRetrievalService Optionally, a leader retrieval
service can be provided. If none is given,
+ * then a LeaderRetrievalService
is constructed from the configuration.
+ * @param localTaskManagerCommunication If true, the TaskManager will
not initiate the TCP network stack.
+ * @param taskManagerClass The class of the TaskManager actor. May
be used to give
+ * subclasses that understand
additional actor messages.
+ * @return An ActorRef to the TaskManager actor.
+ * @throws org.apache.flink.configuration.IllegalConfigurationException
Thrown, if the given config contains illegal values.
+ * @throws java.io.IOException Thrown, if any of the I/O
components (such as buffer pools,
+ * I/O manager, ...) cannot be
properly started.
+ * @throws java.lang.Exception Thrown is some other error occurs
while parsing the configuration
+ * or starting the TaskManager
components.
+ */
+ public static ActorRef startTaskManagerComponentsAndActor(
+ Configuration configuration,
+ ResourceID resourceID,
+ ActorSystem actorSystem,
+ String taskManagerHostname,
+ String taskManagerActorName,
+ LeaderRetrievalService leaderRetrievalService,
+ boolean localTaskManagerCommunication,
+ Class<? extends TaskManager> taskManagerClass) throws Exception
{
+
+ Tuple4<TaskManagerConfiguration,
NetworkEnvironmentConfiguration, InstanceConnectionInfo, MemoryType> tuple4
+ = parseTaskManagerConfiguration(configuration,
taskManagerHostname, localTaskManagerCommunication);
+
+ TaskManagerConfiguration taskManagerConfig = tuple4._1();
+ NetworkEnvironmentConfiguration netConfig = tuple4._2();
+ InstanceConnectionInfo connectionInfo = tuple4._3();
+ MemoryType memType = tuple4._4();
+
+ // pre-start checks
+ checkTempDirs(taskManagerConfig.tmpDirPaths());
+
+ ExecutionContext executionContext =
ExecutionContexts$.MODULE$.fromExecutor(new ForkJoinPool());
+
+ // we start the network first, to make sure it can allocate its
buffers first
+ NetworkEnvironment network = new
NetworkEnvironment(executionContext, taskManagerConfig.timeout(), netConfig);
+
+ // computing the amount of memory to use depends on how much
memory is available
+ // it strictly needs to happen AFTER the network stack has been
initialized
+
+ // check if a value has been configured
+ long configuredMemory =
configuration.getLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1L);
+ checkConfigParameter(configuredMemory == -1 || configuredMemory
> 0, configuredMemory,
+ ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY,
+ "MemoryManager needs at least one MB of memory. " +
+ "If you leave this config parameter empty, the
system automatically " +
+ "pick a fraction of the available memory.");
+
+ long memorySize;
+ boolean preAllocateMemory = configuration.getBoolean(
+ ConfigConstants.TASK_MANAGER_MEMORY_PRE_ALLOCATE_KEY,
+
ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_PRE_ALLOCATE);
+ if (configuredMemory > 0) {
+ if (preAllocateMemory) {
+ LOG.info("Using {} MB for managed memory." ,
configuredMemory);
+ } else {
+ LOG.info("Limiting managed memory to {} MB,
memory will be allocated lazily." , configuredMemory);
+ }
+ memorySize = configuredMemory << 20; // megabytes to
bytes
+ } else {
+ float fraction = configuration.getFloat(
+
ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
+
ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION);
+ checkConfigParameter(fraction > 0.0f && fraction <
1.0f, fraction,
+
ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
+ "MemoryManager fraction of the free memory must
be between 0.0 and 1.0");
+
+ if (memType == MemoryType.HEAP) {
+ long relativeMemSize = (long)
(EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() * fraction);
+ if (preAllocateMemory) {
+ LOG.info("Using {} of the currently
free heap space for managed heap memory ({} MB)." ,
+ fraction , relativeMemSize >>
20);
+ } else {
+ LOG.info("Limiting managed memory to {}
of the currently free heap space ({} MB), " +
+ "memory will be allocated
lazily." , fraction , relativeMemSize >> 20);
+ }
+ memorySize = relativeMemSize;
+ } else if (memType == MemoryType.OFF_HEAP) {
+ // The maximum heap memory has been adjusted
according to the fraction
+ long maxMemory =
EnvironmentInformation.getMaxJvmHeapMemory();
+ long directMemorySize = (long) (maxMemory /
(1.0 - fraction) * fraction);
+ if (preAllocateMemory) {
+ LOG.info("Using {} of the maximum
memory size for managed off-heap memory ({} MB)." ,
+ fraction, directMemorySize >>
20);
+ } else {
+ LOG.info("Limiting managed memory to {}
of the maximum memory size ({} MB)," +
+ " memory will be allocated
lazily.", fraction, directMemorySize >> 20);
+ }
+ memorySize = directMemorySize;
+ } else {
+ throw new RuntimeException("No supported memory
type detected.");
+ }
+ }
+
+ // now start the memory manager
+ MemoryManager memoryManager;
+ try {
+ memoryManager = new MemoryManager(
+ memorySize,
+ taskManagerConfig.numberOfSlots(),
+ netConfig.networkBufferSize(),
+ memType,
+ preAllocateMemory);
+ } catch (OutOfMemoryError e) {
+ if (memType == MemoryType.HEAP) {
+ throw new Exception("OutOfMemory error (" +
e.getMessage() +
+ ") while allocating the TaskManager
heap memory (" + memorySize + " bytes).", e);
+ } else if (memType == MemoryType.OFF_HEAP) {
+ throw new Exception("OutOfMemory error (" +
e.getMessage() +
+ ") while allocating the TaskManager
off-heap memory (" + memorySize +
+ " bytes).Try increasing the maximum
direct memory (-XX:MaxDirectMemorySize)", e);
+ } else {
+ throw e;
+ }
+ }
+
+ // start the I/O manager, it will create some temp directories.
+ IOManager ioManager = new
IOManagerAsync(taskManagerConfig.tmpDirPaths());
+
+ if (leaderRetrievalService == null){
+ leaderRetrievalService =
LeaderRetrievalUtils.createLeaderRetrievalService(configuration);
+ }
+
+ // create the actor properties (which define the actor
constructor parameters)
+ Props tmProps = Props.create(
+ taskManagerClass,
+ taskManagerConfig,
+ resourceID,
+ connectionInfo,
+ memoryManager,
+ ioManager,
+ network,
+ taskManagerConfig.numberOfSlots(),
+ leaderRetrievalService);
+
+ ActorRef taskManagerActorRef;
--- End diff --
`final`
> Implement TaskManager basic startup of all components in java
> -------------------------------------------------------------
>
> Key: FLINK-4363
> URL: https://issues.apache.org/jira/browse/FLINK-4363
> Project: Flink
> Issue Type: Sub-task
> Components: Cluster Management
> Reporter: Zhijiang Wang
> Assignee: Zhijiang Wang
>
> Similar to the current {{TaskManager}}, but implements the initialization and startup
> of all components in Java instead of Scala.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)