[FLINK-4738] [TaskManager] Port TaskManager logic to new Flip-6 TaskManager

The ported logic contains the task lifecycle management methods, the
JobManager association, and the setup of the TaskManager components.

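Introduce Rpc implementations of the TaskManager components (checkpoint
responder, input split provider, KvState registry listener, partition state
checker, and result partition consumable notifier)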

Implement the TaskExecutor metrics setup

Move more TaskManager components out of the TaskExecutor constructor to make
the TaskExecutor more testable

Add RpcMethod annotation to TaskExecutor#confirmCheckpoint

This closes #2594.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a00619a4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a00619a4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a00619a4

Branch: refs/heads/flip-6
Commit: a00619a4850e1239eb54bcf656bf1802b8403454
Parents: 9dfaf45
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Wed Sep 28 14:39:51 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Oct 14 15:14:43 2016 +0200

----------------------------------------------------------------------
 .../CheckpointCoordinatorGateway.java           |  38 ++
 .../deployment/TaskDeploymentDescriptor.java    |   9 +
 .../runtime/executiongraph/PartitionInfo.java   |  47 ++
 .../flink/runtime/filecache/FileCache.java      |  17 +-
 .../jobgraph/tasks/InputSplitProvider.java      |   3 +-
 .../tasks/InputSplitProviderException.java      |  36 ++
 .../jobmaster/ExecutionGraphException.java      |  41 ++
 .../runtime/jobmaster/JobManagerException.java  |  39 ++
 .../flink/runtime/jobmaster/JobMaster.java      |  50 +-
 .../runtime/jobmaster/JobMasterGateway.java     |  30 +-
 .../jobmaster/MiniClusterJobDispatcher.java     |   2 +-
 .../runtime/jobmaster/SerializedInputSplit.java |  39 ++
 .../jobmaster/message/NextInputSplit.java       |  39 --
 .../flink/runtime/operators/DataSourceTask.java |  12 +-
 .../runtime/query/KvStateRegistryGateway.java   |  57 ++
 .../taskexecutor/JobManagerConnection.java      |  91 +++
 .../runtime/taskexecutor/TaskExecutor.java      | 627 +++++++++++++++++--
 .../taskexecutor/TaskExecutorGateway.java       |  80 ++-
 .../taskexecutor/TaskManagerConfiguration.java  |   3 +-
 .../runtime/taskexecutor/TaskManagerRunner.java |   3 +
 .../taskexecutor/TaskManagerServices.java       |  51 +-
 .../flink/runtime/taskexecutor/TaskSlot.java    |  73 +++
 .../runtime/taskexecutor/TaskSlotMapping.java   |  44 ++
 .../exceptions/CheckpointException.java         |  41 ++
 .../exceptions/PartitionException.java          |  41 ++
 .../taskexecutor/exceptions/TaskException.java  |  41 ++
 .../exceptions/TaskManagerException.java        |  41 ++
 .../exceptions/TaskSubmissionException.java     |  41 ++
 .../rpc/RpcCheckpointResponder.java             |  56 ++
 .../taskexecutor/rpc/RpcInputSplitProvider.java |  73 +++
 .../rpc/RpcKvStateRegistryListener.java         |  73 +++
 .../rpc/RpcPartitionStateChecker.java           |  48 ++
 .../RpcResultPartitionConsumableNotifier.java   |  67 ++
 .../utils/TaskExecutorMetricsInitializer.java   | 257 ++++++++
 .../ActorGatewayTaskManagerActions.java         |  59 ++
 .../ActorGatewayTaskManagerConnection.java      |  59 --
 .../apache/flink/runtime/taskmanager/Task.java  |  23 +-
 .../runtime/taskmanager/TaskExecutionState.java |   4 +-
 .../taskmanager/TaskInputSplitProvider.java     |  49 +-
 .../runtime/taskmanager/TaskManagerActions.java |  57 ++
 .../taskmanager/TaskManagerConnection.java      |  57 --
 .../flink/runtime/taskmanager/TaskManager.scala | 169 +----
 .../FileCacheDeleteValidationTest.java          |   4 +-
 .../jobmaster/JobManagerRunnerMockTest.java     |   1 -
 .../runtime/taskexecutor/TaskExecutorTest.java  |  24 +-
 .../runtime/taskmanager/TaskAsyncCallTest.java  |   2 +-
 .../taskmanager/TaskInputSplitProviderTest.java |   3 +-
 .../flink/runtime/taskmanager/TaskStopTest.java |   4 +-
 .../flink/runtime/taskmanager/TaskTest.java     |   4 +-
 .../source/InputFormatSourceFunction.java       |   8 +-
 .../tasks/InterruptSensitiveRestoreTest.java    |  11 +-
 .../streaming/runtime/tasks/StreamTaskTest.java |   4 +-
 52 files changed, 2320 insertions(+), 432 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a00619a4/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorGateway.java
new file mode 100644
index 0000000..5a01e4d
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorGateway.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.rpc.RpcGateway;
+import org.apache.flink.runtime.state.CheckpointStateHandles;
+
+public interface CheckpointCoordinatorGateway extends RpcGateway {
+
+       void acknowledgeCheckpoint(
+                       final JobID jobID,
+                       final ExecutionAttemptID executionAttemptID,
+                       final CheckpointMetaData checkpointInfo,
+                       final CheckpointStateHandles checkpointStateHandles);
+
+       void declineCheckpoint(
+                       final JobID jobID,
+                       final ExecutionAttemptID executionAttemptID,
+                       final CheckpointMetaData checkpoint);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a00619a4/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
index 7bbdb2a..b1ac665 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.state.ChainedStateHandle;
@@ -57,6 +58,9 @@ public final class TaskDeploymentDescriptor implements 
Serializable {
        /** The ID referencing the attempt to execute the task. */
        private final ExecutionAttemptID executionId;
 
+       /** The allocation ID of the slot in which the task shall be run */
+       private final AllocationID allocationID;
+
        /** The task's name. */
        private final String taskName;
 
@@ -158,6 +162,7 @@ public final class TaskDeploymentDescriptor implements 
Serializable {
                this.operatorState = operatorState;
                this.keyGroupState = keyGroupState;
                this.partitionableOperatorState = 
partitionableOperatorStateHandles;
+               this.allocationID = new AllocationID();
        }
 
        public TaskDeploymentDescriptor(
@@ -322,6 +327,10 @@ public final class TaskDeploymentDescriptor implements 
Serializable {
                return requiredClasspaths;
        }
 
+       public AllocationID getAllocationID() {
+               return allocationID;
+       }
+
        @Override
        public String toString() {
                return String.format("TaskDeploymentDescriptor [job id: %s, job 
vertex id: %s, " +

http://git-wip-us.apache.org/repos/asf/flink/blob/a00619a4/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/PartitionInfo.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/PartitionInfo.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/PartitionInfo.java
new file mode 100644
index 0000000..1a79a99
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/PartitionInfo.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Contains information where to find a partition. The partition is defined by 
the
+ * {@link IntermediateDataSetID} and the partition location is specified by
+ * {@link InputChannelDeploymentDescriptor}.
+ */
+public class PartitionInfo {
+
+       private final IntermediateDataSetID intermediateDataSetID;
+       private final InputChannelDeploymentDescriptor 
inputChannelDeploymentDescriptor;
+
+       public PartitionInfo(IntermediateDataSetID 
intermediateResultPartitionID, InputChannelDeploymentDescriptor 
inputChannelDeploymentDescriptor) {
+               this.intermediateDataSetID = 
Preconditions.checkNotNull(intermediateResultPartitionID);
+               this.inputChannelDeploymentDescriptor = 
Preconditions.checkNotNull(inputChannelDeploymentDescriptor);
+       }
+
+       public IntermediateDataSetID getIntermediateDataSetID() {
+               return intermediateDataSetID;
+       }
+
+       public InputChannelDeploymentDescriptor 
getInputChannelDeploymentDescriptor() {
+               return inputChannelDeploymentDescriptor;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a00619a4/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
index b5bdcaf..a07f1a0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
@@ -33,8 +33,6 @@ import java.util.concurrent.TimeUnit;
 import org.apache.commons.io.FileUtils;
 import 
org.apache.flink.api.common.cache.DistributedCache.DistributedCacheEntry;
 import org.apache.flink.api.java.tuple.Tuple4;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.fs.FileStatus;
@@ -44,6 +42,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.util.ExecutorThreadFactory;
 import org.apache.flink.util.IOUtils;
 
+import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -71,17 +70,15 @@ public class FileCache {
 
        // 
------------------------------------------------------------------------
 
-       public FileCache(Configuration config) throws IOException {
-               
-               String tempDirs = 
config.getString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY,
-                               ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH);
+       public FileCache(String[] tempDirectories) throws IOException {
 
-               String[] directories = tempDirs.split(",|" + 
File.pathSeparator);
-               storageDirectories = new File[directories.length];
+               Preconditions.checkNotNull(tempDirectories);
 
-               for (int i = 0; i < directories.length; i++) {
+               storageDirectories = new File[tempDirectories.length];
+
+               for (int i = 0; i < tempDirectories.length; i++) {
                        String cacheDirName = "flink-dist-cache-" + 
UUID.randomUUID().toString();
-                       storageDirectories[i] = new File(directories[i], 
cacheDirName);
+                       storageDirectories[i] = new File(tempDirectories[i], 
cacheDirName);
                        String path = storageDirectories[i].getAbsolutePath();
 
                        if (storageDirectories[i].mkdirs()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/a00619a4/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/InputSplitProvider.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/InputSplitProvider.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/InputSplitProvider.java
index e0cde17..464b13f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/InputSplitProvider.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/InputSplitProvider.java
@@ -34,6 +34,7 @@ public interface InputSplitProvider {
         * @param userCodeClassLoader used to deserialize input splits
         * @return the next input split to be consumed by the calling task or 
<code>null</code> if the
         *         task shall not consume any further input splits.
+        * @throws InputSplitProviderException if fetching the next input split 
fails
         */
-       InputSplit getNextInputSplit(ClassLoader userCodeClassLoader);
+       InputSplit getNextInputSplit(ClassLoader userCodeClassLoader) throws 
InputSplitProviderException;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a00619a4/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/InputSplitProviderException.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/InputSplitProviderException.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/InputSplitProviderException.java
new file mode 100644
index 0000000..ac73c6f
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/InputSplitProviderException.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobgraph.tasks;
+
+public class InputSplitProviderException extends Exception {
+
+       private static final long serialVersionUID = -8043190713983651548L;
+
+       public InputSplitProviderException(String message) {
+               super(message);
+       }
+
+       public InputSplitProviderException(String message, Throwable cause) {
+               super(message, cause);
+       }
+
+       public InputSplitProviderException(Throwable cause) {
+               super(cause);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a00619a4/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/ExecutionGraphException.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/ExecutionGraphException.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/ExecutionGraphException.java
new file mode 100644
index 0000000..7c35f3d
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/ExecutionGraphException.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+
+/**
+ * Exceptions thrown by operations on the {@link ExecutionGraph} by the {@link 
JobMaster}.
+ */
+public class ExecutionGraphException extends JobManagerException {
+
+       private static final long serialVersionUID = -5439002256464886357L;
+
+       public ExecutionGraphException(String message) {
+               super(message);
+       }
+
+       public ExecutionGraphException(String message, Throwable cause) {
+               super(message, cause);
+       }
+
+       public ExecutionGraphException(Throwable cause) {
+               super(cause);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a00619a4/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerException.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerException.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerException.java
new file mode 100644
index 0000000..bc2759d
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerException.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+/**
+ * Base exception thrown by the {@link JobMaster}.
+ */
+public class JobManagerException extends Exception {
+
+       private static final long serialVersionUID = -7290962952242188064L;
+
+       public JobManagerException(final String message) {
+               super(message);
+       }
+
+       public JobManagerException(final String message, Throwable cause) {
+               super(message, cause);
+       }
+
+       public JobManagerException(Throwable cause) {
+               super(cause);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a00619a4/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index e67a167..8f3a342 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -38,6 +38,7 @@ import 
org.apache.flink.runtime.checkpoint.stats.SimpleCheckpointStatsTracker;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.client.JobSubmissionException;
 import org.apache.flink.runtime.client.SerializedJobExecutionResult;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
@@ -60,9 +61,9 @@ import 
org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
 import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
 import org.apache.flink.runtime.jobmanager.OnCompletionActions;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
-import org.apache.flink.runtime.jobmaster.message.NextInputSplit;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
 import org.apache.flink.runtime.registration.RegisteredRpcConnection;
 import org.apache.flink.runtime.registration.RegistrationResponse;
@@ -71,6 +72,7 @@ import 
org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcMethod;
 import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.state.CheckpointStateHandles;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.util.SerializedThrowable;
 import org.apache.flink.util.InstantiationUtil;
@@ -507,12 +509,18 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
         * @return Acknowledge the task execution state update
         */
        @RpcMethod
-       public boolean updateTaskExecutionState(final TaskExecutionState 
taskExecutionState) {
+       public Acknowledge updateTaskExecutionState(final TaskExecutionState 
taskExecutionState) throws ExecutionGraphException {
                if (taskExecutionState == null) {
-                       return false;
+                       throw new NullPointerException("TaskExecutionState must 
not be null.");
+               }
+
+               if (executionGraph.updateState(taskExecutionState)) {
+                       return Acknowledge.get();
                } else {
-                       return executionGraph.updateState(taskExecutionState);
+                       throw new ExecutionGraphException("The execution 
attempt " +
+                               taskExecutionState.getID() + " was not found.");
                }
+
        }
 
        
//----------------------------------------------------------------------------------------------

@@ -531,7 +539,7 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
        }
 
        @RpcMethod
-       public NextInputSplit requestNextInputSplit(
+       public SerializedInputSplit requestNextInputSplit(
                final JobVertexID vertexID,
                final ExecutionAttemptID executionAttempt) throws Exception
        {
@@ -569,7 +577,7 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
 
                try {
                        final byte[] serializedInputSplit = 
InstantiationUtil.serializeObject(nextInputSplit);
-                       return new NextInputSplit(serializedInputSplit);
+                       return new SerializedInputSplit(serializedInputSplit);
                } catch (Exception ex) {
                        log.error("Could not serialize the next input split of 
class {}.", nextInputSplit.getClass(), ex);
                        IOException reason = new IOException("Could not 
serialize the next input split of class " +
@@ -591,8 +599,36 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
        }
 
        @RpcMethod
-       public void scheduleOrUpdateConsumers(final ResultPartitionID 
partitionID) {
+       public Acknowledge scheduleOrUpdateConsumers(final ResultPartitionID 
partitionID) {
                executionGraph.scheduleOrUpdateConsumers(partitionID);
+               return Acknowledge.get();
+       }
+
+       @RpcMethod
+       public void disconnectTaskManager(final ResourceID resourceID) {
+               throw new UnsupportedOperationException();
+       }
+
+       @RpcMethod
+       public void acknowledgeCheckpoint(
+               JobID jobID,
+               ExecutionAttemptID executionAttemptID,
+               long checkpointID,
+               CheckpointStateHandles checkpointStateHandles,
+               long synchronousDurationMillis,
+               long asynchronousDurationMillis,
+               long bytesBufferedInAlignment,
+               long alignmentDurationNanos) {
+               throw new UnsupportedOperationException();
+       }
+
+       @RpcMethod
+       public void declineCheckpoint(
+               JobID jobID,
+               ExecutionAttemptID executionAttemptID,
+               long checkpointID,
+               long checkpointTimestamp) {
+               throw new UnsupportedOperationException();
        }
 
        
//----------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/a00619a4/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
index 686a3f3..e3e57d4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
@@ -18,6 +18,9 @@
 
 package org.apache.flink.runtime.jobmaster;
 
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorGateway;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
@@ -27,8 +30,8 @@ import 
org.apache.flink.runtime.io.network.partition.ResultPartition;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobmaster.message.NextInputSplit;
-import org.apache.flink.runtime.rpc.RpcGateway;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.rpc.RpcTimeout;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 
 import java.util.UUID;
@@ -36,7 +39,7 @@ import java.util.UUID;
 /**
  * {@link JobMaster} rpc gateway interface
  */
-public interface JobMasterGateway extends RpcGateway {
+public interface JobMasterGateway extends CheckpointCoordinatorGateway {
 
        /**
         * Starting the job under the given leader session ID.
@@ -57,20 +60,19 @@ public interface JobMasterGateway extends RpcGateway {
         * @param taskExecutionState New task execution state for a given task
         * @return Future flag of the task execution state update result
         */
-       Future<Boolean> updateTaskExecutionState(TaskExecutionState 
taskExecutionState);
+       Future<Acknowledge> updateTaskExecutionState(TaskExecutionState 
taskExecutionState);
 
        /**
         * Requesting next input split for the {@link ExecutionJobVertex}. The 
next input split is sent back to the sender
-        * as a {@link NextInputSplit} message.
+        * as a {@link SerializedInputSplit} message.
         *
         * @param vertexID         The job vertex id
         * @param executionAttempt The execution attempt id
         * @return The future of the input split. If there is no further input 
split, will return an empty object.
-        * @throws Exception if some error occurred or information mismatch.
         */
-       Future<NextInputSplit> requestNextInputSplit(
+       Future<SerializedInputSplit> requestNextInputSplit(
                final JobVertexID vertexID,
-               final ExecutionAttemptID executionAttempt) throws Exception;
+               final ExecutionAttemptID executionAttempt);
 
        /**
         * Requests the current state of the partition.
@@ -96,6 +98,16 @@ public interface JobMasterGateway extends RpcGateway {
         * The JobManager then can decide when to schedule the partition 
consumers of the given session.
         *
         * @param partitionID The partition which has already produced data
+        * @param timeout before the rpc call fails
+        * @return Future acknowledge of the schedule or update operation
         */
-       void scheduleOrUpdateConsumers(final ResultPartitionID partitionID);
+       Future<Acknowledge> scheduleOrUpdateConsumers(final ResultPartitionID 
partitionID, @RpcTimeout final Time timeout);
+
+       /**
+        * Disconnects the given {@link 
org.apache.flink.runtime.taskexecutor.TaskExecutor} from the
+        * {@link JobMaster}.
+        *
+        * @param resourceID identifying the TaskManager to disconnect
+        */
+       void disconnectTaskManager(ResourceID resourceID);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a00619a4/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniClusterJobDispatcher.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniClusterJobDispatcher.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniClusterJobDispatcher.java
index 792bfd5..e8fb5bb 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniClusterJobDispatcher.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniClusterJobDispatcher.java
@@ -356,7 +356,7 @@ public class MiniClusterJobDispatcher {
                        final Throwable runnerException = this.runnerException;
                        final JobExecutionResult result = this.result;
 
-                       // (1) we check if teh job terminated with an exception
+                       // (1) we check if the job terminated with an exception
                        // (2) we check whether the job completed successfully
                        // (3) we check if we have exceptions from the 
JobManagers. the job may still have
                        //     completed successfully in that case, if multiple 
JobMasters were running

http://git-wip-us.apache.org/repos/asf/flink/blob/a00619a4/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SerializedInputSplit.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SerializedInputSplit.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SerializedInputSplit.java
new file mode 100644
index 0000000..bfdc65a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SerializedInputSplit.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import java.io.Serializable;
+/** Serialized form of an input split, as returned by {@link JobMasterGateway#requestNextInputSplit}. */
+public class SerializedInputSplit implements Serializable {
+       private static final long serialVersionUID = -2063021844254152064L;
+
+       private final byte[] inputSplitData;
+
+       public SerializedInputSplit(byte[] inputSplitData) {
+               this.inputSplitData = inputSplitData;
+       }
+       /** Returns the split bytes exactly as passed to the constructor (the array is not copied; it may be null). */
+       public byte[] getInputSplitData() {
+               return inputSplitData;
+       }
+       /** Returns true if there are no split bytes, i.e. the data is null or has length zero. */
+       public boolean isEmpty() {
+               return inputSplitData == null || inputSplitData.length == 0;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a00619a4/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/NextInputSplit.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/NextInputSplit.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/NextInputSplit.java
deleted file mode 100644
index fe511ed..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/NextInputSplit.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmaster.message;
-
-import java.io.Serializable;
-
-/**
- * Contains the next input split for a task.
- */
-public class NextInputSplit implements Serializable {
-
-       private static final long serialVersionUID = -1355784074565856240L;
-
-       private final byte[] splitData;
-
-       public NextInputSplit(final byte[] splitData) {
-               this.splitData = splitData;
-       }
-
-       public byte[] getSplitData() {
-               return splitData;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a00619a4/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
index c062bf8..1c751fd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
+import org.apache.flink.runtime.jobgraph.tasks.InputSplitProviderException;
 import org.apache.flink.runtime.operators.chaining.ChainedDriver;
 import 
org.apache.flink.runtime.operators.chaining.ExceptionInChainedStubException;
 import org.apache.flink.runtime.operators.util.DistributedRuntimeUDFContext;
@@ -332,9 +333,14 @@ public class DataSourceTask<OT> extends AbstractInvokable {
                                if (nextSplit != null) {
                                        return true;
                                }
-                               
-                               InputSplit split = 
provider.getNextInputSplit(getUserCodeClassLoader());
-                               
+
+                               final InputSplit split;
+                               try {
+                                       split = 
provider.getNextInputSplit(getUserCodeClassLoader());
+                               } catch (InputSplitProviderException e) {
+                                       throw new RuntimeException("Could not 
retrieve next input split.", e);
+                               }
+
                                if (split != null) {
                                        this.nextSplit = split;
                                        return true;

http://git-wip-us.apache.org/repos/asf/flink/blob/a00619a4/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistryGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistryGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistryGateway.java
new file mode 100644
index 0000000..d285074
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistryGateway.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.query;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rpc.RpcGateway;
+import org.apache.flink.runtime.state.KeyGroupRange;
+/** RPC gateway through which a listener is notified about KvState instances being registered and unregistered. */
+public interface KvStateRegistryGateway extends RpcGateway {
+       /**
+        * Notifies the listener about a registered KvState instance.
+        * @param jobId            Job ID the KvState instance belongs to
+        * @param jobVertexId      JobVertexID the KvState instance belongs to
+        * @param keyGroupRange    Key group range the KvState instance belongs to
+        * @param registrationName Name under which the KvState is registered
+        * @param kvStateId        ID of the KvState instance
+        * @param kvStateServerAddress Server address of the KvState instance
+        */
+       void notifyKvStateRegistered(
+               JobID jobId,
+               JobVertexID jobVertexId,
+               KeyGroupRange keyGroupRange,
+               String registrationName,
+               KvStateID kvStateId,
+               KvStateServerAddress kvStateServerAddress);
+
+       /**
+        * Notifies the listener about an unregistered KvState instance.
+        *
+        * @param jobId            Job ID the KvState instance belongs to
+        * @param jobVertexId      JobVertexID the KvState instance belongs to
+        * @param keyGroupRange    Key group range the KvState instance belongs to
+        * @param registrationName Name under which the KvState is registered
+        */
+       void notifyKvStateUnregistered(
+               JobID jobId,
+               JobVertexID jobVertexId,
+               KeyGroupRange keyGroupRange,
+               String registrationName);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a00619a4/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobManagerConnection.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobManagerConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobManagerConnection.java
new file mode 100644
index 0000000..ef62ef1
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobManagerConnection.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
+import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
+import 
org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.taskmanager.CheckpointResponder;
+import org.apache.flink.runtime.taskmanager.TaskManagerActions;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Container class for the JobManager specific components (RPC gateway, task manager actions, checkpoint responder, library cache manager, partition consumable notifier and partition state checker) used by the {@link TaskExecutor}.
+ */
+public class JobManagerConnection {
+
+       /** Gateway to the job master. */
+       private final JobMasterGateway jobMasterGateway;
+
+       /** Task manager actions with respect to the connected job manager. */
+       private final TaskManagerActions taskManagerActions;
+
+       /** Checkpoint responder for the specific job manager. */
+       private final CheckpointResponder checkpointResponder;
+
+       /** Library cache manager connected to the specific job manager. */
+       private final LibraryCacheManager libraryCacheManager;
+
+       /** Result partition consumable notifier for the specific job manager. */
+       private final ResultPartitionConsumableNotifier resultPartitionConsumableNotifier;
+
+       /** Partition state checker for the specific job manager. */
+       private final PartitionStateChecker partitionStateChecker;
+
+       public JobManagerConnection(
+               JobMasterGateway jobMasterGateway,
+               TaskManagerActions taskManagerActions,
+               CheckpointResponder checkpointResponder,
+               LibraryCacheManager libraryCacheManager,
+               ResultPartitionConsumableNotifier resultPartitionConsumableNotifier,
+               PartitionStateChecker partitionStateChecker) {
+               // Every component is mandatory; null arguments are rejected via Preconditions.checkNotNull.
+               this.jobMasterGateway = Preconditions.checkNotNull(jobMasterGateway);
+               this.taskManagerActions = Preconditions.checkNotNull(taskManagerActions);
+               this.checkpointResponder = Preconditions.checkNotNull(checkpointResponder);
+               this.libraryCacheManager = Preconditions.checkNotNull(libraryCacheManager);
+               this.resultPartitionConsumableNotifier = Preconditions.checkNotNull(resultPartitionConsumableNotifier);
+               this.partitionStateChecker = Preconditions.checkNotNull(partitionStateChecker);
+       }
+       /** Returns the {@link JobMasterGateway} of the connected JobManager (note: the getter name says JobManager). */
+       public JobMasterGateway getJobManagerGateway() {
+               return jobMasterGateway;
+       }
+
+       public TaskManagerActions getTaskManagerActions() {
+               return taskManagerActions;
+       }
+
+       public CheckpointResponder getCheckpointResponder() {
+               return checkpointResponder;
+       }
+
+       public LibraryCacheManager getLibraryCacheManager() {
+               return libraryCacheManager;
+       }
+
+       public ResultPartitionConsumableNotifier getResultPartitionConsumableNotifier() {
+               return resultPartitionConsumableNotifier;
+       }
+
+       public PartitionStateChecker getPartitionStateChecker() {
+               return partitionStateChecker;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a00619a4/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index c0041a3..35b639b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -18,15 +18,48 @@
 
 package org.apache.flink.runtime.taskexecutor;
 
+import org.apache.flink.configuration.UnmodifiableConfiguration;
+import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
+import org.apache.flink.runtime.blob.BlobCache;
+import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.SlotID;
 import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.concurrent.ApplyFunction;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
+import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.PartitionInfo;
+import org.apache.flink.runtime.filecache.FileCache;
+import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
+import 
org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import 
org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestRegistered;
 import 
org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestRejected;
+import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
+import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import 
org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestReply;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.taskexecutor.exceptions.CheckpointException;
+import org.apache.flink.runtime.taskexecutor.exceptions.PartitionException;
+import org.apache.flink.runtime.taskexecutor.exceptions.TaskException;
+import 
org.apache.flink.runtime.taskexecutor.exceptions.TaskSubmissionException;
+import org.apache.flink.runtime.taskexecutor.rpc.RpcCheckpointResponder;
+import org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider;
+import org.apache.flink.runtime.taskexecutor.rpc.RpcPartitionStateChecker;
+import 
org.apache.flink.runtime.taskexecutor.rpc.RpcResultPartitionConsumableNotifier;
+import org.apache.flink.runtime.taskmanager.CheckpointResponder;
+import org.apache.flink.runtime.taskmanager.Task;
+import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import org.apache.flink.runtime.taskmanager.TaskManagerActions;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
@@ -38,11 +71,17 @@ import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcMethod;
 import org.apache.flink.runtime.rpc.RpcService;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
+import org.apache.flink.util.Preconditions;
 
 import java.util.HashSet;
 import java.util.Set;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
 import java.util.UUID;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -50,12 +89,10 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * TaskExecutor implementation. The task executor is responsible for the 
execution of multiple
- * {@link org.apache.flink.runtime.taskmanager.Task}.
+ * {@link Task}.
  */
 public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 
-       private static final Logger LOG = 
LoggerFactory.getLogger(TaskExecutor.class);
-
        /** The connection information of this task manager */
        private final TaskManagerLocation taskManagerLocation;
 
@@ -77,19 +114,38 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
        /** The metric registry in the task manager */
        private final MetricRegistry metricRegistry;
 
-       /** The number of slots in the task manager, should be 1 for YARN */
-       private final int numberOfSlots;
-
        /** The fatal error handler to use in case of a fatal error */
        private final FatalErrorHandler fatalErrorHandler;
 
+       private final TaskManagerMetricGroup taskManagerMetricGroup;
+
+       private final BroadcastVariableManager broadcastVariableManager;
+       
        /** Slots which have become available but haven't been confirmed by the 
RM */
        private final Set<SlotID> unconfirmedFreeSlots;
 
+
+       private final FileCache fileCache;
+
+       // TODO: Try to get rid of it
+       private final TaskManagerRuntimeInfo taskManagerRuntimeInfo;
+
        // --------- resource manager --------
 
        private TaskExecutorToResourceManagerConnection 
resourceManagerConnection;
 
+       // --------- job manager connections -----------
+
+       private Map<ResourceID, JobManagerConnection> jobManagerConnections;
+
+       // --------- Slot allocation table --------
+
+       private Map<AllocationID, TaskSlot> taskSlots;
+
+       // --------- Task slot mappings --------
+
+       private Map<ExecutionAttemptID, TaskSlotMapping> taskSlotMappings;
+
        // 
------------------------------------------------------------------------
 
        public TaskExecutor(
@@ -101,6 +157,9 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
                NetworkEnvironment networkEnvironment,
                HighAvailabilityServices haServices,
                MetricRegistry metricRegistry,
+               TaskManagerMetricGroup taskManagerMetricGroup,
+               BroadcastVariableManager broadcastVariableManager,
+               FileCache fileCache,
                FatalErrorHandler fatalErrorHandler) {
 
                super(rpcService);
@@ -115,10 +174,19 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
                this.haServices = checkNotNull(haServices);
                this.metricRegistry = checkNotNull(metricRegistry);
                this.fatalErrorHandler = checkNotNull(fatalErrorHandler);
+               this.taskManagerMetricGroup = 
checkNotNull(taskManagerMetricGroup);
+               this.broadcastVariableManager = 
checkNotNull(broadcastVariableManager);
+               this.fileCache = checkNotNull(fileCache);
+               this.taskManagerRuntimeInfo = new TaskManagerRuntimeInfo(
+                       taskManagerLocation.getHostname(),
+                       new 
UnmodifiableConfiguration(taskManagerConfiguration.getConfiguration()),
+                       taskManagerConfiguration.getTmpDirPaths());
 
-               this.numberOfSlots =  taskManagerConfiguration.getNumberSlots();
+               this.jobManagerConnections = new HashMap<>(4);
 
                this.unconfirmedFreeSlots = new HashSet<>();
+               this.taskSlots = new 
HashMap<>(taskManagerConfiguration.getNumberSlots());
+               this.taskSlotMappings = new 
HashMap<>(taskManagerConfiguration.getNumberSlots() * 2);
        }
 
        // 
------------------------------------------------------------------------
@@ -137,12 +205,436 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
                }
        }
 
+       /**
+        * Called to shut down the TaskManager. The method closes all 
TaskManager services.
+        */
+       @Override
+       public void shutDown() {
+               log.info("Stopping TaskManager {}.", getAddress());
+
+               if (resourceManagerConnection.isConnected()) {
+                       try {
+                               resourceManagerConnection.close();
+                       } catch (Exception e) {
+                               log.error("Could not cleanly close the 
ResourceManager connection.", e);
+                       }
+               }
+
+               try {
+                       ioManager.shutdown();
+               } catch (Exception e) {
+                       log.error("IOManager did not shut down properly.", e);
+               }
+
+               try {
+                       memoryManager.shutdown();
+               } catch (Exception e) {
+                       log.error("MemoryManager did not shut down properly.", 
e);
+               }
+
+               try {
+                       networkEnvironment.shutdown();
+               } catch (Exception e) {
+                       log.error("Network environment did not shut down 
properly.", e);
+               }
+
+               try {
+                       fileCache.shutdown();
+               } catch (Exception e) {
+                       log.error("File cache did not shut down properly.", e);
+               }
+
+               try {
+                       metricRegistry.shutdown();
+               } catch (Exception e) {
+                       log.error("MetricRegistry did not shut down properly.", 
e);
+               }
+
+               log.info("Stopped TaskManager {}.", getAddress());
+       }
+
+       // 
========================================================================
+       //  RPC methods
+       // 
========================================================================
+
+       // 
----------------------------------------------------------------------
+       // Task lifecycle RPCs
+       // 
----------------------------------------------------------------------
+
+       @RpcMethod
+       public Acknowledge submitTask(TaskDeploymentDescriptor tdd, ResourceID jobManagerID) throws TaskSubmissionException {
+               // Only a JobManager with an established JobManagerConnection may submit tasks.
+               JobManagerConnection jobManagerConnection = getJobManagerConnection(jobManagerID);
+
+               if (jobManagerConnection == null) {
+                       final String message = "Could not submit task because JobManager " + jobManagerID +
+                               " was not associated.";
+
+                       log.debug(message);
+                       throw new TaskSubmissionException(message);
+               }
+               // The task must be placed into a slot allocated under the allocation ID of its deployment descriptor.
+               TaskSlot taskSlot = taskSlots.get(tdd.getAllocationID());
+
+               if (taskSlot == null) {
+                       final String message = "No task slot allocated for allocation ID " + tdd.getAllocationID() + '.';
+                       log.debug(message);
+                       throw new TaskSubmissionException(message);
+               }
+               // NOTE(review): this metric group is not closed if the submission fails further below; confirm that it cannot leak.
+               TaskMetricGroup taskMetricGroup = taskManagerMetricGroup.addTaskForJob(tdd);
+
+               InputSplitProvider inputSplitProvider = new RpcInputSplitProvider(
+                       jobManagerConnection.getJobManagerGateway(),
+                       tdd.getJobID(),
+                       tdd.getVertexID(),
+                       tdd.getExecutionId(),
+                       taskManagerConfiguration.getTimeout());
+               // The remaining JobManager specific components come from the connection of the submitting JobManager.
+               TaskManagerActions taskManagerActions = jobManagerConnection.getTaskManagerActions();
+               CheckpointResponder checkpointResponder = jobManagerConnection.getCheckpointResponder();
+               LibraryCacheManager libraryCache = jobManagerConnection.getLibraryCacheManager();
+               ResultPartitionConsumableNotifier resultPartitionConsumableNotifier = jobManagerConnection.getResultPartitionConsumableNotifier();
+               PartitionStateChecker partitionStateChecker = jobManagerConnection.getPartitionStateChecker();
+
+               Task task = new Task(
+                       tdd,
+                       memoryManager,
+                       ioManager,
+                       networkEnvironment,
+                       broadcastVariableManager,
+                       taskManagerActions,
+                       inputSplitProvider,
+                       checkpointResponder,
+                       libraryCache,
+                       fileCache,
+                       taskManagerRuntimeInfo,
+                       taskMetricGroup,
+                       resultPartitionConsumableNotifier,
+                       partitionStateChecker,
+                       getRpcService().getExecutor());
+
+               log.info("Received task {}.", task.getTaskInfo().getTaskNameWithSubtasks());
+               // Only a task accepted by the slot is registered in taskSlotMappings and started; a rejection is reported as a duplicate.
+               if(taskSlot.add(task)) {
+                       TaskSlotMapping taskSlotMapping = new TaskSlotMapping(task, taskSlot);
+
+                       taskSlotMappings.put(task.getExecutionId(), taskSlotMapping);
+                       task.startTaskThread();
+
+                       return Acknowledge.get();
+               } else {
+                       final String message = "TaskManager already contains a task for id " +
+                               task.getExecutionId() + '.';
+
+                       log.debug(message);
+                       throw new TaskSubmissionException(message);
+               }
+       }
+
+       @RpcMethod
+       public Acknowledge cancelTask(ExecutionAttemptID executionAttemptID) 
throws TaskException {
+               final Task task = getTask(executionAttemptID);
+
+               if (task != null) {
+                       try {
+                               task.cancelExecution();
+                               return Acknowledge.get();
+                       } catch (Throwable t) {
+                               throw new TaskException("Cannot cancel task for 
execution " + executionAttemptID + '.', t);
+                       }
+               } else {
+                       final String message = "Cannot find task to stop for 
execution " + executionAttemptID + '.';
+
+                       log.debug(message);
+                       throw new TaskException(message);
+               }
+       }
+
+       @RpcMethod
+       public Acknowledge stopTask(ExecutionAttemptID executionAttemptID) throws TaskException {
+               final Task task = getTask(executionAttemptID);
+               // Stopping is delegated to the task; a failure to stop or an unknown execution attempt is reported as a TaskException.
+               if (task != null) {
+                       try {
+                               task.stopExecution();
+                               return Acknowledge.get();
+                       } catch (Throwable t) {
+                               throw new TaskException("Cannot stop task for execution " + executionAttemptID + '.', t);
+                       }
+               } else {
+                       final String message = "Cannot find task to stop for execution " + executionAttemptID + '.';
+
+                       log.debug(message);
+                       throw new TaskException(message);
+               }
+       }
+
+       // 
----------------------------------------------------------------------
+       // Partition lifecycle RPCs
+       // 
----------------------------------------------------------------------
+
+       @RpcMethod
+       public Acknowledge updatePartitions(final ExecutionAttemptID executionAttemptID, Collection<PartitionInfo> partitionInfos) throws PartitionException {
+               final Task task = getTask(executionAttemptID);
+               // Updates for execution attempts without a known task are acknowledged and dropped (see the else branch).
+               if (task != null) {
+                       for (final PartitionInfo partitionInfo: partitionInfos) {
+                               IntermediateDataSetID intermediateResultPartitionID = partitionInfo.getIntermediateDataSetID();
+                               // NOTE(review): a missing input gate is reported only after updates for earlier partitions were already scheduled.
+                               final SingleInputGate singleInputGate = task.getInputGateById(intermediateResultPartitionID);
+
+                               if (singleInputGate != null) {
+                                       // Run asynchronously because it might be blocking
+                                       getRpcService().execute(new Runnable() {
+                                               @Override
+                                               public void run() {
+                                                       try {
+                                                               singleInputGate.updateInputChannel(partitionInfo.getInputChannelDeploymentDescriptor());
+                                                       } catch (IOException | InterruptedException e) {
+                                                               log.error("Could not update input data location for task {}. Trying to fail task.", task.getTaskInfo().getTaskName(), e);
+
+                                                               try {
+                                                                       task.failExternally(e);
+                                                               } catch (RuntimeException re) {
+                                                                       // TODO: Check whether we need this or make exception in failExternally checked
+                                                                       log.error("Failed canceling task with execution ID {} after task update failure.", executionAttemptID, re);
+                                                               }
+                                                       }
+                                               }
+                                       });
+                               } else {
+                                       throw new PartitionException("No reader with ID " +
+                                               intermediateResultPartitionID + " for task " + executionAttemptID +
+                                               " was found.");
+                               }
+                       }
+
+                       return Acknowledge.get();
+               } else {
+                       log.debug("Discard update for input partitions of task {}. Task is no longer running.", executionAttemptID);
+                       return Acknowledge.get();
+               }
+       }
+
+       /**
+        * Releases all result partitions which have been produced by the given task execution.
+        *
+        * <p>Any throwable raised while releasing the partitions is escalated to
+        * {@link #onFatalError(Throwable)}.
+        *
+        * <p>TODO: Do we still need the catch-all branch? Maybe it's better to return an Acknowledge
+        * here to notify the JM about the success/failure with an Exception.
+        *
+        * @param executionAttemptID identifying the task whose produced partitions are discarded
+        */
+       @RpcMethod
+       public void failPartition(ExecutionAttemptID executionAttemptID) {
+               log.info("Discarding the results produced by task execution {}.", executionAttemptID);
+
+               try {
+                       networkEnvironment
+                               .getResultPartitionManager()
+                               .releasePartitionsProducedBy(executionAttemptID);
+               } catch (Throwable releaseFailure) {
+                       onFatalError(releaseFailure);
+               }
+       }
+
+       // ----------------------------------------------------------------------
+       // Checkpointing RPCs
+       // ----------------------------------------------------------------------
+
+       /**
+        * Triggers a checkpoint barrier with the given checkpoint ID and timestamp for the given task.
+        *
+        * @param executionAttemptID identifying the task
+        * @param checkpointId unique id of the checkpoint
+        * @param checkpointTimestamp timestamp of the checkpoint
+        * @return Acknowledge once the barrier has been triggered on the task
+        * @throws CheckpointException if no task with the given execution attempt ID is known
+        */
+       @RpcMethod
+       public Acknowledge triggerCheckpoint(ExecutionAttemptID executionAttemptID, long checkpointId, long checkpointTimestamp) throws CheckpointException {
+               log.debug("Trigger checkpoint {}@{} for {}.", checkpointId, checkpointTimestamp, executionAttemptID);
+
+               final Task targetTask = getTask(executionAttemptID);
+
+               if (targetTask == null) {
+                       final String errorMessage = "TaskManager received a checkpoint request for unknown task " + executionAttemptID + '.';
+
+                       log.debug(errorMessage);
+                       throw new CheckpointException(errorMessage);
+               }
+
+               targetTask.triggerCheckpointBarrier(checkpointId, checkpointTimestamp);
+
+               return Acknowledge.get();
+       }
+
+       /**
+        * Notifies the given task that the checkpoint with the given ID has been completed.
+        *
+        * @param executionAttemptID identifying the task
+        * @param checkpointId id of the completed checkpoint
+        * @param checkpointTimestamp timestamp of the checkpoint (only used for logging here)
+        * @return Acknowledge once the task has been notified
+        * @throws CheckpointException if no task with the given execution attempt ID is known
+        */
+       @RpcMethod
+       public Acknowledge confirmCheckpoint(ExecutionAttemptID executionAttemptID, long checkpointId, long checkpointTimestamp) throws CheckpointException {
+               log.debug("Confirm checkpoint {}@{} for {}.", checkpointId, checkpointTimestamp, executionAttemptID);
+
+               final Task targetTask = getTask(executionAttemptID);
+
+               if (targetTask == null) {
+                       final String errorMessage = "TaskManager received a checkpoint confirmation for unknown task " + executionAttemptID + '.';
+
+                       log.debug(errorMessage);
+                       throw new CheckpointException(errorMessage);
+               }
+
+               targetTask.notifyCheckpointComplete(checkpointId);
+
+               return Acknowledge.get();
+       }
+
+       /**
+        * Requests a slot from the TaskManager.
+        *
+        * <p>The request is rejected if it does not come from the current ResourceManager leader, or if
+        * the requested slot has been reported as free but that notification has not yet been
+        * confirmed by the ResourceManager.
+        *
+        * @param slotID Slot id for the request
+        * @param allocationID id for the request
+        * @param resourceManagerLeaderID current leader id of the ResourceManager
+        * @return answer to the slot request
+        */
+       @RpcMethod
+       public TMSlotRequestReply requestSlot(SlotID slotID, AllocationID allocationID, UUID resourceManagerLeaderID) {
+               // NOTE(review): resourceManagerConnection is dereferenced without a null check, although it
+               // is null-checked elsewhere in this class; confirm a slot request cannot arrive before a
+               // ResourceManager connection exists.
+               final boolean fromCurrentLeader = resourceManagerConnection.getTargetLeaderId().equals(resourceManagerLeaderID);
+
+               // also reject if the request has been blacklisted because the notification of a free slot
+               // has not been confirmed by the ResourceManager
+               if (!fromCurrentLeader || unconfirmedFreeSlots.contains(slotID)) {
+                       return new TMSlotRequestRejected(
+                               resourceManagerConnection.getRegistrationId(), getResourceID(), allocationID);
+               }
+
+               // answer with this TaskExecutor's registration and resource IDs (as the rejections do)
+               // instead of freshly generated IDs which do not identify this TaskExecutor
+               return new TMSlotRequestRegistered(
+                       resourceManagerConnection.getRegistrationId(), getResourceID(), allocationID);
+       }
+
        // 
------------------------------------------------------------------------
-       //  RPC methods - ResourceManager related
+       //  Internal methods
        // 
------------------------------------------------------------------------
 
-       @RpcMethod
-       public void notifyOfNewResourceManagerLeader(String newLeaderAddress, 
UUID newLeaderId) {
+       /**
+        * Looks up the connection stored in {@code jobManagerConnections} for the given JobManager.
+        *
+        * @param jobManagerID resource ID of the JobManager
+        * @return the stored connection (presumably {@code null} if there is none — depends on the
+        *         map implementation of {@code jobManagerConnections})
+        */
+       private JobManagerConnection getJobManagerConnection(ResourceID jobManagerID) {
+               final JobManagerConnection connection = jobManagerConnections.get(jobManagerID);
+
+               return connection;
+       }
+
+       /**
+        * Returns the task registered under the given execution attempt ID.
+        *
+        * @param executionAttemptID identifying the task
+        * @return the task, or {@code null} if no task slot mapping exists for the ID
+        */
+       private Task getTask(ExecutionAttemptID executionAttemptID) {
+               final TaskSlotMapping mapping = taskSlotMappings.get(executionAttemptID);
+
+               return mapping == null ? null : mapping.getTask();
+       }
+
+       /**
+        * Removes the task with the given execution attempt ID from the task slot mappings and from the
+        * task slot it was assigned to.
+        *
+        * @param executionAttemptID identifying the task
+        * @return the removed task, or {@code null} if no task slot mapping existed for the ID
+        */
+       private Task removeTask(ExecutionAttemptID executionAttemptID) {
+               final TaskSlotMapping removedMapping = taskSlotMappings.remove(executionAttemptID);
+
+               if (removedMapping == null) {
+                       return null;
+               }
+
+               final Task removedTask = removedMapping.getTask();
+               removedMapping.getTaskSlot().remove(removedTask);
+
+               return removedTask;
+       }
+
+       /**
+        * Returns a view of all tasks contained in {@code taskSlotMappings}.
+        *
+        * <p>Every call to {@link Iterable#iterator()} on the returned view creates a fresh iterator over
+        * the current mappings, so the view can be traversed more than once. {@link Iterator#remove()}
+        * is delegated to the iterator of {@code taskSlotMappings.values()} (presumably removing the
+        * corresponding mapping — confirm against the map type).
+        *
+        * @return iterable view over the registered tasks
+        */
+       private Iterable<Task> getAllTasks() {
+               return new Iterable<Task>() {
+                       @Override
+                       public Iterator<Task> iterator() {
+                               // create a new iterator per call; sharing a single iterator would leave the
+                               // iterable exhausted after its first traversal
+                               final Iterator<TaskSlotMapping> taskEntryIterator = taskSlotMappings.values().iterator();
+
+                               return new Iterator<Task>() {
+                                       @Override
+                                       public boolean hasNext() {
+                                               return taskEntryIterator.hasNext();
+                                       }
+
+                                       @Override
+                                       public Task next() {
+                                               return taskEntryIterator.next().getTask();
+                                       }
+
+                                       @Override
+                                       public void remove() {
+                                               taskEntryIterator.remove();
+                                       }
+                               };
+                       }
+               };
+       }
+
+       /**
+        * Drops all task slot mappings and clears every task slot in {@code taskSlots}.
+        */
+       private void clearTasks() {
+               taskSlotMappings.clear();
+
+               for (final TaskSlot slot : taskSlots.values()) {
+                       slot.clear();
+               }
+       }
+
+       /**
+        * Fails the task with the given execution attempt ID with the given cause. Unknown tasks are
+        * only logged at debug level; any throwable raised while failing the task is logged.
+        *
+        * @param executionAttemptID identifying the task
+        * @param cause reason for failing the task
+        */
+       private void failTask(final ExecutionAttemptID executionAttemptID, final Throwable cause) {
+               final Task task = getTask(executionAttemptID);
+
+               if (task == null) {
+                       log.debug("Cannot find task to fail for execution {}.", executionAttemptID);
+                       return;
+               }
+
+               try {
+                       task.failExternally(cause);
+               } catch (Throwable failure) {
+                       log.error("Could not fail task {}.", executionAttemptID, failure);
+               }
+       }
+
+       /**
+        * Fails all registered tasks with the given cause and afterwards clears all task slots.
+        *
+        * <p>A throwable raised while failing one task is logged and prevents neither the remaining
+        * tasks from being failed nor the task slots from being cleared.
+        *
+        * @param cause reason for failing the tasks
+        */
+       private void cancelAndClearAllTasks(Throwable cause) {
+               log.info("Cancelling all computations and discarding all cached data.");
+
+               for (Task task: getAllTasks()) {
+                       try {
+                               task.failExternally(cause);
+                       } catch (Throwable t) {
+                               // keep going: one misbehaving task must not leave the others running and the slots uncleared
+                               log.error("Could not fail task {}.", task.getExecutionId(), t);
+                       }
+               }
+
+               clearTasks();
+       }
+
+       /**
+        * Sends the given task execution state to the JobMaster. If the returned future completes
+        * exceptionally, the corresponding task is failed with that exception; this failure handling
+        * is run via the main thread executor.
+        *
+        * @param jobMasterGateway gateway of the JobMaster to report to
+        * @param taskExecutionState state to report
+        */
+       private void updateTaskExecutionState(final JobMasterGateway jobMasterGateway, final TaskExecutionState taskExecutionState) {
+               final ExecutionAttemptID attemptId = taskExecutionState.getID();
+
+               final ApplyFunction<Throwable, Void> failTaskOnError = new ApplyFunction<Throwable, Void>() {
+                       @Override
+                       public Void apply(Throwable failure) {
+                               failTask(attemptId, failure);
+
+                               return null;
+                       }
+               };
+
+               jobMasterGateway
+                       .updateTaskExecutionState(taskExecutionState)
+                       .exceptionallyAsync(failTaskOnError, getMainThreadExecutor());
+       }
+
+       /**
+        * Removes the given task and reports its execution state, failure cause and accumulator
+        * snapshot to the JobMaster. A task which is not yet in a terminal state is failed externally
+        * first.
+        *
+        * @param jobMasterGateway gateway of the JobMaster to report to
+        * @param executionAttemptID identifying the task to unregister
+        */
+       private void unregisterTaskAndNotifyFinalState(final JobMasterGateway jobMasterGateway, ExecutionAttemptID executionAttemptID) {
+               final Task task = removeTask(executionAttemptID);
+
+               if (task == null) {
+                       log.error("Cannot find task with ID {} to unregister.", executionAttemptID);
+                       return;
+               }
+
+               if (!task.getExecutionState().isTerminal()) {
+                       try {
+                               task.failExternally(new IllegalStateException("Task is being remove from TaskManager."));
+                       } catch (Exception failure) {
+                               log.error("Could not properly fail task.", failure);
+                       }
+               }
+
+               log.info("Un-registering task and sending final execution state {} to JobManager for task {} {}.",
+                       task.getExecutionState(), task.getTaskInfo().getTaskName(), task.getExecutionId());
+
+               // take the snapshot before reading the remaining task state (same order as before)
+               final AccumulatorSnapshot accumulators = task.getAccumulatorRegistry().getSnapshot();
+
+               final TaskExecutionState finalState = new TaskExecutionState(
+                       task.getJobID(),
+                       task.getExecutionId(),
+                       task.getExecutionState(),
+                       task.getFailureCause(),
+                       accumulators);
+
+               updateTaskExecutionState(jobMasterGateway, finalState);
+       }
+
+       private void notifyOfNewResourceManagerLeader(String newLeaderAddress, 
UUID newLeaderId) {
                if (resourceManagerConnection != null) {
                        if (newLeaderAddress != null) {
                                // the resource manager switched to a new leader
@@ -178,28 +670,46 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
                }
        }
 
-       /**
-        * Requests a slot from the TaskManager
-        *
-        * @param slotID Slot id for the request
-        * @param allocationID id for the request
-        * @param resourceManagerLeaderID current leader id of the 
ResourceManager
-        * @return answer to the slot request
-        */
-       @RpcMethod
-       public TMSlotRequestReply requestSlot(SlotID slotID, AllocationID 
allocationID, UUID resourceManagerLeaderID) {
-               if 
(!resourceManagerConnection.getTargetLeaderId().equals(resourceManagerLeaderID))
 {
-                       return new TMSlotRequestRejected(
-                               resourceManagerConnection.getRegistrationId(), 
getResourceID(), allocationID);
-               }
-               if (unconfirmedFreeSlots.contains(slotID)) {
-                       // check if request has not been blacklisted because 
the notification of a free slot
-                       // has not been confirmed by the ResourceManager
-                       return new TMSlotRequestRejected(
-                               resourceManagerConnection.getRegistrationId(), 
getResourceID(), allocationID);
-               }
-               return new TMSlotRequestRegistered(new InstanceID(), 
ResourceID.generate(), allocationID);
+       /**
+        * Creates the components required to run tasks of the given JobManager (task manager actions,
+        * checkpoint responder, blob cache backed library cache manager, result partition consumable
+        * notifier and partition state checker) and bundles them in a {@link JobManagerConnection}.
+        *
+        * @param jobMasterGateway gateway of the JobManager to associate with
+        * @param blobPort port used to reach the blob server at the JobManager's address; must be in [1, 65535]
+        * @return connection bundling the created components
+        * @throws NullPointerException if jobMasterGateway is null
+        * @throws IllegalArgumentException if blobPort is out of range
+        */
+       private JobManagerConnection associateWithJobManager(JobMasterGateway jobMasterGateway, int blobPort) {
+               Preconditions.checkNotNull(jobMasterGateway);
+               // both bounds must hold for a valid port ('||' would accept every int)
+               Preconditions.checkArgument(blobPort > 0 && blobPort <= 65535, "Blob port is out of range.");
 
+               TaskManagerActions taskManagerActions = new TaskManagerActionsImpl(jobMasterGateway);
+
+               CheckpointResponder checkpointResponder = new RpcCheckpointResponder(jobMasterGateway);
+
+               // NOTE(review): RpcGateway#getAddress() may be a full RPC endpoint address rather than a
+               // plain hostname; confirm it is resolvable as the host part of an InetSocketAddress.
+               InetSocketAddress address = new InetSocketAddress(jobMasterGateway.getAddress(), blobPort);
+
+               BlobCache blobCache = new BlobCache(address, taskManagerConfiguration.getConfiguration());
+
+               LibraryCacheManager libraryCacheManager = new BlobLibraryCacheManager(
+                       blobCache,
+                       taskManagerConfiguration.getCleanupInterval());
+
+               ResultPartitionConsumableNotifier resultPartitionConsumableNotifier = new RpcResultPartitionConsumableNotifier(
+                       jobMasterGateway,
+                       getRpcService().getExecutor(),
+                       taskManagerConfiguration.getTimeout());
+
+               PartitionStateChecker partitionStateChecker = new RpcPartitionStateChecker(jobMasterGateway);
+
+               return new JobManagerConnection(
+                       jobMasterGateway,
+                       taskManagerActions,
+                       checkpointResponder,
+                       libraryCacheManager,
+                       resultPartitionConsumableNotifier,
+                       partitionStateChecker);
+       }
+
+       /**
+        * Disconnects this TaskExecutor from the JobManager of the given connection and shuts down the
+        * connection's library cache manager. Does nothing if the connection is {@code null}.
+        *
+        * @param jobManagerConnection connection to tear down, may be {@code null}
+        * @throws IOException propagated from the teardown calls (presumably from the library cache
+        *         manager shutdown — confirm)
+        */
+       private void disassociateFromJobManager(JobManagerConnection jobManagerConnection) throws IOException {
+               if (jobManagerConnection == null) {
+                       return;
+               }
+
+               jobManagerConnection.getJobManagerGateway().disconnectTaskManager(getResourceID());
+
+               jobManagerConnection.getLibraryCacheManager().shutdown();
        }
 
        // 
------------------------------------------------------------------------
@@ -237,8 +747,7 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
         * @param t The exception describing the fatal error
         */
        void onFatalError(Throwable t) {
-               // to be determined, probably delegate to a fatal error handler that
-               // would either log (mini cluster) ot kill the process (yarn, mesos, ...)
+               // NOTE(review): logged here presumably because the fatal error handler is not guaranteed
+               // to log the error itself (it might e.g. terminate the process) — confirm.
+               log.error("Fatal error occurred.", t);
                fatalErrorHandler.onFatalError(t);
        }
 
@@ -266,8 +775,13 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
        private class ResourceManagerLeaderListener implements 
LeaderRetrievalListener {
 
                @Override
-               public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID) {
-                       getSelf().notifyOfNewResourceManagerLeader(leaderAddress, leaderSessionID);
+               public void notifyLeaderAddress(final String leaderAddress, final UUID leaderSessionID) {
+                       // notifyOfNewResourceManagerLeader is a private, non-RPC method, so the leader change is
+                       // handed over via runAsync (presumably executed in the endpoint's main thread — confirm
+                       // against RpcEndpoint#runAsync)
+                       runAsync(new Runnable() {
+                               @Override
+                               public void run() {
+                                       notifyOfNewResourceManagerLeader(leaderAddress, leaderSessionID);
+                               }
+                       });
                }
 
                @Override
@@ -276,4 +790,43 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
                }
        }
 
+       /**
+        * {@link TaskManagerActions} bound to a single JobMaster. Final-state notifications and task
+        * failures are handed over to the TaskExecutor via {@code runAsync}; fatal errors are logged and
+        * forwarded to the fatal error handler; task execution state updates are sent to the JobMaster.
+        */
+       private class TaskManagerActionsImpl implements TaskManagerActions {
+
+               /** JobMaster to which task states are reported. */
+               private final JobMasterGateway jobMaster;
+
+               private TaskManagerActionsImpl(JobMasterGateway jobMasterGateway) {
+                       this.jobMaster = Preconditions.checkNotNull(jobMasterGateway);
+               }
+
+               @Override
+               public void notifyFinalState(final ExecutionAttemptID executionAttemptID) {
+                       final Runnable unregisterAction = new Runnable() {
+                               @Override
+                               public void run() {
+                                       unregisterTaskAndNotifyFinalState(jobMaster, executionAttemptID);
+                               }
+                       };
+
+                       runAsync(unregisterAction);
+               }
+
+               @Override
+               public void notifyFatalError(String message, Throwable cause) {
+                       log.error(message, cause);
+                       fatalErrorHandler.onFatalError(cause);
+               }
+
+               @Override
+               public void failTask(final ExecutionAttemptID executionAttemptID, final Throwable cause) {
+                       final Runnable failAction = new Runnable() {
+                               @Override
+                               public void run() {
+                                       // qualified call: the unqualified name would resolve to this inner method
+                                       TaskExecutor.this.failTask(executionAttemptID, cause);
+                               }
+                       };
+
+                       runAsync(failAction);
+               }
+
+               @Override
+               public void updateTaskExecutionState(final TaskExecutionState taskExecutionState) {
+                       TaskExecutor.this.updateTaskExecutionState(jobMaster, taskExecutionState);
+               }
+       }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a00619a4/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
index 2360b53..f062b96 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
@@ -21,11 +21,18 @@ package org.apache.flink.runtime.taskexecutor;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.Future;
 import 
org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestReply;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.PartitionInfo;
+import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.RpcTimeout;
+import org.apache.flink.runtime.taskmanager.Task;
 
+import java.util.Collection;
 import java.util.UUID;
 
 /**
@@ -33,12 +40,6 @@ import java.util.UUID;
  */
 public interface TaskExecutorGateway extends RpcGateway {
 
-       // 
------------------------------------------------------------------------
-       //  ResourceManager handlers
-       // 
------------------------------------------------------------------------
-
-       void notifyOfNewResourceManagerLeader(String address, UUID 
resourceManagerLeaderId);
-
        /**
         * Requests a slot from the TaskManager
         *
@@ -52,4 +53,71 @@ public interface TaskExecutorGateway extends RpcGateway {
                AllocationID allocationID,
                UUID resourceManagerLeaderID,
                @RpcTimeout Time timeout);
+
+       /**
+        * Submits a {@link Task} to the {@link TaskExecutor}.
+        *
+        * @param tdd deployment descriptor describing the task to submit
+        * @param jobManagerID resource ID identifying the submitting JobManager
+        * @param timeout timeout of the submit operation
+        * @return Future acknowledge which completes once the task has been successfully submitted
+        */
+       Future<Acknowledge> submitTask(
+               TaskDeploymentDescriptor tdd,
+               ResourceID jobManagerID,
+               @RpcTimeout Time timeout);
+
+       /**
+        * Updates the given task with the locations where the given partitions can be found.
+        *
+        * @param executionAttemptID identifying the task
+        * @param partitionInfos telling where the partitions can be retrieved from
+        * @return Future acknowledge if the partitions have been successfully updated
+        */
+       Future<Acknowledge> updatePartitions(ExecutionAttemptID executionAttemptID, Collection<PartitionInfo> partitionInfos);
+
+       /**
+        * Fails, i.e. releases, all intermediate result partitions produced by the given task.
+        *
+        * @param executionAttemptID identifying the task
+        */
+       void failPartition(ExecutionAttemptID executionAttemptID);
+
+       /**
+        * Triggers the checkpoint for the given task. The checkpoint is identified by the checkpoint ID
+        * and the checkpoint timestamp.
+        *
+        * @param executionAttemptID identifying the task
+        * @param checkpointID unique id for the checkpoint
+        * @param checkpointTimestamp is the timestamp when the checkpoint has been initiated
+        * @return Future acknowledge if the checkpoint has been successfully triggered
+        */
+       Future<Acknowledge> triggerCheckpoint(ExecutionAttemptID executionAttemptID, long checkpointID, long checkpointTimestamp);
+
+       /**
+        * Confirms a completed checkpoint for the given task. The checkpoint is identified by the
+        * checkpoint ID and the checkpoint timestamp.
+        *
+        * @param executionAttemptID identifying the task
+        * @param checkpointId unique id for the checkpoint
+        * @param checkpointTimestamp is the timestamp when the checkpoint has been initiated
+        * @return Future acknowledge if the checkpoint has been successfully confirmed
+        */
+       Future<Acknowledge> confirmCheckpoint(ExecutionAttemptID executionAttemptID, long checkpointId, long checkpointTimestamp);
+
+       /**
+        * Stops the given task.
+        *
+        * @param executionAttemptID identifying the task
+        * @return Future acknowledge if the task has been successfully stopped
+        */
+       Future<Acknowledge> stopTask(ExecutionAttemptID executionAttemptID);
+
+       /**
+        * Cancels the given task.
+        *
+        * @param executionAttemptID identifying the task
+        * @return Future acknowledge if the task has been successfully canceled
+        */
+       Future<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptID);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a00619a4/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
index f58af77..bce3dc3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.taskexecutor;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.UnmodifiableConfiguration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
@@ -71,7 +72,7 @@ public class TaskManagerConfiguration {
                this.maxRegistrationPause = 
Preconditions.checkNotNull(maxRegistrationPause);
                this.refusedRegistrationPause = 
Preconditions.checkNotNull(refusedRegistrationPause);
                this.cleanupInterval = 
Preconditions.checkNotNull(cleanupInterval);
-               this.configuration = Preconditions.checkNotNull(configuration);
+               this.configuration = new 
UnmodifiableConfiguration(Preconditions.checkNotNull(configuration));
        }
 
        public int getNumberSlots() {

http://git-wip-us.apache.org/repos/asf/flink/blob/a00619a4/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index 8ac0ddd..bb66655 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -95,6 +95,9 @@ public class TaskManagerRunner implements FatalErrorHandler {
                        taskManagerServices.getNetworkEnvironment(),
                        highAvailabilityServices,
                        taskManagerServices.getMetricRegistry(),
+                       taskManagerServices.getTaskManagerMetricGroup(),
+                       taskManagerServices.getBroadcastVariableManager(),
+                       taskManagerServices.getFileCache(),
                        this);
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a00619a4/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
index ff7f7d5..e264a1c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -19,7 +19,9 @@
 package org.apache.flink.runtime.taskexecutor;
 
 import org.apache.flink.core.memory.MemoryType;
+import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.filecache.FileCache;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.io.network.ConnectionManager;
@@ -32,9 +34,11 @@ import 
org.apache.flink.runtime.io.network.netty.NettyConnectionManager;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.query.netty.DisabledKvStateRequestStats;
 import org.apache.flink.runtime.query.netty.KvStateServer;
+import 
org.apache.flink.runtime.taskexecutor.utils.TaskExecutorMetricsInitializer;
 import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.util.EnvironmentInformation;
@@ -58,19 +62,28 @@ public class TaskManagerServices {
        private final IOManager ioManager;
        private final NetworkEnvironment networkEnvironment;
        private final MetricRegistry metricRegistry;
+       private final TaskManagerMetricGroup taskManagerMetricGroup;
+       private final BroadcastVariableManager broadcastVariableManager;
+       private final FileCache fileCache;
 
+       /**
+        * Bundles the given TaskManager services. Every argument is validated to be non-null via
+        * {@link Preconditions#checkNotNull(Object)}.
+        */
        private TaskManagerServices(
                TaskManagerLocation taskManagerLocation,
                MemoryManager memoryManager,
                IOManager ioManager,
                NetworkEnvironment networkEnvironment,
-               MetricRegistry metricRegistry) {
+               MetricRegistry metricRegistry,
+               TaskManagerMetricGroup taskManagerMetricGroup,
+               BroadcastVariableManager broadcastVariableManager,
+               FileCache fileCache) {
 
                this.taskManagerLocation = Preconditions.checkNotNull(taskManagerLocation);
                this.memoryManager = Preconditions.checkNotNull(memoryManager);
                this.ioManager = Preconditions.checkNotNull(ioManager);
                this.networkEnvironment = Preconditions.checkNotNull(networkEnvironment);
                this.metricRegistry = Preconditions.checkNotNull(metricRegistry);
+               this.taskManagerMetricGroup = Preconditions.checkNotNull(taskManagerMetricGroup);
+               this.broadcastVariableManager = Preconditions.checkNotNull(broadcastVariableManager);
+               this.fileCache = Preconditions.checkNotNull(fileCache);
        }
 
        // 
--------------------------------------------------------------------------------------------
@@ -97,6 +110,18 @@ public class TaskManagerServices {
                return metricRegistry;
        }
 
+       /** Returns the {@link TaskManagerMetricGroup} held by these services. */
+       public TaskManagerMetricGroup getTaskManagerMetricGroup() {
+               return this.taskManagerMetricGroup;
+       }
+
+       /** Returns the {@link BroadcastVariableManager} held by these services. */
+       public BroadcastVariableManager getBroadcastVariableManager() {
+               return this.broadcastVariableManager;
+       }
+
+       /** Returns the {@link FileCache} held by these services. */
+       public FileCache getFileCache() {
+               return this.fileCache;
+       }
+
        // 
--------------------------------------------------------------------------------------------
        //  Static factory methods for task manager services
        // 
--------------------------------------------------------------------------------------------
@@ -128,9 +153,29 @@ public class TaskManagerServices {
                // start the I/O manager, it will create some temp directories.
                final IOManager ioManager = new 
IOManagerAsync(taskManagerServicesConfiguration.getTmpDirPaths());
 
-               MetricRegistry metricsRegistry = new 
MetricRegistry(taskManagerServicesConfiguration.getMetricRegistryConfiguration());
+               final MetricRegistry metricRegistry = new 
MetricRegistry(taskManagerServicesConfiguration.getMetricRegistryConfiguration());
+
+               final TaskManagerMetricGroup taskManagerMetricGroup = new 
TaskManagerMetricGroup(
+                       metricRegistry,
+                       taskManagerLocation.getHostname(),
+                       taskManagerLocation.getResourceID().toString());
+
+               // Initialize the TM metrics
+               
TaskExecutorMetricsInitializer.instantiateStatusMetrics(taskManagerMetricGroup, 
network);
+
+               final BroadcastVariableManager broadcastVariableManager = new 
BroadcastVariableManager();
+
+               final FileCache fileCache = new 
FileCache(taskManagerServicesConfiguration.getTmpDirPaths());
 
-               return new TaskManagerServices(taskManagerLocation, 
memoryManager, ioManager, network, metricsRegistry);
+               return new TaskManagerServices(
+                       taskManagerLocation,
+                       memoryManager,
+                       ioManager,
+                       network,
+                       metricRegistry,
+                       taskManagerMetricGroup,
+                       broadcastVariableManager,
+                       fileCache);
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/a00619a4/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskSlot.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskSlot.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskSlot.java
new file mode 100644
index 0000000..4fc1d66
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskSlot.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.taskmanager.Task;
+import org.apache.flink.util.Preconditions;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Container for multiple {@link Task} belonging to the same slot.
+ */
+public class TaskSlot {
+       private final AllocationID allocationID;
+       private final ResourceID resourceID;
+       private final Map<ExecutionAttemptID, Task> tasks;
+
+       public TaskSlot(AllocationID allocationID, ResourceID resourceID) {
+               this.allocationID = Preconditions.checkNotNull(allocationID);
+               this.resourceID = Preconditions.checkNotNull(resourceID);
+               tasks = new HashMap<>(4);
+       }
+
+       public AllocationID getAllocationID() {
+               return allocationID;
+       }
+
+       public ResourceID getResourceID() {
+               return resourceID;
+       }
+
+       public boolean add(Task task) {
+               // sanity check
+               
Preconditions.checkArgument(allocationID.equals(task.getAllocationID()));
+
+               Task oldTask = tasks.put(task.getExecutionId(), task);
+
+               if (oldTask != null) {
+                       tasks.put(task.getExecutionId(), oldTask);
+                       return false;
+               } else {
+                       return true;
+               }
+       }
+
+       public Task remove(Task task) {
+               return tasks.remove(task.getExecutionId());
+       }
+
+       public void clear() {
+               tasks.clear();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a00619a4/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskSlotMapping.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskSlotMapping.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskSlotMapping.java
new file mode 100644
index 0000000..e67fd52
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskSlotMapping.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.runtime.taskmanager.Task;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Mapping between a {@link Task} and its {@link TaskSlot}.
+ */
+public class TaskSlotMapping {
+
+       private final Task task;
+       private final TaskSlot taskSlot;
+
+       public TaskSlotMapping(Task task, TaskSlot taskSlot) {
+               this.task = Preconditions.checkNotNull(task);
+               this.taskSlot = Preconditions.checkNotNull(taskSlot);
+       }
+
+       public Task getTask() {
+               return task;
+       }
+
+       public TaskSlot getTaskSlot() {
+               return taskSlot;
+       }
+}

Reply via email to