http://git-wip-us.apache.org/repos/asf/flink/blob/a00619a4/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/CheckpointException.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/CheckpointException.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/CheckpointException.java
new file mode 100644
index 0000000..80f2aa0
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/CheckpointException.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.exceptions;
+
+import org.apache.flink.runtime.taskexecutor.TaskExecutor;
+
+/**
+ * Exception indicating a problem with checkpointing on the {@link 
TaskExecutor} side.
+ */
+public class CheckpointException extends TaskManagerException {
+
+       private static final long serialVersionUID = 3366394086880327955L;
+
+       public CheckpointException(String message) {
+               super(message);
+       }
+
+       public CheckpointException(String message, Throwable cause) {
+               super(message, cause);
+       }
+
+       public CheckpointException(Throwable cause) {
+               super(cause);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a00619a4/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/PartitionException.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/PartitionException.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/PartitionException.java
new file mode 100644
index 0000000..eecd0ae
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/PartitionException.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.exceptions;
+
+import org.apache.flink.runtime.taskexecutor.TaskExecutor;
+
+/**
+ * Exception indicating a problem with the result partitions on the {@link 
TaskExecutor} side.
+ */
+public class PartitionException extends TaskManagerException {
+
+       private static final long serialVersionUID = 6248696963418276618L;
+
+       public PartitionException(String message) {
+               super(message);
+       }
+
+       public PartitionException(String message, Throwable cause) {
+               super(message, cause);
+       }
+
+       public PartitionException(Throwable cause) {
+               super(cause);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a00619a4/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/TaskException.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/TaskException.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/TaskException.java
new file mode 100644
index 0000000..a4a89c2
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/TaskException.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.exceptions;
+
+import org.apache.flink.runtime.taskexecutor.TaskExecutor;
+
+/**
+ * Exception indicating a task-related problem on the {@link TaskExecutor}.
+ */
+public class TaskException extends TaskManagerException {
+
+       private static final long serialVersionUID = 968001398103156856L;
+
+       public TaskException(String message) {
+               super(message);
+       }
+
+       public TaskException(String message, Throwable cause) {
+               super(message, cause);
+       }
+
+       public TaskException(Throwable cause) {
+               super(cause);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a00619a4/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/TaskManagerException.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/TaskManagerException.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/TaskManagerException.java
new file mode 100644
index 0000000..62d186e
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/TaskManagerException.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.exceptions;
+
+import org.apache.flink.runtime.taskexecutor.TaskExecutor;
+
+/**
+ * Base exception thrown by the {@link TaskExecutor}.
+ */
+public class TaskManagerException extends Exception {
+
+       private static final long serialVersionUID = -2997745772227694731L;
+
+       public TaskManagerException(String message) {
+               super(message);
+       }
+
+       public TaskManagerException(String message, Throwable cause) {
+               super(message, cause);
+       }
+
+       public TaskManagerException(Throwable cause) {
+               super(cause);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a00619a4/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/TaskSubmissionException.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/TaskSubmissionException.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/TaskSubmissionException.java
new file mode 100644
index 0000000..23f7812
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/TaskSubmissionException.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.exceptions;
+
+import org.apache.flink.runtime.taskmanager.Task;
+
+/**
+ * Exception indicating a problem with the {@link Task} submission at the 
{@link org.apache.flink.runtime.taskexecutor.TaskExecutor}.
+ */
+public class TaskSubmissionException extends TaskManagerException {
+
+       private static final long serialVersionUID = 4589813591317690486L;
+
+       public TaskSubmissionException(String message) {
+               super(message);
+       }
+
+       public TaskSubmissionException(String message, Throwable cause) {
+               super(message, cause);
+       }
+
+       public TaskSubmissionException(Throwable cause) {
+               super(cause);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a00619a4/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java
new file mode 100644
index 0000000..246c11d
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.rpc;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorGateway;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.state.CheckpointStateHandles;
+import org.apache.flink.runtime.taskmanager.CheckpointResponder;
+import org.apache.flink.util.Preconditions;
+
+public class RpcCheckpointResponder implements CheckpointResponder {
+
+       private final CheckpointCoordinatorGateway checkpointCoordinatorGateway;
+
+       public RpcCheckpointResponder(CheckpointCoordinatorGateway 
checkpointCoordinatorGateway) {
+               this.checkpointCoordinatorGateway = 
Preconditions.checkNotNull(checkpointCoordinatorGateway);
+       }
+
+       @Override
+       public void acknowledgeCheckpoint(
+                       JobID jobID,
+                       ExecutionAttemptID executionAttemptID,
+                       CheckpointMetaData checkpointMetaData,
+                       CheckpointStateHandles checkpointStateHandles) {
+
+               checkpointCoordinatorGateway.acknowledgeCheckpoint(
+                       jobID,
+                       executionAttemptID,
+                       checkpointMetaData,
+                       checkpointStateHandles);
+
+       }
+
+       @Override
+       public void declineCheckpoint(JobID jobID, ExecutionAttemptID 
executionAttemptID, CheckpointMetaData checkpoint) {
+               checkpointCoordinatorGateway.declineCheckpoint(jobID, 
executionAttemptID, checkpoint);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a00619a4/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcInputSplitProvider.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcInputSplitProvider.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcInputSplitProvider.java
new file mode 100644
index 0000000..4850d63
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcInputSplitProvider.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.rpc;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
+import org.apache.flink.runtime.jobgraph.tasks.InputSplitProviderException;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.jobmaster.SerializedInputSplit;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+
+public class RpcInputSplitProvider implements InputSplitProvider {
+       private final JobMasterGateway jobMasterGateway;
+       private final JobID jobID;
+       private final JobVertexID jobVertexID;
+       private final ExecutionAttemptID executionAttemptID;
+       private final Time timeout;
+
+       public RpcInputSplitProvider(
+                       JobMasterGateway jobMasterGateway,
+                       JobID jobID,
+                       JobVertexID jobVertexID,
+                       ExecutionAttemptID executionAttemptID,
+                       Time timeout) {
+               this.jobMasterGateway = 
Preconditions.checkNotNull(jobMasterGateway);
+               this.jobID = Preconditions.checkNotNull(jobID);
+               this.jobVertexID = Preconditions.checkNotNull(jobVertexID);
+               this.executionAttemptID = 
Preconditions.checkNotNull(executionAttemptID);
+               this.timeout = Preconditions.checkNotNull(timeout);
+       }
+
+
+       @Override
+       public InputSplit getNextInputSplit(ClassLoader userCodeClassLoader) 
throws InputSplitProviderException {
+               Preconditions.checkNotNull(userCodeClassLoader);
+
+               Future<SerializedInputSplit> futureInputSplit = 
jobMasterGateway.requestNextInputSplit(jobVertexID, executionAttemptID);
+
+               try {
+                       SerializedInputSplit serializedInputSplit = 
futureInputSplit.get(timeout.getSize(), timeout.getUnit());
+
+                       if (serializedInputSplit.isEmpty()) {
+                               return null;
+                       } else {
+                               return 
InstantiationUtil.deserializeObject(serializedInputSplit.getInputSplitData(), 
userCodeClassLoader);
+                       }
+               } catch (Exception e) {
+                       throw new InputSplitProviderException("Requesting the 
next input split failed.", e);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a00619a4/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcKvStateRegistryListener.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcKvStateRegistryListener.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcKvStateRegistryListener.java
new file mode 100644
index 0000000..3692a71
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcKvStateRegistryListener.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.rpc;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.query.KvStateID;
+import org.apache.flink.runtime.query.KvStateRegistryGateway;
+import org.apache.flink.runtime.query.KvStateRegistryListener;
+import org.apache.flink.runtime.query.KvStateServerAddress;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.util.Preconditions;
+
+public class RpcKvStateRegistryListener implements KvStateRegistryListener {
+
+       private final KvStateRegistryGateway kvStateRegistryGateway;
+       private final KvStateServerAddress kvStateServerAddress;
+
+       public RpcKvStateRegistryListener(
+                       KvStateRegistryGateway kvStateRegistryGateway,
+                       KvStateServerAddress kvStateServerAddress) {
+               this.kvStateRegistryGateway = 
Preconditions.checkNotNull(kvStateRegistryGateway);
+               this.kvStateServerAddress = 
Preconditions.checkNotNull(kvStateServerAddress);
+       }
+
+       @Override
+       public void notifyKvStateRegistered(
+                       JobID jobId,
+                       JobVertexID jobVertexId,
+                       KeyGroupRange keyGroupRange,
+                       String registrationName,
+                       KvStateID kvStateId) {
+               kvStateRegistryGateway.notifyKvStateRegistered(
+                       jobId,
+                       jobVertexId,
+                       keyGroupRange,
+                       registrationName,
+                       kvStateId,
+                       kvStateServerAddress);
+
+       }
+
+       @Override
+       public void notifyKvStateUnregistered(
+               JobID jobId,
+               JobVertexID jobVertexId,
+               KeyGroupRange keyGroupRange,
+               String registrationName) {
+
+               kvStateRegistryGateway.notifyKvStateUnregistered(
+                       jobId,
+                       jobVertexId,
+                       keyGroupRange,
+                       registrationName);
+
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a00619a4/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcPartitionStateChecker.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcPartitionStateChecker.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcPartitionStateChecker.java
new file mode 100644
index 0000000..ab111ad
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcPartitionStateChecker.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.rpc;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.io.network.PartitionState;
+import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.util.Preconditions;
+
+public class RpcPartitionStateChecker implements PartitionStateChecker {
+
+       private final JobMasterGateway jobMasterGateway;
+
+       public RpcPartitionStateChecker(JobMasterGateway jobMasterGateway) {
+               this.jobMasterGateway = 
Preconditions.checkNotNull(jobMasterGateway);
+       }
+
+       @Override
+       public Future<PartitionState> requestPartitionState(
+               JobID jobId,
+               ExecutionAttemptID executionId,
+               IntermediateDataSetID resultId,
+               ResultPartitionID partitionId) {
+
+               return jobMasterGateway.requestPartitionState(partitionId, 
executionId, resultId);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a00619a4/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcResultPartitionConsumableNotifier.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcResultPartitionConsumableNotifier.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcResultPartitionConsumableNotifier.java
new file mode 100644
index 0000000..29ad3b6
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcResultPartitionConsumableNotifier.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.rpc;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.ApplyFunction;
+import org.apache.flink.runtime.concurrent.Future;
+import 
org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.taskmanager.TaskActions;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Executor;
+
+public class RpcResultPartitionConsumableNotifier implements 
ResultPartitionConsumableNotifier {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(RpcResultPartitionConsumableNotifier.class);
+
+       private final JobMasterGateway jobMasterGateway;
+       private final Executor executor;
+       private final Time timeout;
+
+       public RpcResultPartitionConsumableNotifier(
+                       JobMasterGateway jobMasterGateway,
+                       Executor executor,
+                       Time timeout) {
+               this.jobMasterGateway = 
Preconditions.checkNotNull(jobMasterGateway);
+               this.executor = Preconditions.checkNotNull(executor);
+               this.timeout = Preconditions.checkNotNull(timeout);
+       }
+       @Override
+       public void notifyPartitionConsumable(JobID jobId, ResultPartitionID 
partitionId, final TaskActions taskActions) {
+               Future<Acknowledge> acknowledgeFuture = 
jobMasterGateway.scheduleOrUpdateConsumers(partitionId, timeout);
+
+               acknowledgeFuture.exceptionallyAsync(new 
ApplyFunction<Throwable, Void>() {
+                       @Override
+                       public Void apply(Throwable value) {
+                               LOG.error("Could not schedule or update 
consumers at the JobManager.", value);
+
+                               taskActions.failExternally(new 
RuntimeException("Could not notify JobManager to schedule or update 
consumers.", value));
+
+                               return null;
+                       }
+               }, executor);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a00619a4/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/utils/TaskExecutorMetricsInitializer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/utils/TaskExecutorMetricsInitializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/utils/TaskExecutorMetricsInitializer.java
new file mode 100644
index 0000000..1f8d5ed
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/utils/TaskExecutorMetricsInitializer.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.utils;
+
+import com.sun.management.OperatingSystemMXBean;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.taskexecutor.TaskExecutor;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.management.AttributeNotFoundException;
+import javax.management.InstanceNotFoundException;
+import javax.management.MBeanException;
+import javax.management.MBeanServer;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+import javax.management.ReflectionException;
+import java.lang.management.ClassLoadingMXBean;
+import java.lang.management.GarbageCollectorMXBean;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryMXBean;
+import java.lang.management.ThreadMXBean;
+import java.util.List;
+
+/**
+ * Utility class to initialize {@link TaskExecutor} specific metrics.
+ */
+public class TaskExecutorMetricsInitializer {
+       private static final Logger LOG = 
LoggerFactory.getLogger(TaskExecutorMetricsInitializer.class);
+
+       public static void instantiateStatusMetrics(
+               MetricGroup taskManagerMetricGroup,
+               NetworkEnvironment network) {
+               MetricGroup status = taskManagerMetricGroup.addGroup("Status");
+
+               instantiateNetworkMetrics(status.addGroup("Network"), network);
+
+               MetricGroup jvm = status.addGroup("JVM");
+
+               instantiateClassLoaderMetrics(jvm.addGroup("ClassLoader"));
+               
instantiateGarbageCollectorMetrics(jvm.addGroup("GarbageCollector"));
+               instantiateMemoryMetrics(jvm.addGroup("Memory"));
+               instantiateThreadMetrics(jvm.addGroup("Threads"));
+               instantiateCPUMetrics(jvm.addGroup("CPU"));
+       }
+
+       private static void instantiateNetworkMetrics(
+               MetricGroup metrics,
+               final NetworkEnvironment network) {
+               metrics.<Long, Gauge<Long>>gauge("TotalMemorySegments", new 
Gauge<Long> () {
+                       @Override
+                       public Long getValue() {
+                               return (long) 
network.getNetworkBufferPool().getTotalNumberOfMemorySegments();
+                       }
+               });
+
+               metrics.<Long, Gauge<Long>>gauge("AvailableMemorySegments", new 
Gauge<Long> () {
+                       @Override
+                       public Long getValue() {
+                               return (long) 
network.getNetworkBufferPool().getNumberOfAvailableMemorySegments();
+                       }
+               });
+       }
+
+       private static void instantiateClassLoaderMetrics(MetricGroup metrics) {
+               final ClassLoadingMXBean mxBean = 
ManagementFactory.getClassLoadingMXBean();
+
+               metrics.<Long, Gauge<Long>>gauge("ClassesLoaded", new 
Gauge<Long> () {
+                       @Override
+                       public Long getValue() {
+                               return mxBean.getTotalLoadedClassCount();
+                       }
+               });
+
+               metrics.<Long, Gauge<Long>>gauge("ClassesUnloaded", new 
Gauge<Long> () {
+                       @Override
+                       public Long getValue() {
+                               return mxBean.getUnloadedClassCount();
+                       }
+               });
+       }
+
+       private static void instantiateGarbageCollectorMetrics(MetricGroup 
metrics) {
+               List<GarbageCollectorMXBean> garbageCollectors = 
ManagementFactory.getGarbageCollectorMXBeans();
+
+               for (final GarbageCollectorMXBean garbageCollector: 
garbageCollectors) {
+                       MetricGroup gcGroup = 
metrics.addGroup(garbageCollector.getName());
+
+                       gcGroup.<Long, Gauge<Long>>gauge("Count", new 
Gauge<Long> () {
+                               @Override
+                               public Long getValue() {
+                                       return 
garbageCollector.getCollectionCount();
+                               }
+                       });
+
+                       gcGroup.<Long, Gauge<Long>>gauge("Time", new 
Gauge<Long> () {
+                               @Override
+                               public Long getValue() {
+                                       return 
garbageCollector.getCollectionTime();
+                               }
+                       });
+               }
+       }
+
+       private static void instantiateMemoryMetrics(MetricGroup metrics) {
+               final MemoryMXBean mxBean = ManagementFactory.getMemoryMXBean();
+
+               MetricGroup heap = metrics.addGroup("Heap");
+
+               heap.<Long, Gauge<Long>>gauge("Used", new Gauge<Long> () {
+                       @Override
+                       public Long getValue() {
+                               return mxBean.getHeapMemoryUsage().getUsed();
+                       }
+               });
+               heap.<Long, Gauge<Long>>gauge("Committed", new Gauge<Long> () {
+                       @Override
+                       public Long getValue() {
+                               return 
mxBean.getHeapMemoryUsage().getCommitted();
+                       }
+               });
+               heap.<Long, Gauge<Long>>gauge("Max", new Gauge<Long> () {
+                       @Override
+                       public Long getValue() {
+                               return mxBean.getHeapMemoryUsage().getMax();
+                       }
+               });
+
+               MetricGroup nonHeap = metrics.addGroup("NonHeap");
+
+               nonHeap.<Long, Gauge<Long>>gauge("Used", new Gauge<Long> () {
+                       @Override
+                       public Long getValue() {
+                               return mxBean.getNonHeapMemoryUsage().getUsed();
+                       }
+               });
+               nonHeap.<Long, Gauge<Long>>gauge("Committed", new Gauge<Long> 
() {
+                       @Override
+                       public Long getValue() {
+                               return 
mxBean.getNonHeapMemoryUsage().getCommitted();
+                       }
+               });
+               nonHeap.<Long, Gauge<Long>>gauge("Max", new Gauge<Long> () {
+                       @Override
+                       public Long getValue() {
+                               return mxBean.getNonHeapMemoryUsage().getMax();
+                       }
+               });
+
+               final MBeanServer con = 
ManagementFactory.getPlatformMBeanServer();
+
+               final String directBufferPoolName = 
"java.nio:type=BufferPool,name=direct";
+
+               try {
+                       final ObjectName directObjectName = new 
ObjectName(directBufferPoolName);
+
+                       MetricGroup direct = metrics.addGroup("Direct");
+
+                       direct.<Long, Gauge<Long>>gauge("Count", new 
TaskExecutorMetricsInitializer.AttributeGauge<>(con, directObjectName, "Count", 
-1L));
+                       direct.<Long, Gauge<Long>>gauge("MemoryUsed", new 
TaskExecutorMetricsInitializer.AttributeGauge<>(con, directObjectName, 
"MemoryUsed", -1L));
+                       direct.<Long, Gauge<Long>>gauge("TotalCapacity", new 
TaskExecutorMetricsInitializer.AttributeGauge<>(con, directObjectName, 
"TotalCapacity", -1L));
+               } catch (MalformedObjectNameException e) {
+                       LOG.warn("Could not create object name {}.", 
directBufferPoolName, e);
+               }
+
+               final String mappedBufferPoolName = 
"java.nio:type=BufferPool,name=mapped";
+
+               try {
+                       final ObjectName mappedObjectName = new 
ObjectName(mappedBufferPoolName);
+
+                       MetricGroup mapped = metrics.addGroup("Mapped");
+
+                       mapped.<Long, Gauge<Long>>gauge("Count", new 
TaskExecutorMetricsInitializer.AttributeGauge<>(con, mappedObjectName, "Count", 
-1L));
+                       mapped.<Long, Gauge<Long>>gauge("MemoryUsed", new 
TaskExecutorMetricsInitializer.AttributeGauge<>(con, mappedObjectName, 
"MemoryUsed", -1L));
+                       mapped.<Long, Gauge<Long>>gauge("TotalCapacity", new 
TaskExecutorMetricsInitializer.AttributeGauge<>(con, mappedObjectName, 
"TotalCapacity", -1L));
+               } catch (MalformedObjectNameException e) {
+                       LOG.warn("Could not create object name {}.", 
mappedBufferPoolName, e);
+               }
+       }
+
+       private static void instantiateThreadMetrics(MetricGroup metrics) {
+               final ThreadMXBean mxBean = ManagementFactory.getThreadMXBean();
+
+               metrics.<Integer, Gauge<Integer>>gauge("Count", new 
Gauge<Integer> () {
+                       @Override
+                       public Integer getValue() {
+                               return mxBean.getThreadCount();
+                       }
+               });
+       }
+
+       private static void instantiateCPUMetrics(MetricGroup metrics) {
+               try {
+                       final OperatingSystemMXBean mxBean = 
(OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean();
+
+                       metrics.<Double, Gauge<Double>>gauge("Load", new 
Gauge<Double> () {
+                               @Override
+                               public Double getValue() {
+                                       return mxBean.getProcessCpuLoad();
+                               }
+                       });
+                       metrics.<Long, Gauge<Long>>gauge("Time", new 
Gauge<Long> () {
+                               @Override
+                               public Long getValue() {
+                                       return mxBean.getProcessCpuTime();
+                               }
+                       });
+               } catch (Exception e) {
+                       LOG.warn("Cannot access 
com.sun.management.OperatingSystemMXBean.getProcessCpuLoad()" +
+                               " - CPU load metrics will not be available.", 
e);
+               }
+       }
+
+       private static final class AttributeGauge<T> implements Gauge<T> {
+               private final MBeanServer server;
+               private final ObjectName objectName;
+               private final String attributeName;
+               private final T errorValue;
+
+               private AttributeGauge(MBeanServer server, ObjectName 
objectName, String attributeName, T errorValue) {
+                       this.server = Preconditions.checkNotNull(server);
+                       this.objectName = 
Preconditions.checkNotNull(objectName);
+                       this.attributeName = 
Preconditions.checkNotNull(attributeName);
+                       this.errorValue = errorValue;
+               }
+
+               @SuppressWarnings("unchecked")
+               @Override
+               public T getValue() {
+                       try {
+                               return (T) server.getAttribute(objectName, 
attributeName);
+                       } catch (MBeanException | AttributeNotFoundException | 
InstanceNotFoundException | ReflectionException e) {
+                               LOG.warn("Could not read attribute {}.", 
attributeName, e);
+                               return errorValue;
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a00619a4/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayTaskManagerActions.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayTaskManagerActions.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayTaskManagerActions.java
new file mode 100644
index 0000000..b3a0cbb
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayTaskManagerActions.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskmanager;
+
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.messages.TaskManagerMessages;
+import org.apache.flink.runtime.messages.TaskMessages;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Implementation using {@link ActorGateway} to forward the messages.
+ */
+public class ActorGatewayTaskManagerActions implements TaskManagerActions {
+
+       private final ActorGateway actorGateway;
+
+       public ActorGatewayTaskManagerActions(ActorGateway actorGateway) {
+               this.actorGateway = Preconditions.checkNotNull(actorGateway);
+       }
+
+       @Override
+       public void notifyFinalState(ExecutionAttemptID executionAttemptID) {
+               actorGateway.tell(new 
TaskMessages.TaskInFinalState(executionAttemptID));
+       }
+
+       @Override
+       public void notifyFatalError(String message, Throwable cause) {
+               actorGateway.tell(new TaskManagerMessages.FatalError(message, 
cause));
+       }
+
+       @Override
+       public void failTask(ExecutionAttemptID executionAttemptID, Throwable 
cause) {
+               actorGateway.tell(new TaskMessages.FailTask(executionAttemptID, 
cause));
+       }
+
+       @Override
+       public void updateTaskExecutionState(TaskExecutionState 
taskExecutionState) {
+               TaskMessages.UpdateTaskExecutionState actorMessage = new 
TaskMessages.UpdateTaskExecutionState(taskExecutionState);
+
+               actorGateway.tell(actorMessage);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a00619a4/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayTaskManagerConnection.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayTaskManagerConnection.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayTaskManagerConnection.java
deleted file mode 100644
index cddac55..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayTaskManagerConnection.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.taskmanager;
-
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.messages.TaskManagerMessages;
-import org.apache.flink.runtime.messages.TaskMessages;
-import org.apache.flink.util.Preconditions;
-
-/**
- * Implementation using {@link ActorGateway} to forward the messages.
- */
-public class ActorGatewayTaskManagerConnection implements 
TaskManagerConnection {
-
-       private final ActorGateway actorGateway;
-
-       public ActorGatewayTaskManagerConnection(ActorGateway actorGateway) {
-               this.actorGateway = Preconditions.checkNotNull(actorGateway);
-       }
-
-       @Override
-       public void notifyFinalState(ExecutionAttemptID executionAttemptID) {
-               actorGateway.tell(new 
TaskMessages.TaskInFinalState(executionAttemptID));
-       }
-
-       @Override
-       public void notifyFatalError(String message, Throwable cause) {
-               actorGateway.tell(new TaskManagerMessages.FatalError(message, 
cause));
-       }
-
-       @Override
-       public void failTask(ExecutionAttemptID executionAttemptID, Throwable 
cause) {
-               actorGateway.tell(new TaskMessages.FailTask(executionAttemptID, 
cause));
-       }
-
-       @Override
-       public void updateTaskExecutionState(TaskExecutionState 
taskExecutionState) {
-               TaskMessages.UpdateTaskExecutionState actorMessage = new 
TaskMessages.UpdateTaskExecutionState(taskExecutionState);
-
-               actorGateway.tell(actorMessage);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a00619a4/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 02a41b5..977e563 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.cache.DistributedCache;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.concurrent.BiFunction;
 import org.apache.flink.runtime.io.network.PartitionState;
@@ -131,6 +132,9 @@ public class Task implements Runnable, TaskActions {
        /** The execution attempt of the parallel subtask */
        private final ExecutionAttemptID executionId;
 
+       /** ID which identifies the slot in which the task is supposed to run */
+       private final AllocationID allocationID;
+
        /** TaskInfo object for this task */
        private final TaskInfo taskInfo;
 
@@ -176,7 +180,7 @@ public class Task implements Runnable, TaskActions {
        private final Map<IntermediateDataSetID, SingleInputGate> 
inputGatesById;
 
        /** Connection to the task manager */
-       private final TaskManagerConnection taskManagerConnection;
+       private final TaskManagerActions taskManagerActions;
 
        /** Input split provider for the task */
        private final InputSplitProvider inputSplitProvider;
@@ -259,7 +263,7 @@ public class Task implements Runnable, TaskActions {
                IOManager ioManager,
                NetworkEnvironment networkEnvironment,
                BroadcastVariableManager bcVarManager,
-               TaskManagerConnection taskManagerConnection,
+               TaskManagerActions taskManagerActions,
                InputSplitProvider inputSplitProvider,
                CheckpointResponder checkpointResponder,
                LibraryCacheManager libraryCache,
@@ -274,6 +278,7 @@ public class Task implements Runnable, TaskActions {
                this.jobId = checkNotNull(tdd.getJobID());
                this.vertexId = checkNotNull(tdd.getVertexID());
                this.executionId  = checkNotNull(tdd.getExecutionId());
+               this.allocationID = checkNotNull(tdd.getAllocationID());
                this.taskNameWithSubtask = taskInfo.getTaskNameWithSubtasks();
                this.jobConfiguration = checkNotNull(tdd.getJobConfiguration());
                this.taskConfiguration = 
checkNotNull(tdd.getTaskConfiguration());
@@ -296,7 +301,7 @@ public class Task implements Runnable, TaskActions {
 
                this.inputSplitProvider = checkNotNull(inputSplitProvider);
                this.checkpointResponder = checkNotNull(checkpointResponder);
-               this.taskManagerConnection = 
checkNotNull(taskManagerConnection);
+               this.taskManagerActions = checkNotNull(taskManagerActions);
 
                this.libraryCache = checkNotNull(libraryCache);
                this.fileCache = checkNotNull(fileCache);
@@ -380,6 +385,10 @@ public class Task implements Runnable, TaskActions {
                return executionId;
        }
 
+       public AllocationID getAllocationID() {
+               return allocationID;
+       }
+
        public TaskInfo getTaskInfo() {
                return taskInfo;
        }
@@ -600,7 +609,7 @@ public class Task implements Runnable, TaskActions {
 
                        // notify everyone that we switched to running
                        notifyObservers(ExecutionState.RUNNING, null);
-                       taskManagerConnection.updateTaskExecutionState(new 
TaskExecutionState(jobId, executionId, ExecutionState.RUNNING));
+                       taskManagerActions.updateTaskExecutionState(new 
TaskExecutionState(jobId, executionId, ExecutionState.RUNNING));
 
                        // make sure the user code classloader is accessible 
thread-locally
                        
executingThread.setContextClassLoader(userCodeClassLoader);
@@ -793,11 +802,11 @@ public class Task implements Runnable, TaskActions {
        }
 
        private void notifyFinalState() {
-               taskManagerConnection.notifyFinalState(executionId);
+               taskManagerActions.notifyFinalState(executionId);
        }
 
        private void notifyFatalError(String message, Throwable cause) {
-               taskManagerConnection.notifyFatalError(message, cause);
+               taskManagerActions.notifyFatalError(message, cause);
        }
 
        // 
----------------------------------------------------------------------------------------------------------------
@@ -823,7 +832,7 @@ public class Task implements Runnable, TaskActions {
                                                
((StoppableTask)Task.this.invokable).stop();
                                        } catch(RuntimeException e) {
                                                LOG.error("Stopping task " + 
taskNameWithSubtask + " failed.", e);
-                                               
taskManagerConnection.failTask(executionId, e);
+                                               
taskManagerActions.failTask(executionId, e);
                                        }
                                }
                        };

http://git-wip-us.apache.org/repos/asf/flink/blob/a00619a4/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java
index 60aadf5..877cc1e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java
@@ -24,6 +24,8 @@ import 
org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.util.SerializedThrowable;
 
+import java.io.Serializable;
+
 /**
  * This class represents an update about a task's execution state.
  *
@@ -34,7 +36,7 @@ import org.apache.flink.runtime.util.SerializedThrowable;
  * exception field transient and deserialized it lazily, with the
  * appropriate class loader.
  */
-public class TaskExecutionState implements java.io.Serializable {
+public class TaskExecutionState implements Serializable {
 
        private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a00619a4/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java
index 60beae0..31c518a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
+import org.apache.flink.runtime.jobgraph.tasks.InputSplitProviderException;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.util.InstantiationUtil;
 
@@ -63,35 +64,45 @@ public class TaskInputSplitProvider implements 
InputSplitProvider {
        }
 
        @Override
-       public InputSplit getNextInputSplit(ClassLoader userCodeClassLoader) {
+       public InputSplit getNextInputSplit(ClassLoader userCodeClassLoader) 
throws InputSplitProviderException {
                Preconditions.checkNotNull(userCodeClassLoader);
 
+               final Future<Object> response = jobManager.ask(
+                       new JobManagerMessages.RequestNextInputSplit(jobID, 
vertexID, executionID),
+                       timeout);
+
+               final Object result;
+
                try {
-                       final Future<Object> response = jobManager.ask(
-                                       new 
JobManagerMessages.RequestNextInputSplit(jobID, vertexID, executionID),
-                                       timeout);
+                       result = Await.result(response, timeout);
+               } catch (Exception e) {
+                       throw new InputSplitProviderException("Did not receive 
next input split from JobManager.", e);
+               }
 
-                       final Object result = Await.result(response, timeout);
+               if(result instanceof JobManagerMessages.NextInputSplit){
+                       final JobManagerMessages.NextInputSplit nextInputSplit =
+                               (JobManagerMessages.NextInputSplit) result;
 
-                       if(result instanceof JobManagerMessages.NextInputSplit){
-                               final JobManagerMessages.NextInputSplit 
nextInputSplit =
-                                       (JobManagerMessages.NextInputSplit) 
result;
+                       byte[] serializedData = nextInputSplit.splitData();
 
-                               byte[] serializedData = 
nextInputSplit.splitData();
+                       if(serializedData == null) {
+                               return null;
+                       } else {
+                               final Object deserialized;
 
-                               if(serializedData == null) {
-                                       return null;
-                               } else {
-                                       Object deserialized = 
InstantiationUtil.deserializeObject(serializedData,
+                               try {
+                                       deserialized = 
InstantiationUtil.deserializeObject(serializedData,
                                                userCodeClassLoader);
-                                       return (InputSplit) deserialized;
+                               } catch (Exception e) {
+                                       throw new 
InputSplitProviderException("Could not deserialize the serialized input 
split.", e);
                                }
-                       } else {
-                               throw new Exception("RequestNextInputSplit 
requires a response of type " +
-                                       "NextInputSplit. Instead response is of 
type " + result.getClass() + '.');
+
+                               return (InputSplit) deserialized;
                        }
-               } catch (Exception e) {
-                       throw new RuntimeException("Requesting the next 
InputSplit failed.", e);
+               } else {
+                       throw new 
InputSplitProviderException("RequestNextInputSplit requires a response of type 
" +
+                               "NextInputSplit. Instead response is of type " 
+ result.getClass() + '.');
                }
+
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a00619a4/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerActions.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerActions.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerActions.java
new file mode 100644
index 0000000..2f3a0cb
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerActions.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskmanager;
+
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+
+/**
+ * Interface for the communication of the {@link Task} with the {@link TaskManager}.
+ */
+public interface TaskManagerActions {
+
+       /**
+        * Notifies the task manager that the given task is in a final state.
+        *
+        * @param executionAttemptID Execution attempt ID of the task
+        */
+       void notifyFinalState(ExecutionAttemptID executionAttemptID);
+
+       /**
+        * Notifies the task manager about a fatal error that occurred in the task.
+        *
+        * @param message Message to report
+        * @param cause Cause of the fatal error
+        */
+       void notifyFatalError(String message, Throwable cause);
+
+       /**
+        * Tells the task manager to fail the given task.
+        *
+        * @param executionAttemptID Execution attempt ID of the task to fail
+        * @param cause Cause of the failure
+        */
+       void failTask(ExecutionAttemptID executionAttemptID, Throwable cause);
+
+       /**
+        * Notifies the task manager about the task execution state update.
+        *
+        * @param taskExecutionState Task execution state update
+        */
+       void updateTaskExecutionState(TaskExecutionState taskExecutionState);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a00619a4/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerConnection.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerConnection.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerConnection.java
deleted file mode 100644
index dc1b40f..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerConnection.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.taskmanager;
-
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-
-/**
- * Interface for the communication of the {@link Task} with the {@link TaskManager}.
- */
-public interface TaskManagerConnection {
-
-       /**
-        * Notifies the task manager that the given task is in a final state.
-        *
-        * @param executionAttemptID Execution attempt ID of the task
-        */
-       void notifyFinalState(ExecutionAttemptID executionAttemptID);
-
-       /**
-        * Notifies the task manager about a fatal error occurred in the task.
-        *
-        * @param message Message to report
-        * @param cause Cause of the fatal error
-        */
-       void notifyFatalError(String message, Throwable cause);
-
-       /**
-        * Tells the task manager to fail the given task.
-        *
-        * @param executionAttemptID Execution attempt ID of the task to fail
-        * @param cause Cause of the failure
-        */
-       void failTask(ExecutionAttemptID executionAttemptID, Throwable cause);
-
-       /**
-        * Notifies the task manager about the task execution state update.
-        *
-        * @param taskExecutionState Task execution state update
-        */
-       void updateTaskExecutionState(TaskExecutionState taskExecutionState);
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a00619a4/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 7a764ca..da8c14e 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -25,7 +25,6 @@ import java.net.{InetAddress, InetSocketAddress}
 import java.util
 import java.util.UUID
 import java.util.concurrent.TimeUnit
-import javax.management.ObjectName
 
 import _root_.akka.actor._
 import _root_.akka.pattern.ask
@@ -37,7 +36,6 @@ import com.fasterxml.jackson.databind.ObjectMapper
 import grizzled.slf4j.Logger
 import org.apache.flink.configuration._
 import org.apache.flink.core.fs.FileSystem
-import org.apache.flink.metrics.{MetricGroup, Gauge => FlinkGauge}
 import org.apache.flink.runtime.accumulators.AccumulatorSnapshot
 import org.apache.flink.runtime.clusterframework.messages.StopCluster
 import org.apache.flink.runtime.clusterframework.types.ResourceID
@@ -59,7 +57,7 @@ import 
org.apache.flink.runtime.leaderretrieval.{LeaderRetrievalListener, Leader
 import org.apache.flink.runtime.memory.MemoryManager
 import org.apache.flink.runtime.messages.Messages._
 import org.apache.flink.runtime.messages.RegistrationMessages._
-import 
org.apache.flink.runtime.messages.StackTraceSampleMessages.{ResponseStackTraceSampleFailure,
 ResponseStackTraceSampleSuccess, SampleTaskStackTrace, 
StackTraceSampleMessages, TriggerStackTraceSample}
+import org.apache.flink.runtime.messages.StackTraceSampleMessages._
 import org.apache.flink.runtime.messages.TaskManagerMessages._
 import org.apache.flink.runtime.messages.TaskMessages._
 import 
org.apache.flink.runtime.messages.checkpoint.{AbstractCheckpointMessage, 
NotifyCheckpointComplete, TriggerCheckpoint}
@@ -68,7 +66,8 @@ import 
org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup
 import org.apache.flink.runtime.process.ProcessReaper
 import org.apache.flink.runtime.security.SecurityContext.{FlinkSecuredRunner, 
SecurityConfiguration}
 import org.apache.flink.runtime.security.SecurityContext
-import org.apache.flink.runtime.taskexecutor.{TaskManagerConfiguration, 
TaskManagerServices, TaskManagerServicesConfiguration}
+import org.apache.flink.runtime.taskexecutor._
+import 
org.apache.flink.runtime.taskexecutor.utils.TaskExecutorMetricsInitializer
 import org.apache.flink.runtime.util._
 import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessageFilter, 
LogMessages}
 import org.apache.flink.util.NetUtils
@@ -150,7 +149,7 @@ class TaskManager(
   protected val bcVarManager = new BroadcastVariableManager()
 
   /** Handler for distributed files cached by this TaskManager */
-  protected val fileCache = new FileCache(config.getConfiguration())
+  protected val fileCache = new FileCache(config.getTmpDirPaths())
 
   /** Registry of metrics periodically transmitted to the JobManager */
   private val metricRegistry = TaskManager.createMetricsRegistry()
@@ -196,7 +195,7 @@ class TaskManager(
     CheckpointResponder,
     PartitionStateChecker,
     ResultPartitionConsumableNotifier,
-    TaskManagerConnection)] = None
+    TaskManagerActions)] = None
 
   // --------------------------------------------------------------------------
   //  Actor messages and life cycle
@@ -940,9 +939,9 @@ class TaskManager(
     val jobManagerGateway = new AkkaActorGateway(jobManager, 
leaderSessionID.orNull)
     val taskManagerGateway = new AkkaActorGateway(self, leaderSessionID.orNull)
 
-    val checkpointResponder = new 
ActorGatewayCheckpointResponder(jobManagerGateway);
+    val checkpointResponder = new 
ActorGatewayCheckpointResponder(jobManagerGateway)
 
-    val taskManagerConnection = new 
ActorGatewayTaskManagerConnection(taskManagerGateway)
+    val taskManagerConnection = new 
ActorGatewayTaskManagerActions(taskManagerGateway)
 
     val partitionStateChecker = new ActorGatewayPartitionStateChecker(
       jobManagerGateway,
@@ -998,7 +997,7 @@ class TaskManager(
     taskManagerMetricGroup = 
       new TaskManagerMetricGroup(metricsRegistry, 
this.runtimeInfo.getHostname, id.toString)
     
-    TaskManager.instantiateStatusMetrics(taskManagerMetricGroup, network)
+    
TaskExecutorMetricsInitializer.instantiateStatusMetrics(taskManagerMetricGroup, 
network)
     
     // watch job manager to detect when it dies
     context.watch(jobManager)
@@ -2008,23 +2007,23 @@ object TaskManager {
 
     // Pre-processing steps for registering cpuLoad
     val osBean: OperatingSystemMXBean = 
ManagementFactory.getOperatingSystemMXBean()
-        
-    val fetchCPULoadMethod: Option[Method] = 
+
+    val fetchCPULoadMethod: Option[Method] =
       try {
         Class.forName("com.sun.management.OperatingSystemMXBean")
           .getMethods()
-          .find( _.getName() == "getProcessCpuLoad" )
+          .find(_.getName() == "getProcessCpuLoad")
       }
       catch {
         case t: Throwable =>
           LOG.warn("Cannot access 
com.sun.management.OperatingSystemMXBean.getProcessCpuLoad()" +
-            " - CPU load metrics will not be available.")
+                     " - CPU load metrics will not be available.")
           None
       }
 
     metricRegistry.register("cpuLoad", new Gauge[Double] {
       override def getValue: Double = {
-        try{
+        try {
           
fetchCPULoadMethod.map(_.invoke(osBean).asInstanceOf[Double]).getOrElse(-1.0)
         }
         catch {
@@ -2036,146 +2035,4 @@ object TaskManager {
     })
     metricRegistry
   }
-
-  private def instantiateStatusMetrics(
-      taskManagerMetricGroup: MetricGroup,
-      network: NetworkEnvironment)
-    : Unit = {
-    val status = taskManagerMetricGroup
-      .addGroup("Status")
-
-    instantiateNetworkMetrics(status.addGroup("Network"), network)
-
-    val jvm = status
-      .addGroup("JVM")
-
-    instantiateClassLoaderMetrics(jvm.addGroup("ClassLoader"))
-    instantiateGarbageCollectorMetrics(jvm.addGroup("GarbageCollector"))
-    instantiateMemoryMetrics(jvm.addGroup("Memory"))
-    instantiateThreadMetrics(jvm.addGroup("Threads"))
-    instantiateCPUMetrics(jvm.addGroup("CPU"))
-  }
-
-  private def instantiateNetworkMetrics(
-        metrics: MetricGroup,
-        network: NetworkEnvironment)
-    : Unit = {
-    metrics.gauge[Long, FlinkGauge[Long]]("TotalMemorySegments", new 
FlinkGauge[Long] {
-      override def getValue: Long = 
network.getNetworkBufferPool.getTotalNumberOfMemorySegments
-    })
-    metrics.gauge[Long, FlinkGauge[Long]]("AvailableMemorySegments", new 
FlinkGauge[Long] {
-      override def getValue: Long = 
network.getNetworkBufferPool.getNumberOfAvailableMemorySegments
-    })
-  }
-
-  private def instantiateClassLoaderMetrics(metrics: MetricGroup) {
-    val mxBean = ManagementFactory.getClassLoadingMXBean
-
-    metrics.gauge[Long, FlinkGauge[Long]]("ClassesLoaded", new 
FlinkGauge[Long] {
-      override def getValue: Long = mxBean.getTotalLoadedClassCount
-    })
-    metrics.gauge[Long, FlinkGauge[Long]]("ClassesUnloaded", new 
FlinkGauge[Long] {
-      override def getValue: Long = mxBean.getUnloadedClassCount
-    })
-  }
-
-  private def instantiateGarbageCollectorMetrics(metrics: MetricGroup) {
-    val garbageCollectors = ManagementFactory.getGarbageCollectorMXBeans
-
-    for (garbageCollector <- garbageCollectors.asScala) {
-      val gcGroup = metrics.addGroup(garbageCollector.getName)
-      gcGroup.gauge[Long, FlinkGauge[Long]]("Count", new FlinkGauge[Long] {
-        override def getValue: Long = garbageCollector.getCollectionCount
-      })
-      gcGroup.gauge[Long, FlinkGauge[Long]]("Time", new FlinkGauge[Long] {
-        override def getValue: Long = garbageCollector.getCollectionTime
-      })
-    }
-  }
-
-  private def instantiateMemoryMetrics(metrics: MetricGroup) {
-    val mxBean = ManagementFactory.getMemoryMXBean
-    val heap = metrics.addGroup("Heap")
-    heap.gauge[Long, FlinkGauge[Long]]("Used", new FlinkGauge[Long] {
-      override def getValue: Long = mxBean.getHeapMemoryUsage.getUsed
-    })
-    heap.gauge[Long, FlinkGauge[Long]]("Committed", new FlinkGauge[Long] {
-      override def getValue: Long = mxBean.getHeapMemoryUsage.getCommitted
-    })
-    heap.gauge[Long, FlinkGauge[Long]]("Max", new FlinkGauge[Long] {
-      override def getValue: Long = mxBean.getHeapMemoryUsage.getMax
-    })
-
-    val nonHeap = metrics.addGroup("NonHeap")
-    nonHeap.gauge[Long, FlinkGauge[Long]]("Used", new FlinkGauge[Long] {
-      override def getValue: Long = mxBean.getNonHeapMemoryUsage.getUsed
-    })
-    nonHeap.gauge[Long, FlinkGauge[Long]]("Committed", new FlinkGauge[Long] {
-      override def getValue: Long = mxBean.getNonHeapMemoryUsage.getCommitted
-    })
-    nonHeap.gauge[Long, FlinkGauge[Long]]("Max", new FlinkGauge[Long] {
-      override def getValue: Long = mxBean.getNonHeapMemoryUsage.getMax
-    })
-
-    val con = ManagementFactory.getPlatformMBeanServer;
-
-    val directObjectName = new 
ObjectName("java.nio:type=BufferPool,name=direct")
-
-    val direct = metrics.addGroup("Direct")
-    direct.gauge[Long, FlinkGauge[Long]]("Count", new FlinkGauge[Long] {
-      override def getValue: Long = con
-        .getAttribute(directObjectName, "Count").asInstanceOf[Long]
-    })
-    direct.gauge[Long, FlinkGauge[Long]]("MemoryUsed", new FlinkGauge[Long] {
-      override def getValue: Long = con
-        .getAttribute(directObjectName, "MemoryUsed").asInstanceOf[Long]
-    })
-    direct.gauge[Long, FlinkGauge[Long]]("TotalCapacity", new FlinkGauge[Long] 
{
-      override def getValue: Long = con
-        .getAttribute(directObjectName, "TotalCapacity").asInstanceOf[Long]
-    })
-
-    val mappedObjectName = new 
ObjectName("java.nio:type=BufferPool,name=mapped")
-
-    val mapped = metrics.addGroup("Mapped")
-    mapped.gauge[Long, FlinkGauge[Long]]("Count", new FlinkGauge[Long] {
-      override def getValue: Long = con
-        .getAttribute(mappedObjectName, "Count").asInstanceOf[Long]
-    })
-    mapped.gauge[Long, FlinkGauge[Long]]("MemoryUsed", new FlinkGauge[Long] {
-      override def getValue: Long = con
-        .getAttribute(mappedObjectName, "MemoryUsed").asInstanceOf[Long]
-    })
-    mapped.gauge[Long, FlinkGauge[Long]]("TotalCapacity", new FlinkGauge[Long] 
{
-      override def getValue: Long = con
-        .getAttribute(mappedObjectName, "TotalCapacity").asInstanceOf[Long]
-    })
-  }
-
-  private def instantiateThreadMetrics(metrics: MetricGroup): Unit = {
-    val mxBean = ManagementFactory.getThreadMXBean
-
-    metrics.gauge[Int, FlinkGauge[Int]]("Count", new FlinkGauge[Int] {
-      override def getValue: Int = mxBean.getThreadCount
-    })
-  }
-
-  private def instantiateCPUMetrics(metrics: MetricGroup): Unit = {
-    try {
-      val mxBean = ManagementFactory.getOperatingSystemMXBean
-        .asInstanceOf[com.sun.management.OperatingSystemMXBean]
-
-      metrics.gauge[Double, FlinkGauge[Double]]("Load", new FlinkGauge[Double] 
{
-          override def getValue: Double = mxBean.getProcessCpuLoad
-        })
-      metrics.gauge[Long, FlinkGauge[Long]]("Time", new FlinkGauge[Long] {
-          override def getValue: Long = mxBean.getProcessCpuTime
-        })
-    }
-    catch {
-     case t: Throwable =>
-       LOG.warn("Cannot access 
com.sun.management.OperatingSystemMXBean.getProcessCpuLoad()" +
-        " - CPU load metrics will not be available.") 
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a00619a4/flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheDeleteValidationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheDeleteValidationTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheDeleteValidationTest.java
index c369674..4db0d93 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheDeleteValidationTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheDeleteValidationTest.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.filecache;
 import java.io.File;
 import java.util.concurrent.Future;
 
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import 
org.apache.flink.api.common.cache.DistributedCache.DistributedCacheEntry;
 import org.apache.flink.api.common.JobID;
@@ -62,8 +61,9 @@ public class FileCacheDeleteValidationTest {
        
        @Before
        public void setup() {
+               String[] tmpDirectories = 
System.getProperty("java.io.tmpdir").split(",|" + File.pathSeparator);
                try {
-                       fileCache = new FileCache(new Configuration());
+                       fileCache = new FileCache(tmpDirectories);
                }
                catch (Exception e) {
                        e.printStackTrace();

http://git-wip-us.apache.org/repos/asf/flink/blob/a00619a4/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
index f8a0b6a..30dfef5 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
@@ -42,7 +42,6 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a00619a4/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index 7710fa9..f5fe52c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -20,8 +20,11 @@ package org.apache.flink.runtime.taskexecutor;
 
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.filecache.FileCache;
 import org.apache.flink.runtime.highavailability.NonHaServices;
 import 
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
@@ -29,6 +32,7 @@ import org.apache.flink.runtime.io.network.NetworkEnvironment;
 import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import 
org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestRegistered;
 import 
org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestRejected;
@@ -38,7 +42,6 @@ import org.apache.flink.runtime.rpc.TestingSerialRpcService;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.TestLogger;
 
-import org.hamcrest.Matchers;
 import org.junit.Test;
 
 import org.powermock.api.mockito.PowerMockito;
@@ -60,10 +63,14 @@ public class TaskExecutorTest extends TestLogger {
                        ResourceManagerGateway rmGateway = 
mock(ResourceManagerGateway.class);
                        TaskManagerConfiguration 
taskManagerServicesConfiguration = mock(TaskManagerConfiguration.class);
                        
PowerMockito.when(taskManagerServicesConfiguration.getNumberSlots()).thenReturn(1);
+                       
PowerMockito.when(taskManagerServicesConfiguration.getConfiguration()).thenReturn(new
 Configuration());
+                       
PowerMockito.when(taskManagerServicesConfiguration.getTmpDirPaths()).thenReturn(new
 String[1]);
+
                        rpc.registerGateway(resourceManagerAddress, rmGateway);
 
                        TaskManagerLocation taskManagerLocation = 
mock(TaskManagerLocation.class);
                        
when(taskManagerLocation.getResourceID()).thenReturn(resourceID);
+                       
when(taskManagerLocation.getHostname()).thenReturn("foobar");
 
                        NonHaServices haServices = new 
NonHaServices(resourceManagerAddress);
 
@@ -76,6 +83,9 @@ public class TaskExecutorTest extends TestLogger {
                                mock(NetworkEnvironment.class),
                                haServices,
                                mock(MetricRegistry.class),
+                               mock(TaskManagerMetricGroup.class),
+                               mock(BroadcastVariableManager.class),
+                               mock(FileCache.class),
                                mock(FatalErrorHandler.class));
 
                        taskManager.start();
@@ -113,9 +123,12 @@ public class TaskExecutorTest extends TestLogger {
 
                        TaskManagerConfiguration 
taskManagerServicesConfiguration = mock(TaskManagerConfiguration.class);
                        
PowerMockito.when(taskManagerServicesConfiguration.getNumberSlots()).thenReturn(1);
+                       
PowerMockito.when(taskManagerServicesConfiguration.getConfiguration()).thenReturn(new
 Configuration());
+                       
PowerMockito.when(taskManagerServicesConfiguration.getTmpDirPaths()).thenReturn(new
 String[1]);
 
                        TaskManagerLocation taskManagerLocation = 
mock(TaskManagerLocation.class);
                        
when(taskManagerLocation.getResourceID()).thenReturn(resourceID);
+                       
when(taskManagerLocation.getHostname()).thenReturn("foobar");
 
                        TaskExecutor taskManager = new TaskExecutor(
                                taskManagerServicesConfiguration,
@@ -126,6 +139,9 @@ public class TaskExecutorTest extends TestLogger {
                                mock(NetworkEnvironment.class),
                                haServices,
                                mock(MetricRegistry.class),
+                               mock(TaskManagerMetricGroup.class),
+                               mock(BroadcastVariableManager.class),
+                               mock(FileCache.class),
                                mock(FatalErrorHandler.class));
 
                        taskManager.start();
@@ -182,9 +198,12 @@ public class TaskExecutorTest extends TestLogger {
 
                        TaskManagerConfiguration 
taskManagerServicesConfiguration = mock(TaskManagerConfiguration.class);
                        
PowerMockito.when(taskManagerServicesConfiguration.getNumberSlots()).thenReturn(1);
+                       
PowerMockito.when(taskManagerServicesConfiguration.getConfiguration()).thenReturn(new
 Configuration());
+                       
PowerMockito.when(taskManagerServicesConfiguration.getTmpDirPaths()).thenReturn(new
 String[1]);
 
                        TaskManagerLocation taskManagerLocation = 
mock(TaskManagerLocation.class);
                        
when(taskManagerLocation.getResourceID()).thenReturn(resourceID);
+                       
when(taskManagerLocation.getHostname()).thenReturn("foobar");
 
                        TaskExecutor taskManager = new TaskExecutor(
                                taskManagerServicesConfiguration,
@@ -195,6 +214,9 @@ public class TaskExecutorTest extends TestLogger {
                                mock(NetworkEnvironment.class),
                                haServices,
                                mock(MetricRegistry.class),
+                               mock(TaskManagerMetricGroup.class),
+                               mock(BroadcastVariableManager.class),
+                               mock(FileCache.class),
                                mock(FatalErrorHandler.class));
 
                        taskManager.start();

http://git-wip-us.apache.org/repos/asf/flink/blob/a00619a4/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
index e2abe88..9a79935 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
@@ -173,7 +173,7 @@ public class TaskAsyncCallTest {
                        mock(IOManager.class),
                        networkEnvironment,
                        mock(BroadcastVariableManager.class),
-                       mock(TaskManagerConnection.class),
+                       mock(TaskManagerActions.class),
                        mock(InputSplitProvider.class),
                        mock(CheckpointResponder.class),
                        libCache,

http://git-wip-us.apache.org/repos/asf/flink/blob/a00619a4/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProviderTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProviderTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProviderTest.java
index 642300d..777633d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProviderTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProviderTest.java
@@ -24,6 +24,7 @@ import 
org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.instance.BaseTestingActorGateway;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.InputSplitProviderException;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.junit.Test;
@@ -36,7 +37,7 @@ import java.util.concurrent.TimeUnit;
 public class TaskInputSplitProviderTest {
 
        @Test
-       public void testRequestNextInputSplitWithInvalidExecutionID() {
+       public void testRequestNextInputSplitWithInvalidExecutionID() throws 
InputSplitProviderException {
 
                final JobID jobID = new JobID();
                final JobVertexID vertexID = new JobVertexID();

http://git-wip-us.apache.org/repos/asf/flink/blob/a00619a4/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
index 9791cee..5d3eb3a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.taskmanager;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
 import 
org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
@@ -68,6 +69,7 @@ public class TaskStopTest {
                
when(tddMock.getTaskConfiguration()).thenReturn(mock(Configuration.class));
                
when(tddMock.getSerializedExecutionConfig()).thenReturn(mock(SerializedValue.class));
                when(tddMock.getInvokableClassName()).thenReturn("className");
+               
when(tddMock.getAllocationID()).thenReturn(mock(AllocationID.class));
 
                task = new Task(
                        tddMock,
@@ -75,7 +77,7 @@ public class TaskStopTest {
                        mock(IOManager.class),
                        mock(NetworkEnvironment.class),
                        mock(BroadcastVariableManager.class),
-                       mock(TaskManagerConnection.class),
+                       mock(TaskManagerActions.class),
                        mock(InputSplitProvider.class),
                        mock(CheckpointResponder.class),
                        mock(LibraryCacheManager.class),

http://git-wip-us.apache.org/repos/asf/flink/blob/a00619a4/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index 9a13cde..fe618ff 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -97,7 +97,7 @@ public class TaskTest {
        private ActorGateway listenerGateway;
 
        private ActorGatewayTaskExecutionStateListener listener;
-       private ActorGatewayTaskManagerConnection taskManagerConnection;
+       private ActorGatewayTaskManagerActions taskManagerConnection;
 
        private BlockingQueue<Object> taskManagerMessages;
        private BlockingQueue<Object> jobManagerMessages;
@@ -113,7 +113,7 @@ public class TaskTest {
                listenerGateway = new ForwardingActorGateway(listenerMessages);
 
                listener = new ActorGatewayTaskExecutionStateListener(listenerGateway);
-               taskManagerConnection = new ActorGatewayTaskManagerConnection(taskManagerGateway);
+               taskManagerConnection = new ActorGatewayTaskManagerActions(taskManagerGateway);
                
                awaitLatch = new OneShotLatch();
                triggerLatch = new OneShotLatch();

http://git-wip-us.apache.org/repos/asf/flink/blob/a00619a4/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunction.java
index 343affe..c067ca7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunction.java
@@ -26,6 +26,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
+import org.apache.flink.runtime.jobgraph.tasks.InputSplitProviderException;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 
 import java.util.Iterator;
@@ -146,7 +147,12 @@ public class InputFormatSourceFunction<OUT> extends RichParallelSourceFunction<O
                                        return true;
                                }
 
-                               InputSplit split = provider.getNextInputSplit(getRuntimeContext().getUserCodeClassLoader());
+                               final InputSplit split;
+                               try {
+                                       split = provider.getNextInputSplit(getRuntimeContext().getUserCodeClassLoader());
+                               } catch (InputSplitProviderException e) {
+                                       throw new RuntimeException("Could not retrieve next input split.", e);
+                               }
 
                                if (split != null) {
                                        this.nextSplit = split;

http://git-wip-us.apache.org/repos/asf/flink/blob/a00619a4/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
index b5b6582..ffda126 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
@@ -47,7 +47,7 @@ import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.taskmanager.CheckpointResponder;
 import org.apache.flink.runtime.taskmanager.Task;
-import org.apache.flink.runtime.taskmanager.TaskManagerConnection;
+import org.apache.flink.runtime.taskmanager.TaskManagerActions;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.streaming.api.TimeCharacteristic;
@@ -59,6 +59,7 @@ import org.apache.flink.util.SerializedValue;
 import org.junit.Test;
 
 import java.io.EOFException;
+import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
 import java.net.URL;
@@ -153,17 +154,19 @@ public class InterruptSensitiveRestoreTest {
                when(networkEnvironment.createKvStateTaskRegistry(any(JobID.class), any(JobVertexID.class)))
                                .thenReturn(mock(TaskKvStateRegistry.class));
 
+               String[] tmpDirectories = EnvironmentInformation.getTemporaryFileDirectory().split(",|" + File.pathSeparator);
+
                return new Task(
                                tdd,
                                mock(MemoryManager.class),
                                mock(IOManager.class),
                                networkEnvironment,
                                mock(BroadcastVariableManager.class),
-                               mock(TaskManagerConnection.class),
+                               mock(TaskManagerActions.class),
                                mock(InputSplitProvider.class),
                                mock(CheckpointResponder.class),
                                new FallbackLibraryCacheManager(),
-                               new FileCache(new Configuration()),
+                               new FileCache(tmpDirectories),
                                new TaskManagerRuntimeInfo(
                                                "localhost", new Configuration(), EnvironmentInformation.getTemporaryFileDirectory()),
                                new UnregisteredTaskMetricsGroup(),
@@ -266,4 +269,4 @@ public class InterruptSensitiveRestoreTest {
                        fail("should never be called");
                }
        }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a00619a4/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 8aae19f..106d3df 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -51,7 +51,7 @@ import org.apache.flink.runtime.taskmanager.CheckpointResponder;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.taskmanager.TaskExecutionStateListener;
-import org.apache.flink.runtime.taskmanager.TaskManagerConnection;
+import org.apache.flink.runtime.taskmanager.TaskManagerActions;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
@@ -238,7 +238,7 @@ public class StreamTaskTest {
                        mock(IOManager.class),
                        network,
                        mock(BroadcastVariableManager.class),
-                       mock(TaskManagerConnection.class),
+                       mock(TaskManagerActions.class),
                        mock(InputSplitProvider.class),
                        mock(CheckpointResponder.class),
                        libCache,

Reply via email to