[ https://issues.apache.org/jira/browse/FLINK-4738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15561670#comment-15561670 ]
ASF GitHub Bot commented on FLINK-4738:
---------------------------------------
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/2594#discussion_r82563262
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ---
@@ -127,12 +195,423 @@ public void start() {
}
}
+ /**
+ * Called to shut down the TaskManager. The method closes all
TaskManager services.
+ */
+ @Override
+ public void shutDown() {
+ log.info("Stopping TaskManager {}.", getAddress());
+
+ if (resourceManagerConnection.isConnected()) {
+ try {
+ resourceManagerConnection.close();
+ } catch (Exception e) {
+ log.error("Could not cleanly close the
ResourceManager connection.", e);
+ }
+ }
+
+ try {
+ ioManager.shutdown();
+ } catch (Exception e) {
+ log.error("IOManager did not shut down properly.", e);
+ }
+
+ try {
+ memoryManager.shutdown();
+ } catch (Exception e) {
+ log.error("MemoryManager did not shut down properly.",
e);
+ }
+
+ try {
+ networkEnvironment.shutdown();
+ } catch (Exception e) {
+ log.error("Network environment did not shut down
properly.", e);
+ }
+
+ try {
+ fileCache.shutdown();
+ } catch (Exception e) {
+ log.error("File cache did not shut down properly.", e);
+ }
+
+ try {
+ metricRegistry.shutdown();
+ } catch (Exception e) {
+ log.error("MetricRegistry did not shut down properly.",
e);
+ }
+
+ log.info("Stopped TaskManager {}.", getAddress());
+ }
+
+ //
========================================================================
+ // RPC methods
+ //
========================================================================
+
+ //
----------------------------------------------------------------------
+ // Task lifecycle RPCs
+ //
----------------------------------------------------------------------
+
+ @RpcMethod
+ public Acknowledge submitTask(TaskDeploymentDescriptor tdd, ResourceID
jobManagerID) throws TaskSubmissionException {
+
+ JobManagerConnection jobManagerConnection =
getJobManagerConnection(jobManagerID);
+
+ if (jobManagerConnection == null) {
+ final String message = "Could not submit task because
JobManager " + jobManagerID +
+ " was not associated.";
+
+ log.debug(message);
+ throw new TaskSubmissionException(message);
+ }
+
+ TaskSlot taskSlot = taskSlots.get(tdd.getAllocationID());
+
+ if (taskSlot == null) {
+ final String message = "No task slot allocated for
allocation ID " + tdd.getAllocationID() + '.';
+ log.debug(message);
+ throw new TaskSubmissionException(message);
+ }
+
+ TaskMetricGroup taskMetricGroup =
taskManagerMetricGroup.addTaskForJob(tdd);
+
+ InputSplitProvider inputSplitProvider = new
RpcInputSplitProvider(
+ jobManagerConnection.getJobManagerGateway(),
+ tdd.getJobID(),
+ tdd.getVertexID(),
+ tdd.getExecutionId(),
+ taskManagerConfiguration.getTimeout());
+
+ TaskManagerActions taskManagerActions =
jobManagerConnection.getTaskManagerActions();
+ CheckpointResponder checkpointResponder =
jobManagerConnection.getCheckpointResponder();
+ LibraryCacheManager libraryCache =
jobManagerConnection.getLibraryCacheManager();
+ ResultPartitionConsumableNotifier
resultPartitionConsumableNotifier =
jobManagerConnection.getResultPartitionConsumableNotifier();
+ PartitionStateChecker partitionStateChecker =
jobManagerConnection.getPartitionStateChecker();
+
+ Task task = new Task(
+ tdd,
+ memoryManager,
+ ioManager,
+ networkEnvironment,
+ broadcastVariableManager,
+ taskManagerActions,
+ inputSplitProvider,
+ checkpointResponder,
+ libraryCache,
+ fileCache,
+ taskManagerRuntimeInfo,
+ taskMetricGroup,
+ resultPartitionConsumableNotifier,
+ partitionStateChecker,
+ getRpcService().getExecutor());
+
+ log.info("Received task {}.",
task.getTaskInfo().getTaskNameWithSubtasks());
+
+ if(taskSlot.add(task)) {
+ TaskSlotMapping taskSlotMapping = new
TaskSlotMapping(task, taskSlot);
+
+ taskSlotMappings.put(task.getExecutionId(),
taskSlotMapping);
+ task.startTaskThread();
+
+ return Acknowledge.get();
+ } else {
+ final String message = "TaskManager already contains a
task for id " +
+ task.getExecutionId() + '.';
+
+ log.debug(message);
+ throw new TaskSubmissionException(message);
+ }
+ }
+
+ @RpcMethod
+ public Acknowledge cancelTask(ExecutionAttemptID executionAttemptID)
throws TaskException {
+ final Task task = getTask(executionAttemptID);
+
+ if (task != null) {
+ try {
+ task.cancelExecution();
+ return Acknowledge.get();
+ } catch (Throwable t) {
+ throw new TaskException("Cannot cancel task for
execution " + executionAttemptID + '.', t);
+ }
+ } else {
+ final String message = "Cannot find task to stop for
execution " + executionAttemptID + '.';
+
+ log.debug(message);
+ throw new TaskException(message);
+ }
+ }
+
+ @RpcMethod
+ public Acknowledge stopTask(ExecutionAttemptID executionAttemptID)
throws TaskException {
+ final Task task = getTask(executionAttemptID);
+
+ if (task != null) {
+ try {
+ task.stopExecution();
+ return Acknowledge.get();
+ } catch (Throwable t) {
+ throw new TaskException("Cannot stop task for
execution " + executionAttemptID + '.', t);
+ }
+ } else {
+ final String message = "Cannot find task to stop for
execution " + executionAttemptID + '.';
+
+ log.debug(message);
+ throw new TaskException(message);
+ }
+ }
+
+ //
----------------------------------------------------------------------
+ // Partition lifecycle RPCs
+ //
----------------------------------------------------------------------
+
+ @RpcMethod
+ public Acknowledge updatePartitions(final ExecutionAttemptID
executionAttemptID, Collection<PartitionInfo> partitionInfos) throws
PartitionException {
+ final Task task = getTask(executionAttemptID);
+
+ if (task != null) {
+ for (final PartitionInfo partitionInfo: partitionInfos)
{
+ IntermediateDataSetID
intermediateResultPartitionID = partitionInfo.getIntermediateDataSetID();
+
+ final SingleInputGate singleInputGate =
task.getInputGateById(intermediateResultPartitionID);
+
+ if (singleInputGate != null) {
+ // Run asynchronously because it might
be blocking
+ getRpcService().execute(new Runnable() {
+ @Override
+ public void run() {
+ try {
+
singleInputGate.updateInputChannel(partitionInfo.getInputChannelDeploymentDescriptor());
+ } catch (IOException |
InterruptedException e) {
+
log.error("Could not update input data location for task {}. Trying to fail
task.", task.getTaskInfo().getTaskName(), e);
+
+ try {
+
task.failExternally(e);
+ } catch
(RuntimeException re) {
+ //
TODO: Check whether we need this or make exception in failExtenally checked
+
log.error("Failed canceling task with execution ID {} after task update
failure.", executionAttemptID, re);
+ }
+ }
+ }
+ });
+ } else {
+ throw new PartitionException("No reader
with ID " +
+ intermediateResultPartitionID +
" for task " + executionAttemptID +
+ " was found.");
+ }
+ }
+
+ return Acknowledge.get();
+ } else {
+ log.debug("Discard update for input partitions of task
{}. Task is no longer running.", executionAttemptID);
+ return Acknowledge.get();
+ }
+ }
+
+ @RpcMethod
+ public void failPartition(ExecutionAttemptID executionAttemptID) {
+ log.info("Discarding the results produced by task execution
{}.", executionAttemptID);
+
+ try {
+
networkEnvironment.getResultPartitionManager().releasePartitionsProducedBy(executionAttemptID);
+ } catch (Throwable t) {
+ // TODO: Do we still need this catch branch?
+ onFatalError(t);
+ }
+
+ // TODO: Maybe it's better to return an Acknowledge here to
notify the JM about the success/failure with an Exception
+ }
+
+ //
----------------------------------------------------------------------
+ // Checkpointing RPCs
+ //
----------------------------------------------------------------------
+
+ @RpcMethod
+ public Acknowledge triggerCheckpoint(ExecutionAttemptID
executionAttemptID, long checkpointId, long checkpointTimestamp) throws
CheckpointException {
+ log.debug("Trigger checkpoint {}@{} for {}.", checkpointId,
checkpointTimestamp, executionAttemptID);
+
+ final Task task = getTask(executionAttemptID);
+
+ if (task != null) {
+ task.triggerCheckpointBarrier(checkpointId,
checkpointTimestamp);
+
+ return Acknowledge.get();
+ } else {
+ final String message = "TaskManager received a
checkpoint request for unknown task " + executionAttemptID + '.';
+
+ log.debug(message);
+ throw new CheckpointException(message);
+ }
+ }
+
--- End diff --
Good catch. Will fix it.
> Port TaskManager logic to TaskExecutor
> --------------------------------------
>
> Key: FLINK-4738
> URL: https://issues.apache.org/jira/browse/FLINK-4738
> Project: Flink
> Issue Type: Sub-task
> Components: Distributed Coordination
> Reporter: Till Rohrmann
> Assignee: Till Rohrmann
>
> Port the basic operations of the {{TaskManager}} to the {{TaskExecutor}}.
> These operations include the task lifecycle methods, {{JobManager}}
> association logic and setup of TaskManager components.
--
This message was sent by Atlassian JIRA (v6.3.4#6332)