This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new cefebf8  [FLINK-12344][coordination] Remove legacy runtime messages
cefebf8 is described below

commit cefebf80410821704cbc153615c551f1117d2872
Author: Zili Chen <[email protected]>
AuthorDate: Sun Apr 28 00:22:14 2019 +0800

    [FLINK-12344][coordination] Remove legacy runtime messages
---
 .../org/apache/flink/client/cli/CliFrontend.java   |   6 +-
 .../messages/CheckAndAllocateContainers.java       |  68 ---
 .../messages/FatalErrorOccurred.java               |  65 ---
 .../messages/NewLeaderAvailable.java               |  56 ---
 .../messages/NotifyResourceStarted.java            |  47 --
 .../messages/ReconnectResourceManager.java         |  59 ---
 .../RegisterInfoMessageListenerSuccessful.java     |  69 ---
 .../messages/RegisterResourceManager.java          |  45 --
 .../RegisterResourceManagerSuccessful.java         |  78 ---
 .../clusterframework/messages/RemoveResource.java  |  64 ---
 .../clusterframework/messages/ResourceRemoved.java |  78 ---
 .../messages/SetWorkerPoolSize.java                |  63 ---
 .../messages/ShutdownClusterAfterJob.java          |  45 --
 .../clusterframework/messages/StopCluster.java     |  58 ---
 .../messages/StopClusterSuccessful.java            |  45 --
 .../messages/TriggerRegistrationAtJobManager.java  |  50 --
 .../clusterframework/messages/package-info.java    |  24 -
 .../PartitionProducerDisposedException.java        |   3 +-
 .../messages/LeaderSessionMessageDecorator.java    |  51 --
 .../flink/runtime/messages/MessageDecorator.java   |  33 --
 .../runtime/messages/RequiresLeaderSessionID.java  |  25 -
 .../apache/flink/runtime/messages/StackTrace.java  |  45 --
 .../flink/runtime/messages/ArchiveMessages.scala   |  76 ---
 .../runtime/messages/ExecutionGraphMessages.scala  |  87 ----
 .../runtime/messages/JobManagerMessages.scala      | 559 ---------------------
 .../apache/flink/runtime/messages/Messages.scala   |  40 --
 .../runtime/messages/RegistrationMessages.scala    | 105 ----
 .../messages/StackTraceSampleMessages.scala        |  79 ---
 .../runtime/messages/TaskControlMessages.scala     | 160 ------
 .../runtime/messages/TaskManagerMessages.scala     | 183 -------
 .../accumulators/AccumulatorMessages.scala         |  96 ----
 .../StackTraceSampleCoordinatorTest.java           |  11 -
 32 files changed, 3 insertions(+), 2470 deletions(-)

diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java 
b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
index 1073936..cdd7616 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
@@ -52,7 +52,6 @@ import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.security.SecurityConfiguration;
 import org.apache.flink.runtime.security.SecurityUtils;
 import org.apache.flink.runtime.util.EnvironmentInformation;
@@ -691,8 +690,7 @@ public class CliFrontend {
        }
 
        /**
-        * Sends a {@link 
org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepoint}
-        * message to the job manager.
+        * Sends a SavepointTriggerMessage to the job manager.
         */
        private String triggerSavepoint(ClusterClient<?> clusterClient, JobID 
jobId, String savepointDirectory) throws FlinkException {
                logAndSysout("Triggering savepoint for job " + jobId + '.');
@@ -717,7 +715,7 @@ public class CliFrontend {
        }
 
        /**
-        * Sends a {@link JobManagerMessages.DisposeSavepoint} message to the 
job manager.
+        * Sends a SavepointDisposalRequest to the job manager.
         */
        private void disposeSavepoint(ClusterClient<?> clusterClient, String 
savepointPath) throws FlinkException {
                Preconditions.checkNotNull(savepointPath, "Missing required 
argument: savepoint path. " +
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/CheckAndAllocateContainers.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/CheckAndAllocateContainers.java
deleted file mode 100644
index 3380939..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/CheckAndAllocateContainers.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.clusterframework.messages;
-
-/**
- * This message signals the resource master to check how many TaskManagers are 
- * desired, how many are available, and to trigger adjustments if needed.
- */
-public class CheckAndAllocateContainers implements java.io.Serializable {
-
-       private static final long serialVersionUID = 7808628311617273755L;
-
-       /** The singleton instance */
-       private static final CheckAndAllocateContainers INSTANCE = new 
CheckAndAllocateContainers();
-
-       /**
-        * Gets the singleton instance.
-        * @return The singleton instance.
-        */
-       public static CheckAndAllocateContainers get() {
-               return INSTANCE;
-       }
-       
-       // 
------------------------------------------------------------------------
-       
-       /** Private constructor to prevent instantiation */
-       private CheckAndAllocateContainers() {}
-
-       // 
------------------------------------------------------------------------
-
-       @Override
-       public boolean equals(Object obj) {
-               return obj != null && obj.getClass() == 
CheckAndAllocateContainers.class;
-       }
-
-       @Override
-       public int hashCode() {
-               return 1725602876;
-       }
-
-       @Override
-       public String toString() {
-               return getClass().getSimpleName();
-       }
-
-       /**
-        * Read resolve to preserve the singleton object property.
-        */
-       private Object readResolve() throws java.io.ObjectStreamException {
-               return INSTANCE;
-       }
-}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/FatalErrorOccurred.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/FatalErrorOccurred.java
deleted file mode 100644
index ae6bb39..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/FatalErrorOccurred.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.clusterframework.messages;
-
-import org.apache.flink.util.SerializedThrowable;
-
-import java.io.Serializable;
-
-import static java.util.Objects.requireNonNull;
-
-/**
- * Message sent to the Flink's resource manager in case of a fatal error that
- * cannot be recovered in the running process.
- * 
- * When master high-availability is enabled, this message should fail the 
resource
- * manager (for example process kill) such that it gets recovered (restarted or
- * another process takes over).
- */
-public class FatalErrorOccurred implements Serializable {
-
-       private static final long serialVersionUID = -2246792138413563536L;
-       
-       private final String message;
-       
-       private final SerializedThrowable error;
-       
-       public FatalErrorOccurred(String message, Throwable error) {
-               this.message = requireNonNull(message);
-               this.error = new SerializedThrowable(requireNonNull(error));
-       }
-       
-       // 
------------------------------------------------------------------------
-       
-       public String message() {
-               return message;
-       }
-       
-       public Throwable error() {
-               return error.deserializeError(getClass().getClassLoader());
-       }
-
-       // 
------------------------------------------------------------------------
-       
-       @Override
-       public String toString() {
-               return "FatalErrorOccurred { message: '" + message + "', error: 
'"
-                       + error.getMessage() + "' }";
-       }
-}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/NewLeaderAvailable.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/NewLeaderAvailable.java
deleted file mode 100644
index f751718..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/NewLeaderAvailable.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.clusterframework.messages;
-
-import java.util.UUID;
-
-/**
- * Message sent to the Flink resource manager to indicate that a new leader is 
available.
- */
-public class NewLeaderAvailable implements java.io.Serializable {
-       private static final long serialVersionUID = 1L;
-       
-       /** The leader Akka URL */
-       private final String leaderAddress;
-       
-       /** The session ID for the leadership status */
-       private final UUID leaderSessionId;
-       
-       public NewLeaderAvailable(String leaderAddress, UUID leaderSessionId) {
-               this.leaderAddress = leaderAddress;
-               this.leaderSessionId = leaderSessionId;
-       }
-       
-       // 
------------------------------------------------------------------------
-
-       public String leaderAddress() {
-               return leaderAddress;
-       }
-       
-       public UUID leaderSessionId() {
-               return leaderSessionId;
-       }
-
-       // 
------------------------------------------------------------------------
-
-       @Override
-       public String toString() {
-               return "NewLeaderAvailable (" + leaderAddress + " / " + 
leaderSessionId + ')';
-       }
-}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/NotifyResourceStarted.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/NotifyResourceStarted.java
deleted file mode 100644
index 1427ba8..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/NotifyResourceStarted.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.clusterframework.messages;
-
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.messages.RequiresLeaderSessionID;
-
-/**
- * Notifies the ResourceManager that a TaskManager has been started in a 
container with the given
- * resource id.
- */
-public class NotifyResourceStarted implements RequiresLeaderSessionID, 
java.io.Serializable {
-       private static final long serialVersionUID = 1L;
-
-       private final ResourceID resourceID;
-
-       public NotifyResourceStarted(ResourceID resourceID) {
-               this.resourceID = resourceID;
-       }
-
-       public ResourceID getResourceID() {
-               return resourceID;
-       }
-
-       @Override
-       public String toString() {
-               return "NotifyResourceStarted{" +
-                       ", resourceID=" + resourceID +
-                       '}';
-       }
-}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/ReconnectResourceManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/ReconnectResourceManager.java
deleted file mode 100644
index 1f852e8..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/ReconnectResourceManager.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.clusterframework.messages;
-
-import akka.actor.ActorRef;
-import org.apache.flink.runtime.messages.RequiresLeaderSessionID;
-import org.apache.flink.util.Preconditions;
-
-import java.io.Serializable;
-
-/**
- * This message signals that the ResourceManager should reconnect to the 
JobManager. It is processed
- * by the JobManager if it fails to register resources with the 
ResourceManager. The JobManager wants
- * the ResourceManager to go through the reconciliation phase to sync up with 
the JobManager bookkeeping.
- * This is done by forcing the ResourceManager to reconnect.
- */
-public class ReconnectResourceManager implements RequiresLeaderSessionID, 
Serializable {
-       private static final long serialVersionUID = 1L;
-
-       private final ActorRef resourceManager;
-
-       private final long connectionId;
-
-       public ReconnectResourceManager(ActorRef resourceManager, long 
connectionId) {
-               this.resourceManager = 
Preconditions.checkNotNull(resourceManager);
-               this.connectionId = Preconditions.checkNotNull(connectionId);
-       }
-       
-       public ActorRef resourceManager() {
-               return resourceManager;
-       }
-
-       public long getConnectionId() {
-               return connectionId;
-       }
-
-       @Override
-       public String toString() {
-               return "ReconnectResourceManager(" +
-                       resourceManager.path() + ", " +
-                       connectionId + ')';
-       }
-}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/RegisterInfoMessageListenerSuccessful.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/RegisterInfoMessageListenerSuccessful.java
deleted file mode 100644
index 79e5618..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/RegisterInfoMessageListenerSuccessful.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.clusterframework.messages;
-
-import org.apache.flink.runtime.messages.RequiresLeaderSessionID;
-
-/**
- * This message signals to the application client that the registration was 
successful.
- */
-public class RegisterInfoMessageListenerSuccessful implements 
RequiresLeaderSessionID, java.io.Serializable {
-
-       private static final long serialVersionUID = 7808628311617273755L;
-
-       /** The singleton instance */
-       private static final RegisterInfoMessageListenerSuccessful INSTANCE = 
new RegisterInfoMessageListenerSuccessful();
-
-       /**
-        * Gets the singleton instance.
-        * @return The singleton instance.
-        */
-       public static RegisterInfoMessageListenerSuccessful get() {
-               return INSTANCE;
-       }
-
-       // 
------------------------------------------------------------------------
-
-       /** Private constructor to prevent instantiation */
-       private RegisterInfoMessageListenerSuccessful() {}
-
-       // 
------------------------------------------------------------------------
-
-       @Override
-       public boolean equals(Object obj) {
-               return obj != null && obj.getClass() == 
RegisterInfoMessageListenerSuccessful.class;
-       }
-
-       @Override
-       public int hashCode() {
-               return 2018741656;
-       }
-
-       @Override
-       public String toString() {
-               return getClass().getSimpleName();
-       }
-
-       /**
-        * Read resolve to preserve the singleton object property.
-        */
-       private Object readResolve() throws java.io.ObjectStreamException {
-               return INSTANCE;
-       }
-}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/RegisterResourceManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/RegisterResourceManager.java
deleted file mode 100644
index fce2b87..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/RegisterResourceManager.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.clusterframework.messages;
-
-import akka.actor.ActorRef;
-import org.apache.flink.runtime.messages.RequiresLeaderSessionID;
-import org.apache.flink.util.Preconditions;
-
-/**
- * This message signals that the resource manager wants to register at the 
JobManager leader. 
- */
-public class RegisterResourceManager implements RequiresLeaderSessionID, 
java.io.Serializable {
-       private static final long serialVersionUID = 1L;
-
-       private final ActorRef resourceManager;
-       
-       public RegisterResourceManager(ActorRef resourceManager) {
-               this.resourceManager = 
Preconditions.checkNotNull(resourceManager);
-       }
-       
-       public ActorRef resourceManager() {
-               return resourceManager;
-       }
-
-       @Override
-       public String toString() {
-               return "RegisterResourceManager " + resourceManager.path();
-       }
-}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/RegisterResourceManagerSuccessful.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/RegisterResourceManagerSuccessful.java
deleted file mode 100644
index df72195..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/RegisterResourceManagerSuccessful.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.clusterframework.messages;
-
-import akka.actor.ActorRef;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.messages.RequiresLeaderSessionID;
-
-import java.io.Serializable;
-import java.util.Collection;
-
-import static java.util.Objects.requireNonNull;
-
-/**
- * Message that informs the resource manager that the JobManager accepted its 
registration.
- * Carries information about the JobManager, and about the TaskManagers that 
the JobManager
- * still has registered.
- */
-public class RegisterResourceManagerSuccessful implements 
RequiresLeaderSessionID, Serializable {
-
-       private static final long serialVersionUID = 817011779310941753L;
-
-       /** The JobManager which we registered with. */
-       private final ActorRef jobManager;
-
-       /** The list of registered TaskManagers that the JobManager currently 
knows */
-       private final Collection<ResourceID> currentlyRegisteredTaskManagers;
-
-       /**
-        * Creates a new message with a list of existing known TaskManagers.
-        * 
-        * @param currentlyRegisteredTaskManagers
-        *         The list of TaskManagers that the JobManager currently 
knows. 
-        */
-       public RegisterResourceManagerSuccessful(ActorRef jobManager,
-                       Collection<ResourceID> currentlyRegisteredTaskManagers)
-       {
-               this.jobManager = jobManager;
-               this.currentlyRegisteredTaskManagers = 
requireNonNull(currentlyRegisteredTaskManagers);
-       }
-
-       // 
------------------------------------------------------------------------
-
-
-       public ActorRef jobManager() {
-               return jobManager;
-       }
-
-       public Collection<ResourceID> currentlyRegisteredTaskManagers() {
-               return currentlyRegisteredTaskManagers;
-       }
-
-       // 
------------------------------------------------------------------------
-
-       @Override
-       public String toString() {
-               return "RegisterResourceManagerSuccessful{" +
-                       "jobManager=" + jobManager +
-                       ", currentlyRegisteredTaskManagers=" + 
currentlyRegisteredTaskManagers +
-                       '}';
-       }
-}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/RemoveResource.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/RemoveResource.java
deleted file mode 100644
index 08fa90e..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/RemoveResource.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.clusterframework.messages;
-
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.messages.RequiresLeaderSessionID;
-
-import java.io.Serializable;
-
-import static java.util.Objects.requireNonNull;
-
-/**
- * Message sent to the ResourceManager by the JobManager to instruct to remove 
a resource.
- */
-public class RemoveResource implements RequiresLeaderSessionID, Serializable {
-       private static final long serialVersionUID = 1L;
-
-       /** The ID under which the resource is registered (for example 
container ID) */
-       private final ResourceID resourceId;
-
-       /**
-        * Constructor for a shutdown of a registered resource.
-        * @param resourceId The ID under which the resource is registered (for 
example container ID).
-        */
-       public RemoveResource(ResourceID resourceId) {
-               this.resourceId = requireNonNull(resourceId);
-       }
-
-       // 
------------------------------------------------------------------------
-
-       /**
-        * Gets the ID under which the resource is registered (for example 
container ID).
-        * @return The resource ID
-        */
-       public ResourceID resourceId() {
-               return resourceId;
-       }
-
-
-       // 
------------------------------------------------------------------------
-
-       @Override
-       public String toString() {
-               return "RemoveResource{" +
-                       "resourceId=" + resourceId +
-                       '}';
-       }
-}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/ResourceRemoved.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/ResourceRemoved.java
deleted file mode 100644
index 1dfa0eb..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/ResourceRemoved.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.clusterframework.messages;
-
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.messages.RequiresLeaderSessionID;
-
-import java.io.Serializable;
-
-import static java.util.Objects.requireNonNull;
-
-/**
- * Message sent to the JobManager by the Resource Manager to inform
- * about the removal of a resource.
- */
-public class ResourceRemoved implements RequiresLeaderSessionID, Serializable {
-       private static final long serialVersionUID = 1L;
-
-       /** The ID under which the resource is registered (for example 
container ID) */
-       private final ResourceID resourceId;
-
-       /** Optional message with details, for logging and debugging */
-       private final String message;
-
-       /**
-        * Constructor for a shutdown of a registered resource.
-        * @param resourceId The ID under which the resource is registered (for 
example container ID).
-        * @param message Optional message with details, for logging and 
debugging.
-        */
-       public ResourceRemoved(ResourceID resourceId, String message) {
-               this.resourceId = requireNonNull(resourceId);
-               this.message = message;
-       }
-
-       // 
------------------------------------------------------------------------
-
-       /**
-        * Gets the ID under which the resource is registered (for example 
container ID).
-        * @return The resource ID
-        */
-       public ResourceID resourceId() {
-               return resourceId;
-       }
-
-       /**
-        * Gets the optional message with details, for logging and debugging.
-        * @return Optional message, may be null
-        */
-       public String message() {
-               return message;
-       }
-
-       // 
------------------------------------------------------------------------
-
-       @Override
-       public String toString() {
-               return "ResourceRemoved{" +
-                       "resourceId=" + resourceId +
-                       ", message='" + message + '\'' +
-                       '}';
-       }
-}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/SetWorkerPoolSize.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/SetWorkerPoolSize.java
deleted file mode 100644
index 705bf91..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/SetWorkerPoolSize.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.clusterframework.messages;
-
-import org.apache.flink.runtime.messages.RequiresLeaderSessionID;
-
-/**
- * Message sent to the resource master actor to adjust the designated number of
- * workers it maintains.
- */
-public class SetWorkerPoolSize implements RequiresLeaderSessionID, 
java.io.Serializable{
-
-       private static final long serialVersionUID = -335911350781207609L;
-       
-       private final int numberOfWorkers;
-
-       public SetWorkerPoolSize(int numberOfWorkers) {
-               if (numberOfWorkers < 0) {
-                       throw new IllegalArgumentException("Number of workers 
must not be negative.");
-               }
-               this.numberOfWorkers = numberOfWorkers;
-       }
-
-       // 
------------------------------------------------------------------------
-       
-       public int numberOfWorkers() {
-               return numberOfWorkers;
-       }
-       
-       // 
------------------------------------------------------------------------
-
-       @Override
-       public int hashCode() {
-               return numberOfWorkers;
-       }
-
-       @Override
-       public boolean equals(Object obj) {
-               return obj != null && obj.getClass() == SetWorkerPoolSize.class 
&& 
-                                       this.numberOfWorkers == 
((SetWorkerPoolSize) obj).numberOfWorkers;
-       }
-
-       @Override
-       public String toString() {
-               return "SetWorkerPoolSize (" + numberOfWorkers + ')';
-       }
-}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/ShutdownClusterAfterJob.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/ShutdownClusterAfterJob.java
deleted file mode 100644
index add4d39..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/ShutdownClusterAfterJob.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.clusterframework.messages;
-
-import org.apache.flink.api.common.JobID;
-
-/**
- * Message sent to the cluster framework master to signal it that the cluster
- * should be shut down upon completion of a certain job identified by a job id.
- */
-public class ShutdownClusterAfterJob implements java.io.Serializable {
-
-       private static final long serialVersionUID = -2723662264779569704L;
-
-       private final JobID jobId;
-
-       public ShutdownClusterAfterJob(JobID jobId) {
-               this.jobId = jobId;
-       }
-
-       public JobID jobId() {
-               return jobId;
-       }
-
-       @Override
-       public String toString() {
-               return "ShutdownClusterAfterJob { jobId=" + jobId + " } "; 
-       }
-}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/StopCluster.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/StopCluster.java
deleted file mode 100644
index f366d7c..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/StopCluster.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.clusterframework.messages;
-
-import org.apache.flink.runtime.clusterframework.ApplicationStatus;
-import org.apache.flink.runtime.messages.RequiresLeaderSessionID;
-
-import static java.util.Objects.requireNonNull;
-
-/**
- * Generic message to signal the cluster framework to shut the cluster down.
- */
-public class StopCluster implements RequiresLeaderSessionID, 
java.io.Serializable {
-
-       private static final long serialVersionUID = -8957259342982181684L;
-
-       private final ApplicationStatus finalStatus;
-
-       private final String message;
-
-       public StopCluster(ApplicationStatus finalStatus, String message) {
-               this.finalStatus = requireNonNull(finalStatus);
-               this.message = message == null ? "" : message;
-       }
-
-       // 
------------------------------------------------------------------------
-
-       public ApplicationStatus finalStatus() {
-               return finalStatus;
-       }
-
-       public String message() {
-               return message;
-       }
-
-       // 
------------------------------------------------------------------------
-
-       @Override
-       public String toString() {
-               return "StopApplication { message='" + message + "' }";
-       }
-}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/StopClusterSuccessful.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/StopClusterSuccessful.java
deleted file mode 100644
index 7f9d855..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/StopClusterSuccessful.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.clusterframework.messages;
-
-import org.apache.flink.runtime.messages.RequiresLeaderSessionID;
-
-/**
- * Generic message to signal the cluster listener that the cluster has been 
shut down.
- */
-public class StopClusterSuccessful implements RequiresLeaderSessionID, 
java.io.Serializable {
-
-       private static final long serialVersionUID = 42L;
-
-       private static StopClusterSuccessful INSTANCE = new 
StopClusterSuccessful();
-
-       /**
-        * Private constructor. Initial singleton in get() method.
-        */
-       private StopClusterSuccessful() {}
-
-       public static StopClusterSuccessful getInstance() {
-               return INSTANCE;
-       }
-
-       @Override
-       public String toString() {
-               return "StopClusterSuccessful{}";
-       }
-}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/TriggerRegistrationAtJobManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/TriggerRegistrationAtJobManager.java
deleted file mode 100644
index d817d9c..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/TriggerRegistrationAtJobManager.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.clusterframework.messages;
-
-import akka.actor.ActorRef;
-import org.apache.flink.runtime.messages.RequiresLeaderSessionID;
-
-import java.io.Serializable;
-
-/**
- * Causes the resource manager to try and apply at the leader JobManager.
- */
-public class TriggerRegistrationAtJobManager implements 
RequiresLeaderSessionID, Serializable {
-       private static final long serialVersionUID = 1L;
-       
-       private final String jobManagerAddress;
-       
-       public TriggerRegistrationAtJobManager(ActorRef jobManager) {
-               this(jobManager.path().toSerializationFormat());
-       }
-
-       public TriggerRegistrationAtJobManager(String jobManagerAddress) {
-               this.jobManagerAddress = jobManagerAddress;
-       }
-
-       public String jobManagerAddress() {
-               return jobManagerAddress;
-       }
-
-       @Override
-       public String toString() {
-               return "TriggerRegistrationAtJobManager " + jobManagerAddress;
-       }
-}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/package-info.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/package-info.java
deleted file mode 100644
index d647dea..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/package-info.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * This package contains the actor messages that are sent between the
- * cluster resource framework and the JobManager, as well as the generic
- * messages sent between the cluster resource framework and the client.
- */
-package org.apache.flink.runtime.clusterframework.messages;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/PartitionProducerDisposedException.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/PartitionProducerDisposedException.java
index 12f2433..1cf77d7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/PartitionProducerDisposedException.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/PartitionProducerDisposedException.java
@@ -19,10 +19,9 @@
 package org.apache.flink.runtime.jobmanager;
 
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
-import 
org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionProducerState;
 
 /**
- * Exception returned to a TaskManager on {@link RequestPartitionProducerState}
+ * Exception returned to a TaskManager on JobMaster requesting partition state,
  * if the producer of a partition has been disposed.
  */
 public class PartitionProducerDisposedException extends Exception {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/LeaderSessionMessageDecorator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/LeaderSessionMessageDecorator.java
deleted file mode 100644
index 6e49f44..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/LeaderSessionMessageDecorator.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.messages;
-
-import java.util.UUID;
-
-/**
- * Message decorator which wraps message which implement {@link 
RequiresLeaderSessionID} into
- * a {@link 
org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage}.
- */
-public class LeaderSessionMessageDecorator implements MessageDecorator {
-
-       private static final long serialVersionUID = 5359618147408392706L;
-       
-       /** Leader session ID with which the RequiresLeaderSessionID messages 
will be decorated */
-       private final UUID leaderSessionID;
-
-       /**
-        * Sets the leader session ID with which the messages will be decorated.
-        *
-        * @param leaderSessionID Leader session ID to be used for decoration
-        */
-       public LeaderSessionMessageDecorator(UUID leaderSessionID) {
-               this.leaderSessionID = leaderSessionID;
-       }
-
-       @Override
-       public Object decorate(Object message) {
-               if (message instanceof RequiresLeaderSessionID) {
-                       return new 
JobManagerMessages.LeaderSessionMessage(leaderSessionID, message);
-               } else {
-                       return message;
-               }
-       }
-}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/MessageDecorator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/MessageDecorator.java
deleted file mode 100644
index 8bbab73..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/MessageDecorator.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.messages;
-
-/**
- * Interface for message decorators
- */
-public interface MessageDecorator extends java.io.Serializable {
-
-       /**
-        * Decorates a message
-        *
-        * @param message Message to decorate
-        * @return Decorated message
-        */
-       Object decorate(Object message);
-}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/RequiresLeaderSessionID.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/RequiresLeaderSessionID.java
deleted file mode 100644
index 7e6a473..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/RequiresLeaderSessionID.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.messages;
-
-/**
- * Marks messages to be sent wrapped in a
- * {@link 
org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage}
- */
-public interface RequiresLeaderSessionID {}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/StackTrace.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/StackTrace.java
deleted file mode 100644
index c041655..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/StackTrace.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.messages;
-
-import org.apache.flink.runtime.instance.InstanceID;
-import org.apache.flink.util.Preconditions;
-
-import java.io.Serializable;
-
-public class StackTrace implements Serializable {
-
-       private static final long serialVersionUID = -899464298250067416L;
-
-       private final InstanceID instanceId;
-       private final String stackTrace;
-
-       public StackTrace(InstanceID instanceId, String stackTrace) {
-               this.instanceId = Preconditions.checkNotNull(instanceId);
-               this.stackTrace = Preconditions.checkNotNull(stackTrace);
-       }
-
-       public InstanceID getInstanceId() {
-               return instanceId;
-       }
-
-       public String getStackTrace() {
-               return stackTrace;
-       }
-}
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ArchiveMessages.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ArchiveMessages.scala
deleted file mode 100644
index c1227dc..0000000
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ArchiveMessages.scala
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.messages
-
-import org.apache.flink.api.common.JobID
-import org.apache.flink.runtime.executiongraph.{ArchivedExecutionGraph, 
ExecutionGraph}
-
-/**
- * This object contains the archive specific messages.
- */
-object ArchiveMessages {
-  
-  case class ArchiveExecutionGraph(jobID: JobID, graph: ArchivedExecutionGraph)
-
-  /**
-   * Request the currently archived jobs in the archiver. The resulting 
response is [[ArchivedJobs]]
-   */
-  case object RequestArchivedJobs
-
-  /**
-   * Requests the number of finished, canceled, and failed jobs
-   */
-  case object RequestJobCounts
-
-  /**
-   * Request a specific ExecutionGraph by JobID. The response is 
[[RequestArchivedJob]]
-   * @param jobID
-   */
-  case class RequestArchivedJob(jobID: JobID)
-
-  case class ArchivedJob(job: Option[ArchivedExecutionGraph])
-
-  /**
-   * Response to [[RequestArchivedJobs]] message. The response contains the 
archived jobs.
-   * @param jobs
-   */
-  case class ArchivedJobs(jobs: Iterable[ArchivedExecutionGraph]){
-    def asJavaIterable: java.lang.Iterable[ArchivedExecutionGraph] = {
-      import scala.collection.JavaConverters._
-      jobs.asJava
-    }
-
-    def asJavaCollection: java.util.Collection[ArchivedExecutionGraph] = {
-      import scala.collection.JavaConverters._
-      jobs.asJavaCollection
-    }
-  }
-
-  // --------------------------------------------------------------------------
-  // Utility methods to allow simpler case object access from Java
-  // --------------------------------------------------------------------------
-  
-  def getRequestArchivedJobs : AnyRef = {
-    RequestArchivedJobs
-  }
-
-  def getRequestJobCounts : AnyRef = {
-    RequestJobCounts
-  }
-}
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ExecutionGraphMessages.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ExecutionGraphMessages.scala
deleted file mode 100644
index 59efa73..0000000
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ExecutionGraphMessages.scala
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.messages
-
-import java.text.SimpleDateFormat
-import java.util.{UUID, Date}
-
-import org.apache.flink.api.common.JobID
-import org.apache.flink.runtime.execution.ExecutionState
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
-import org.apache.flink.runtime.jobgraph.{JobStatus, JobVertexID}
-
-/**
- * This object contains the execution graph specific messages.
- */
-object ExecutionGraphMessages {
-
-  // --------------------------------------------------------------------------
-  //  Messages
-  // --------------------------------------------------------------------------
-  
-  /**
-   * Denotes the execution state change of an
-   * [[org.apache.flink.runtime.executiongraph.ExecutionVertex]]
-   *
-   * @param jobID to which the vertex belongs
-   * @param vertexID of the ExecutionJobVertex to which the ExecutionVertex 
belongs
-   * @param taskName
-   * @param totalNumberOfSubTasks denotes the number of parallel subtasks
-   * @param subtaskIndex denotes the index of the ExecutionVertex
-   * @param executionID
-   * @param newExecutionState
-   * @param timestamp of the execution state change
-   * @param optionalMessage
-   */
-  case class ExecutionStateChanged(
-      jobID: JobID,
-      vertexID: JobVertexID,
-      taskName: String,
-      totalNumberOfSubTasks: Int,
-      subtaskIndex: Int,
-      executionID: ExecutionAttemptID,
-      newExecutionState: ExecutionState,
-      timestamp: Long,
-      optionalMessage: String)
-    extends RequiresLeaderSessionID {
-
-    override def toString: String = {
-      val oMsg = if (optionalMessage != null) {
-        s"\n$optionalMessage"
-      } else {
-        ""
-      }
-      
-      s"${timestampToString(timestamp)}\t$taskName(${subtaskIndex +
-        1}/$totalNumberOfSubTasks) switched to $newExecutionState $oMsg"
-    }
-  }
-
-  // --------------------------------------------------------------------------
-  //  Utilities
-  // --------------------------------------------------------------------------
-  
-  private val DATE_FORMATTER: SimpleDateFormat = new 
SimpleDateFormat("MM/dd/yyyy HH:mm:ss")
-
-  private def timestampToString(timestamp: Long): String = {
-    DATE_FORMATTER.synchronized {
-      DATE_FORMATTER.format(new Date(timestamp))
-    }
-  }
-}
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
deleted file mode 100644
index 0206872..0000000
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
+++ /dev/null
@@ -1,559 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.messages
-
-import java.net.URL
-import java.util.UUID
-
-import akka.actor.ActorRef
-import org.apache.flink.api.common.JobID
-import org.apache.flink.runtime.akka.ListeningBehaviour
-import org.apache.flink.runtime.blob.PermanentBlobKey
-import org.apache.flink.runtime.client.{JobStatusMessage, 
SerializedJobExecutionResult}
-import org.apache.flink.runtime.clusterframework.types.ResourceID
-import org.apache.flink.runtime.executiongraph.{AccessExecutionGraph, 
ExecutionAttemptID, ExecutionGraph}
-import org.apache.flink.runtime.instance.{Instance, InstanceID}
-import org.apache.flink.runtime.io.network.partition.ResultPartitionID
-import org.apache.flink.runtime.jobgraph.{IntermediateDataSetID, JobGraph, 
JobStatus, JobVertexID}
-import org.apache.flink.runtime.jobmanager.SubmittedJobGraph
-import org.apache.flink.runtime.messages.checkpoint.AbstractCheckpointMessage
-import org.apache.flink.util.SerializedThrowable
-
-import scala.collection.JavaConverters._
-
-/**
- * The job manager specific actor messages
- */
-object JobManagerMessages {
-
-  /** Wrapper class for leader session messages. Leader session messages 
implement the
-    * [[RequiresLeaderSessionID]] interface and have to be wrapped in a 
[[LeaderSessionMessage]],
-    * which also contains the current leader session ID.
-    *
-    * @param leaderSessionID Current leader session ID
-    * @param message [[RequiresLeaderSessionID]] message to be wrapped in a 
[[LeaderSessionMessage]]
-    */
-  case class LeaderSessionMessage(leaderSessionID: UUID, message: Any)
-
-  /**
-   * Submits a job to the job manager. Depending on the [[listeningBehaviour]],
-   * the sender registers for different messages. If 
[[ListeningBehaviour.DETACHED]], then
-   * it will only be informed whether the submission was successful or not. If
-   * [[ListeningBehaviour.EXECUTION_RESULT]], then it will additionally 
receive the execution
-   * result. If [[ListeningBehaviour.EXECUTION_RESULT_AND_STATE_CHANGES]], 
then it will additionally
-   * receive the job status change notifications.
-   *
-   * The submission result will be sent back to the sender as a success 
message.
-   *
-   * @param jobGraph The job to be submitted to the JobManager
-   * @param listeningBehaviour Specifies to what the sender wants to listen 
(detached, execution
-   *                           result, execution result and state changes)
-   */
-  case class SubmitJob(
-      jobGraph: JobGraph,
-      listeningBehaviour: ListeningBehaviour)
-    extends RequiresLeaderSessionID
-
-  /**
-    * Registers the sender of the message as the client for the provided job 
identifier.
-    * This message is acknowledged by the JobManager with 
[[RegisterJobClientSuccess]]
-    * or [[JobNotFound]] if the job was not running.
-    * @param jobID The job id of the job
-    * @param listeningBehaviour The types of updates which will be sent to the 
sender
-    * after registration
-    */
-  case class RegisterJobClient(
-      jobID: JobID,
-      listeningBehaviour: ListeningBehaviour)
-    extends RequiresLeaderSessionID
-
-  /**
-   * Triggers the recovery of the job with the given ID.
-   *
-   * @param jobId ID of the job to recover
-   */
-  case class RecoverJob(jobId: JobID) extends RequiresLeaderSessionID
-
-  /**
-   * Triggers the submission of the recovered job
-   *
-   * @param submittedJobGraph Contains the submitted JobGraph
-   */
-  case class RecoverSubmittedJob(submittedJobGraph: SubmittedJobGraph)
-    extends RequiresLeaderSessionID
-
-  /**
-   * Triggers recovery of all available jobs.
-   */
-  case object RecoverAllJobs extends RequiresLeaderSessionID
-
-  /**
-   * Cancels a job with the given [[jobID]] at the JobManager. The result of 
the cancellation is
-   * sent back to the sender as a [[CancellationResponse]] message.
-   *
-   * @param jobID
-   */
-  case class CancelJob(jobID: JobID) extends RequiresLeaderSessionID
-
-  /**
-    * Cancels the job with the given [[jobID]] at the JobManager. Before 
cancellation a savepoint
-    * is triggered without any other checkpoints in between. The result of the 
cancellation is
-    * the path of the triggered savepoint on success or an exception.
-    *
-    * @param jobID ID of the job to cancel
-    * @param savepointDirectory Optional target directory for the savepoint.
-    *                           If no target directory is specified here, the
-    *                           cluster default is used.
-    */
-  case class CancelJobWithSavepoint(
-      jobID: JobID,
-      savepointDirectory: String = null)
-    extends RequiresLeaderSessionID
-
-  /**
-   * Requesting next input split for the
-   * [[org.apache.flink.runtime.executiongraph.ExecutionJobVertex]]
-   * of the job specified by [[jobID]]. The next input split is sent back to 
the sender as a
-   * [[NextInputSplit]] message.
-   *
-   * @param jobID
-   * @param vertexID
-   */
-  case class RequestNextInputSplit(
-      jobID: JobID,
-      vertexID: JobVertexID,
-      executionAttempt: ExecutionAttemptID)
-    extends RequiresLeaderSessionID
-
-  /**
-   * Contains the next input split for a task. This message is a response to
-   * 
[[org.apache.flink.runtime.messages.JobManagerMessages.RequestNextInputSplit]].
-   *
-   * @param splitData
-   */
-  case class NextInputSplit(splitData: Array[Byte])
-
-  /**
-    * Requests the execution state of the execution producing a result 
partition.
-    *
-    * @param jobId                 ID of the job the partition belongs to.
-    * @param intermediateDataSetId ID of the parent intermediate data set.
-    * @param resultPartitionId     ID of the result partition to check. This
-    *                              identifies the producing execution and
-    *                              partition.
-    */
-  case class RequestPartitionProducerState(
-      jobId: JobID,
-      intermediateDataSetId: IntermediateDataSetID,
-      resultPartitionId: ResultPartitionID)
-    extends RequiresLeaderSessionID
-
-  /**
-   * Notifies the org.apache.flink.runtime.jobmanager.JobManager about 
available data for a
-   * produced partition.
-   * <p>
-   * There is a call to this method for each
-   * [[org.apache.flink.runtime.executiongraph.ExecutionVertex]] instance once 
per produced
-   * [[org.apache.flink.runtime.io.network.partition.ResultPartition]] 
instance,
-   * either when first producing data (for pipelined executions) or when all 
data has been produced
-   * (for staged executions).
-   * <p>
-   * The org.apache.flink.runtime.jobmanager.JobManager then can decide when 
to schedule the
-   * partition consumers of the given session.
-   *
-   * @see [[org.apache.flink.runtime.io.network.partition.ResultPartition]]
-   */
-  case class ScheduleOrUpdateConsumers(jobId: JobID, partitionId: 
ResultPartitionID)
-    extends RequiresLeaderSessionID
-
-  /**
-   * Requests the current [[JobStatus]] of the job identified by [[jobID]]. 
This message triggers
-   * as response a [[JobStatusResponse]] message.
-   *
-   * @param jobID
-   */
-  case class RequestJobStatus(jobID: JobID)
-
-  sealed trait JobStatusResponse {
-    def jobID: JobID
-  }
-
-  /**
-   * Denotes the current [[JobStatus]] of the job with [[jobID]].
-   *
-   * @param jobID
-   * @param status
-   */
-  case class CurrentJobStatus(jobID: JobID, status: JobStatus) extends 
JobStatusResponse
-
-  /**
-   * Requests the number of currently registered task manager at the job 
manager. The result is
-   * sent back to the sender as an [[Int]].
-   */
-  case object RequestNumberRegisteredTaskManager
-
-  /**
-   * Requests the maximum number of slots available to the job manager. The 
result is sent back
-   * to the sender as an [[Int]].
-   */
-  case object RequestTotalNumberOfSlots
-
-  /**
-    * Requests all entities necessary for reconstructing a job class loader
-    * May respond with [[ClassloadingProps]] or [[JobNotFound]]
-    * @param jobId The job id of the registered job
-    */
-  case class RequestClassloadingProps(jobId: JobID)
-
-  /**
-    * Response to [[RequestClassloadingProps]]
-    * @param blobManagerPort The port of the blobManager
-    * @param requiredJarFiles The blob keys of the required jar files
-    * @param requiredClasspaths The urls of the required classpaths
-    */
-  case class ClassloadingProps(blobManagerPort: Integer,
-                               requiredJarFiles: 
java.util.Collection[PermanentBlobKey],
-                               requiredClasspaths: java.util.Collection[URL])
-
-  /**
-   * Requests the port of the blob manager from the job manager. The result is 
sent back to the
-   * sender as an [[Int]].
-   */
-  case object RequestBlobManagerPort
-
-  /** Requests the current leader session ID of the job manager. The result is 
sent back to the
-    * sender as an [[ResponseLeaderSessionID]]
-    */
-  case object RequestLeaderSessionID
-
-  /** Response to the [[RequestLeaderSessionID]] message.
-    *
-    * @param leaderSessionID
-    */
-  case class ResponseLeaderSessionID(leaderSessionID: UUID)
-
-  /**
-   * Denotes a successful job submission.
-   * @param jobId Ths job's ID.
-   */
-  case class JobSubmitSuccess(jobId: JobID)
-
-  /**
-    * Denotes a successful registration of a JobClientActor for a running job
-    * @param jobId The job id of the registered job
-    */
-  case class RegisterJobClientSuccess(jobId: JobID)
-
-  /**
-    * Denotes messages which contain the result of a completed job execution
-    */
-  sealed trait JobResultMessage
-
-  /**
-   * Denotes a successful job execution.
-   * @param result The result of the job execution, in serialized form.
-   */
-  case class JobResultSuccess(result: SerializedJobExecutionResult) extends 
JobResultMessage
-
-  /**
-   * Denotes an unsuccessful job execution.
-   * @param cause The exception that caused the job to fail, in serialized 
form.
-   */
-  case class JobResultFailure(cause: SerializedThrowable) extends 
JobResultMessage
-
-
-  sealed trait CancellationResponse{
-    def jobID: JobID
-  }
-
-  /**
-   * Denotes a successful job cancellation
-   * @param jobID
-   */
-  case class CancellationSuccess(
-    jobID: JobID,
-    savepointPath: String = null) extends CancellationResponse
-
-  /**
-   * Denotes a failed job cancellation
-   * @param jobID
-   * @param cause
-   */
-  case class CancellationFailure(jobID: JobID, cause: Throwable) extends 
CancellationResponse
-
-  /**
-   * Requests all currently running jobs from the job manager. This message 
triggers a
-   * [[RunningJobs]] response.
-   */
-  case object RequestRunningJobs
-
-  /**
-   * This message is the response to the [[RequestRunningJobs]] message. It 
contains all
-   * execution graphs of the currently running jobs.
-   */
-  case class RunningJobs(runningJobs: Iterable[ExecutionGraph]) {
-    def this() = this(Seq())
-    def asJavaIterable: java.lang.Iterable[ExecutionGraph] = {
-      runningJobs.asJava
-    }
-  }
-
-  /**
-   * Requests the status of all currently running jobs from the job manager.
-   * This message triggers a [[RunningJobsStatus]] response.
-   */
-  case object RequestRunningJobsStatus
-
-  case class RunningJobsStatus(runningJobs: Iterable[JobStatusMessage]) {
-    def this() = this(Seq())
-
-    def getStatusMessages(): java.util.List[JobStatusMessage] = {
-      new java.util.ArrayList[JobStatusMessage](runningJobs.asJavaCollection)
-    }
-  }
-
-  /**
-   * Requests the execution graph of a specific job identified by [[jobID]].
-   * The result is sent back to the sender as a [[JobResponse]].
-   */
-  case class RequestJob(jobID: JobID)
-
-  sealed trait JobResponse{
-    def jobID: JobID
-  }
-
-  /**
-   * Contains the [[executionGraph]] of a job with [[jobID]]. This is the 
response to
-   * [[RequestJob]] if the job runs or is archived.
-   *
-   * @param jobID
-   * @param executionGraph
-   */
-  case class JobFound(jobID: JobID, executionGraph: AccessExecutionGraph) 
extends JobResponse
-
-  /**
-   * Denotes that there is no job with [[jobID]] retrievable. This message can 
be the response of
-   * [[RequestJob]], [[RequestJobStatus]] or [[RegisterJobClient]].
-   *
-   * @param jobID
-   */
-  case class JobNotFound(jobID: JobID) extends JobResponse with 
JobStatusResponse
-
-  /** Triggers the removal of the job with the given job ID
-    *
-    * @param jobID
-    * @param removeJobFromStateBackend true if the job has properly finished
-    */
-  case class RemoveJob(jobID: JobID, removeJobFromStateBackend: Boolean = true)
-    extends RequiresLeaderSessionID
-
-  /**
-   * Removes the job belonging to the job identifier from the job manager and 
archives it.
-   * @param jobID The job identifier
-   */
-  case class RemoveCachedJob(jobID: JobID)
-
-  /**
-   * Requests the instances of all registered task managers.
-   */
-  case object RequestRegisteredTaskManagers
-
-  /**
-   * Contains the [[Instance]] objects of all registered task managers. It is 
the response to the
-   * message [[RequestRegisteredTaskManagers]].
-   *
-   * @param taskManagers
-   */
-  case class RegisteredTaskManagers(taskManagers: Iterable[Instance]){
-    def asJavaIterable: java.lang.Iterable[Instance] = {
-      import scala.collection.JavaConverters._
-      taskManagers.asJava
-    }
-
-    def asJavaCollection: java.util.Collection[Instance] = {
-      import scala.collection.JavaConverters._
-      taskManagers.asJavaCollection
-    }
-  }
-
-  /**
-   * Requests the [[Instance]] object of the task manager with the given 
instance ID
-   *
-   * @param resourceId identifying the TaskManager which shall be retrieved
-   */
-  case class RequestTaskManagerInstance(resourceId: ResourceID)
-
-  /**
-   * Returns the [[Instance]] object of the requested task manager. This is in 
response to
-   * [[RequestTaskManagerInstance]]
-   */
-  case class TaskManagerInstance(instance: Option[Instance])
-
-  /**
-   * Requests stack trace messages of the task manager
-   *
-   * @param instanceID Instance ID of the task manager
-   */
-  case class RequestStackTrace(instanceID: InstanceID)
-
-  /**
-   * Requests the current state of the job manager
-   */
-  case object RequestJobManagerStatus
-
-  /**
-   * Response to RequestJobManagerStatus
-   */
-  sealed trait JobManagerStatus
-
-  case object JobManagerStatusAlive extends JobManagerStatus
-
-  /** Grants leadership to the receiver. The message contains the new leader 
session id.
-    *
-     * @param leaderSessionID
-    */
-  case class GrantLeadership(leaderSessionID: Option[UUID])
-
-  /** Revokes leadership of the receiver.
-    */
-  case object RevokeLeadership
-
-  /** Requests the ActorRef of the archiver */
-  case object RequestArchive
-
-  /** Response containing the ActorRef of the archiver */
-  case class ResponseArchive(actor: ActorRef)
-
-  /** Request for the JobManager's REST endpoint address */
-  case object RequestRestAddress
-
-  /**
-    * Triggers a savepoint for the specified job.
-    *
-    * This is not a subtype of [[AbstractCheckpointMessage]], because it is a
-    * control-flow message, which is *not* part of the checkpointing mechanism
-    * of triggering and acknowledging checkpoints.
-    *
-    * @param jobId The JobID of the job to trigger the savepoint for.
-    * @param savepointDirectory Optional target directory
-    */
-  case class TriggerSavepoint(
-      jobId: JobID,
-      savepointDirectory : Option[String] = Option.empty) extends 
RequiresLeaderSessionID
-
-  /**
-    * Response after a successful savepoint trigger containing the savepoint 
path.
-    *
-    * @param jobId The job ID for which the savepoint was triggered.
-    * @param savepointPath The path of the savepoint.
-    */
-  case class TriggerSavepointSuccess(
-    jobId: JobID,
-    checkpointId: Long,
-    savepointPath: String,
-    triggerTime: Long
-  )
-
-  /**
-    * Response after a failed savepoint trigger containing the failure cause.
-    *
-    * @param jobId The job ID for which the savepoint was triggered.
-    * @param cause The cause of the failure.
-    */
-  case class TriggerSavepointFailure(jobId: JobID, cause: Throwable)
-
-  /**
-    * Disposes a savepoint.
-    *
-    * @param savepointPath The path of the savepoint to dispose.
-    */
-  case class DisposeSavepoint(
-      savepointPath: String)
-    extends RequiresLeaderSessionID
-
-  /** Response after a successful savepoint dispose. */
-  case object DisposeSavepointSuccess
-
-  /**
-    * Response after a failed savepoint dispose containing the failure cause.
-    *
-    * @param cause The cause of the failure.
-    */
-  case class DisposeSavepointFailure(cause: Throwable)
-
-  // --------------------------------------------------------------------------
-  // Utility methods to allow simpler case object access from Java
-  // --------------------------------------------------------------------------
-
-  def getRequestJobStatus(jobId : JobID) : AnyRef = {
-    RequestJobStatus(jobId)
-  }
-
-  def getRequestNumberRegisteredTaskManager : AnyRef = {
-    RequestNumberRegisteredTaskManager
-  }
-
-  def getRequestTotalNumberOfSlots : AnyRef = {
-    RequestTotalNumberOfSlots
-  }
-
-  def getRequestBlobManagerPort : AnyRef = {
-    RequestBlobManagerPort
-  }
-
-  def getRequestRunningJobs : AnyRef = {
-    RequestRunningJobs
-  }
-
-  def getRequestRunningJobsStatus : AnyRef = {
-    RequestRunningJobsStatus
-  }
-
-  def getRequestRegisteredTaskManagers : AnyRef = {
-    RequestRegisteredTaskManagers
-  }
-
-  def getRequestJobManagerStatus : AnyRef = {
-    RequestJobManagerStatus
-  }
-
-  def getJobManagerStatusAlive : AnyRef = {
-    JobManagerStatusAlive
-  }
-
-  def getRequestLeaderSessionID: AnyRef = {
-    RequestLeaderSessionID
-  }
-
-  def getRequestArchive: AnyRef = {
-    RequestArchive
-  }
-
-  def getRecoverAllJobs: AnyRef = {
-    RecoverAllJobs
-  }
-
-  def getRequestRestAddress: AnyRef = {
-    RequestRestAddress
-  }
-
-  def getDisposeSavepointSuccess: AnyRef = {
-    DisposeSavepointSuccess
-  }
-}
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/Messages.scala 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/Messages.scala
deleted file mode 100644
index 1103a69..0000000
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/Messages.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.messages
-
-import org.apache.flink.runtime.instance.InstanceID
-
-/**
- * Generic messages between JobManager, TaskManager, JobClient.
- */
-object Messages {
-
-  /**
-   * Signals that the receiver (JobManager/TaskManager) shall disconnect the 
sender.
-   *
-   * The TaskManager may send this on shutdown to let the JobManager realize 
the TaskManager
-   * loss more quickly.
-   *
-   * The JobManager may send this message to its TaskManagers to let them 
clean up their
-   * tasks that depend on the JobManager and go into a clean state.
-   *
-   * @param cause The reason for disconnecting, to be displayed in log and 
error messages.
-   */
-  case class Disconnect(instanceId: InstanceID, cause: Exception) extends 
RequiresLeaderSessionID
-}
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/RegistrationMessages.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/RegistrationMessages.scala
deleted file mode 100644
index 5648bc6..0000000
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/RegistrationMessages.scala
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.messages
-
-import java.util.UUID
-
-import org.apache.flink.runtime.clusterframework.types.ResourceID
-import org.apache.flink.runtime.instance.{HardwareDescription, InstanceID}
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation
-
-import scala.concurrent.duration.{Deadline, FiniteDuration}
-
-/**
- * A set of messages between TaskManager and JobManager to handle the
- * registration of the TaskManager at the JobManager.
- */
-object RegistrationMessages {
-
-  /**
-   * Marker trait for registration messages.
-   */
-  trait RegistrationMessage extends RequiresLeaderSessionID {}
-
-  /**
-   * Triggers the TaskManager to attempt a registration at the JobManager.
-   *
-   * @param jobManagerURL Akka URL to the JobManager
-   * @param timeout The timeout for the message. The next retry will double 
this timeout.
-   * @param deadline Optional deadline until when the registration must be 
completed.
-   * @param attempt The attempt number, for logging.
-   * @param registrationRun UUID of the current registration run to filter out 
outdated runs
-   */
-  case class TriggerTaskManagerRegistration(
-      jobManagerURL: String,
-      timeout: FiniteDuration,
-      deadline: Option[Deadline],
-      attempt: Int,
-      registrationRun: UUID)
-    extends RegistrationMessage
-
-  /**
-   * Registers a task manager at the JobManager. A successful registration is 
acknowledged by
-   * [[AcknowledgeRegistration]].
-   *
-   * @param connectionInfo The TaskManagers connection information.
-   * @param resources The TaskManagers resources.
-   * @param numberOfSlots The number of processing slots offered by the 
TaskManager.
-   */
-  case class RegisterTaskManager(
-                                  resourceId: ResourceID,
-                                  connectionInfo: TaskManagerLocation,
-                                  resources: HardwareDescription,
-                                  numberOfSlots: Int)
-    extends RegistrationMessage
-
-  /**
-   * Denotes the successful registration of a task manager at the JobManager. 
This is the
-   * response triggered by the [[RegisterTaskManager]] message when the 
JobManager has registered
-   * the task manager with the resource manager.
-   *
-   * @param instanceID The instance ID under which the TaskManager is 
registered at the
-   *                   JobManager.
-   * @param blobPort The server port where the JobManager's BLOB service runs.
-   */
-  case class AcknowledgeRegistration(
-      instanceID: InstanceID,
-      blobPort: Int)
-    extends RegistrationMessage
-
-  /**
-   * Denotes that the TaskManager has already been registered at the 
JobManager.
-   *
-   * @param instanceID The instance ID under which the TaskManager is 
registered.
-   * @param blobPort The server port where the JobManager's BLOB service runs.
-   */
-  case class AlreadyRegistered(
-      instanceID: InstanceID,
-      blobPort: Int)
-    extends RegistrationMessage
-
-  /**
-   * Denotes the unsuccessful registration of a task manager at the 
JobManager. This is the
-   * response triggered by the [[RegisterTaskManager]] message.
-   *
-   * @param reason Reason why the task manager registration was refused
-   */
-  case class RefuseRegistration(reason: Throwable) extends RegistrationMessage
-
-}
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/StackTraceSampleMessages.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/StackTraceSampleMessages.scala
deleted file mode 100644
index 3a7c057..0000000
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/StackTraceSampleMessages.scala
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.messages
-
-import akka.actor.ActorRef
-import org.apache.flink.api.common.time.Time
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
-
-import scala.concurrent.duration.FiniteDuration
-
-/**
-  * A set of messages exchanged with task manager instances in order to sample
-  * the stack traces of running tasks.
-  */
-object StackTraceSampleMessages {
-
-  trait StackTraceSampleMessages
-
-  /**
-    * Triggers the sampling of a running task (sent by the job manager to the
-    * task managers).
-    *
-    * @param sampleId ID of this sample.
-    * @param executionId ID of the task to sample.
-    * @param numSamples Number of stack trace samples to collect.
-    * @param delayBetweenSamples Delay between consecutive samples.
-    * @param maxStackTraceDepth Maximum depth of the stack trace. 0 indicates
-    *                           no maximum and collects the complete stack
-    *                           trace.
-    */
-  case class TriggerStackTraceSample(
-      sampleId: Int,
-      executionId: ExecutionAttemptID,
-      numSamples: Int,
-      delayBetweenSamples: Time,
-      maxStackTraceDepth: Int = 0)
-    extends StackTraceSampleMessages with java.io.Serializable
-
-  /**
-    * Task manager internal sample message.
-    *
-    * @param sampleId ID of the this sample.
-    * @param executionId ID of the task to sample.
-    * @param delayBetweenSamples Delay between consecutive samples.
-    * @param maxStackTraceDepth Maximum depth of the stack trace. 0 indicates
-    *                           no maximum and collects the complete stack
-    *                           trace.
-    * @param numRemainingSamples Number of remaining samples before this
-    *                            sample is finished.
-    * @param currentTraces The current list of gathered stack traces.
-    * @param sender Actor triggering this sample (receiver of result).
-    */
-  case class SampleTaskStackTrace(
-      sampleId: Int,
-      executionId: ExecutionAttemptID,
-      delayBetweenSamples: Time,
-      maxStackTraceDepth: Int,
-      numRemainingSamples: Int,
-      currentTraces: java.util.List[Array[StackTraceElement]],
-      sender: ActorRef)
-    extends StackTraceSampleMessages
-
-}
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskControlMessages.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskControlMessages.scala
deleted file mode 100644
index d984618..0000000
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskControlMessages.scala
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.messages
-
-import java.util
-
-import org.apache.flink.runtime.deployment.{InputChannelDeploymentDescriptor, 
TaskDeploymentDescriptor}
-import org.apache.flink.runtime.executiongraph.{ExecutionAttemptID, 
PartitionInfo}
-import org.apache.flink.runtime.jobgraph.IntermediateDataSetID
-import org.apache.flink.runtime.taskmanager.TaskExecutionState
-
-/**
- * A set of messages that control the deployment and the state of Tasks 
executed
- * on the TaskManager.
- */
-object TaskMessages {
-
-  /**
-   * Marker trait for task messages.
-   */
-  trait TaskMessage
-
-  // --------------------------------------------------------------------------
-  //  Starting and stopping Tasks
-  // --------------------------------------------------------------------------
-
-  /**
-   * Submits a task to the task manager. The result is to this message is a
-   * [[TaskOperationResult]] message.
-   *
-   * @param tasks Descriptor which contains the information to start the task.
-   */
-  case class SubmitTask(tasks: TaskDeploymentDescriptor)
-    extends TaskMessage with RequiresLeaderSessionID
-
-  /**
-   * Cancels the task associated with [[attemptID]]. The result is sent back 
to the sender as a
-   * [[TaskOperationResult]] message.
-   *
-   * @param attemptID The task's execution attempt ID.
-   */
-  case class CancelTask(attemptID: ExecutionAttemptID)
-    extends TaskMessage with RequiresLeaderSessionID
-
-  /**
-   * Triggers a fail of specified task from the outside (as opposed to the 
task throwing
-   * an exception itself) with the given exception as the cause.
-   *
-   * @param executionID The task's execution attempt ID.
-   * @param cause The reason for the external failure.
-   */
-  case class FailTask(executionID: ExecutionAttemptID, cause: Throwable)
-    extends TaskMessage
-
-  /**
-   * Notifies the TaskManager that the task has reached its final state,
-   * either FINISHED, CANCELED, or FAILED.
-   *
-   * @param executionID The task's execution attempt ID.
-   */
-  case class TaskInFinalState(executionID: ExecutionAttemptID)
-    extends TaskMessage
-
-
-  // --------------------------------------------------------------------------
-  //  Updates to Intermediate Results
-  // --------------------------------------------------------------------------
-
-  /**
-   * Base class for messages that update the information about location of 
input partitions
-   */
-  abstract sealed class UpdatePartitionInfo extends TaskMessage with 
RequiresLeaderSessionID {
-    def executionID: ExecutionAttemptID
-  }
-
-  /**
-   *
-   * @param executionID The task's execution attempt ID.
-   * @param resultId The input reader to update.
-   * @param partitionInfo The partition info update.
-   */
-  case class UpdateTaskSinglePartitionInfo(
-      executionID: ExecutionAttemptID,
-      resultId: IntermediateDataSetID,
-      partitionInfo: InputChannelDeploymentDescriptor)
-    extends UpdatePartitionInfo
-
-  /**
-   *
-   * @param executionID The task's execution attempt ID.
-   * @param partitionInfos List of input gates with channel descriptors to 
update.
-   */
-  case class UpdateTaskMultiplePartitionInfos(
-      executionID: ExecutionAttemptID,
-      partitionInfos: java.lang.Iterable[PartitionInfo])
-    extends UpdatePartitionInfo
-
-  /**
-   * Fails (and releases) all intermediate result partitions identified by
-   * [[executionID]] from the task manager.
-   *
-   * @param executionID The task's execution attempt ID.
-   */
-  case class FailIntermediateResultPartitions(executionID: ExecutionAttemptID)
-    extends TaskMessage with RequiresLeaderSessionID
-
-
-  // --------------------------------------------------------------------------
-  //  Report Messages
-  // --------------------------------------------------------------------------
-
-  /**
-   * Denotes a state change of a task at the JobManager. The update success is 
acknowledged by a
-   * boolean value which is sent back to the sender.
-   *
-   * @param taskExecutionState The changed task state
-   */
-  case class UpdateTaskExecutionState(taskExecutionState: TaskExecutionState)
-    extends TaskMessage with RequiresLeaderSessionID
-
-  // --------------------------------------------------------------------------
-  //  Utility Functions
-  // --------------------------------------------------------------------------
-
-  def createUpdateTaskMultiplePartitionInfos(
-      executionID: ExecutionAttemptID,
-      resultIDs: java.util.List[IntermediateDataSetID],
-      partitionInfos: java.util.List[InputChannelDeploymentDescriptor])
-    : UpdateTaskMultiplePartitionInfos = {
-
-    require(resultIDs.size() == partitionInfos.size(),
-      "ResultIDs must have the same length as partitionInfos.")
-
-    val partitionInfoList = new util.ArrayList[PartitionInfo](resultIDs.size())
-
-    for (i <- 0 until resultIDs.size()) {
-      partitionInfoList.add(new PartitionInfo(resultIDs.get(i), 
partitionInfos.get(i)))
-    }
-
-    new UpdateTaskMultiplePartitionInfos(
-      executionID,
-      partitionInfoList)
-  }
-}
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala
deleted file mode 100644
index acccd4e..0000000
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala
+++ /dev/null
@@ -1,183 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.messages
-
-import java.util.UUID
-import org.apache.flink.runtime.accumulators.AccumulatorSnapshot
-import org.apache.flink.runtime.instance.InstanceID
-
-/**
- * Miscellaneous actor messages exchanged with the TaskManager.
- */
-object TaskManagerMessages {
-  
-  /**
-   * This message informs the TaskManager about a fatal error that prevents
-   * it from continuing.
-   * 
-   * @param description The description of the problem
-   */
-  case class FatalError(description: String, cause: Throwable)
-  
-  /**
-   * Tells the task manager to send a heartbeat message to the job manager.
-   */
-  case object SendHeartbeat {
-
-    /**
-     * Accessor for the case object instance, to simplify Java 
interoperability.
-     *
-     * @return The SendHeartbeat case object instance.
-     */
-    def get() : SendHeartbeat.type = SendHeartbeat
-  }
-
-  /**
-   * Reports liveliness of the TaskManager instance with the given instance ID 
to the
-   * This message is sent to the job.
-   *
-   * @param instanceID The instance ID of the reporting TaskManager.
-   * @param accumulators Accumulators of tasks serialized as Tuple2[internal, 
user-defined]
-   */
-  case class Heartbeat(instanceID: InstanceID, accumulators: 
Seq[AccumulatorSnapshot])
-
-
-  // --------------------------------------------------------------------------
-  //  Reporting the current TaskManager stack trace
-  // --------------------------------------------------------------------------
-
-  /**
-   * Tells the TaskManager to send a stack trace of all threads to the sender.
-   * The response to this message is the [[StackTrace]] message.
-   */
-  case object SendStackTrace {
-
-    /**
-     * Accessor for the case object instance, to simplify Java 
interoperability.
-     *
-     * @return The SendStackTrace case object instance.
-     */
-    def get() : SendStackTrace.type = SendStackTrace
-  }
-
-  /**
-   * Communicates the stack trace of the TaskManager with the given ID.
-   * This message is the response to [[SendStackTrace]].
-   *
-   * @param instanceID The ID of the responding task manager.
-   * @param stackTrace The stack trace, as a string.
-   */
-  case class StackTrace(instanceID: InstanceID, stackTrace: String)
-
-
-  // --------------------------------------------------------------------------
-  //  Utility messages used for notifications during TaskManager startup
-  // --------------------------------------------------------------------------
-
-  /**
-   * Requests a notification from the task manager as soon as the task manager 
has been
-   * registered at a job manager. Once the task manager is registered at a job 
manager a
-   * [[RegisteredAtJobManager]] message will be sent to the sender.
-   */
-  case object NotifyWhenRegisteredAtJobManager
-
-  /**
-   * Acknowledges that the task manager has been successfully registered at 
any job manager. This
-   * message is a response to [[NotifyWhenRegisteredAtJobManager]] and 
contains the current leader
-   * session id.
-   */
-  case class RegisteredAtJobManager(leaderId: UUID)
-
-  /** Tells the address of the new leading 
org.apache.flink.runtime.jobmanager.JobManager
-    * and the new leader session ID.
-    *
-    * @param jobManagerAddress Address of the new leading JobManager
-    * @param leaderSessionID New leader session ID
-    */
-  case class JobManagerLeaderAddress(jobManagerAddress: String, 
leaderSessionID: UUID)
-
-  /** Trait do differentiate which log file is requested */
-  sealed trait LogTypeRequest
-
-  /** Indicates a request for the .log file */
-  case object LogFileRequest extends LogTypeRequest
-
-  /** Indicates a request for the .out file */
-  case object StdOutFileRequest extends LogTypeRequest
-
-  /** Requests the TaskManager to upload either his log/stdout file to the 
Blob store 
-    * param requestType LogTypeRequest indicating which file is requested
-    */
-  case class RequestTaskManagerLog(requestType : LogTypeRequest)
-
-  /** Requests the number of active connections at the ConnectionManager */
-  case object RequestNumActiveConnections
-
-  case class ResponseNumActiveConnections(number: Int)
-
-  /** Requests the number of broadcast variables with references */
-  case object RequestBroadcastVariablesWithReferences
-
-  case class ResponseBroadcastVariablesWithReferences(number: Int)
-
-
-  // --------------------------------------------------------------------------
-  //  Utility getters for case objects to simplify access from Java
-  // --------------------------------------------------------------------------
-
-  /**
-   * Accessor for the case object instance, to simplify Java interoperability.
-   *
-   * @return The NotifyWhenRegisteredAtJobManager case object instance.
-   */
-  def getNotifyWhenRegisteredAtJobManagerMessage:
-  NotifyWhenRegisteredAtJobManager.type = NotifyWhenRegisteredAtJobManager
-
-  /**
-    * Accessor for the case object instance, to simplify Java interoperability.
-    * @return The RequestTaskManagerLog case object instance.
-    */
-  def getRequestTaskManagerLog(): AnyRef = {
-    RequestTaskManagerLog(LogFileRequest)
-  }
-
-  /**
-    * Accessor for the case object instance, to simplify Java interoperability.
-    * @return The RequestTaskManagerStdout case object instance.
-    */
-  def getRequestTaskManagerStdout(): AnyRef = {
-    RequestTaskManagerLog(StdOutFileRequest)
-  }
-
-  /**
-    * Accessor for the case object instance, to simplify Java interoperability.
-    * @return The RequestBroadcastVariablesWithReferences case object instance.
-    */
-  def getRequestBroadcastVariablesWithReferences(): 
RequestBroadcastVariablesWithReferences.type = {
-    RequestBroadcastVariablesWithReferences
-  }
-
-  /**
-    * Accessor for the case object instance, to simplify Java interoperability.
-    * @return The RequestNumActiveConnections case object instance.
-    */
-  def getRequestNumActiveConnections(): RequestNumActiveConnections.type  = {
-    RequestNumActiveConnections
-  }
-}
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/accumulators/AccumulatorMessages.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/accumulators/AccumulatorMessages.scala
deleted file mode 100644
index 9ed01aa..0000000
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/accumulators/AccumulatorMessages.scala
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.messages.accumulators
-
-import org.apache.flink.api.common.JobID
-import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult
-import org.apache.flink.util.{OptionalFailure, SerializedValue}
-
-/**
- * Base trait of all accumulator messages
- */
-sealed trait AccumulatorMessage {
-
-  /** ID of the job that the accumulator belongs to */
-  val jobID: JobID
-}
-
-/**
- * Base trait of responses to [[RequestAccumulatorResults]]
- */
-sealed trait AccumulatorResultsResponse extends AccumulatorMessage
-
-/**
- * Requests the accumulator results of the job identified by [[jobID]] from 
the job manager.
- * The result is sent back to the sender as a [[AccumulatorResultsResponse]] 
message.
- *
- * @param jobID Job Id of the job that the accumulator belongs to
- */
-case class RequestAccumulatorResults(jobID: JobID)
-  extends AccumulatorMessage
-
-/**
- * Requests the accumulator results of the job as strings. This is used to 
request accumulators to
- * be displayed for example in the web frontend. Because it transports only
- * Strings, not custom objects, it is safe for user-defined types and class 
loading.
- *
- * @param jobID Job Id of the job that the accumulator belongs to
- */
-case class RequestAccumulatorResultsStringified(jobID: JobID)
-  extends AccumulatorMessage
-
-/**
- * Contains the retrieved accumulator results from the job manager. This 
response is triggered
- * by [[RequestAccumulatorResults]].
- *
- * @param jobID Job Id of the job that the accumulator belongs to
- * @param result The accumulator result values, in serialized form.
- */
-case class AccumulatorResultsFound(
-    jobID: JobID,
-    result: java.util.Map[String, SerializedValue[OptionalFailure[Object]]])
-  extends AccumulatorResultsResponse
-
-/**
- * Contains the retrieved accumulator result strings from the job manager.
- *
- * @param jobID Job Id of the job that the accumulator belongs to
- * @param result The accumulator result values, in string form.
- */
-case class AccumulatorResultStringsFound(jobID: JobID,
-                                         result: 
Array[StringifiedAccumulatorResult])
-  extends AccumulatorResultsResponse
-
-/**
- * Denotes that no accumulator results for [[jobID]] could be found at the job 
manager.
- *
- * @param jobID Job Id of the job that the accumulators were requested for.
- */
-case class AccumulatorResultsNotFound(jobID: JobID)
-  extends AccumulatorResultsResponse
-
-/**
- * Denotes that the accumulator results for [[jobID]] could be obtained from
- * the JobManager because of an exception.
- *
- * @param jobID Job Id of the job that the accumulators were requested for.
- * @param cause Reason why the accumulators were not found.
- */
-case class AccumulatorResultsErroneous(jobID: JobID, cause: Exception)
-  extends AccumulatorResultsResponse
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSampleCoordinatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSampleCoordinatorTest.java
index ed98be5..8693b16 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSampleCoordinatorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSampleCoordinatorTest.java
@@ -26,7 +26,6 @@ import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import 
org.apache.flink.runtime.messages.StackTraceSampleMessages.TriggerStackTraceSample;
 import org.apache.flink.runtime.messages.StackTraceSampleResponse;
 import org.apache.flink.util.TestLogger;
 
@@ -94,16 +93,6 @@ public class StackTraceSampleCoordinatorTest extends 
TestLogger {
 
                // Verify messages have been sent
                for (ExecutionVertex vertex : vertices) {
-                       ExecutionAttemptID expectedExecutionId = vertex
-                                       
.getCurrentExecutionAttempt().getAttemptId();
-
-                       TriggerStackTraceSample expectedMsg = new 
TriggerStackTraceSample(
-                                       0,
-                                       expectedExecutionId,
-                                       numSamples,
-                                       delayBetweenSamples,
-                                       maxStackTraceDepth);
-
                        Mockito.verify(vertex.getCurrentExecutionAttempt())
                                .requestStackTraceSample(Matchers.eq(0), 
Matchers.eq(numSamples), Matchers.eq(delayBetweenSamples), 
Matchers.eq(maxStackTraceDepth), Matchers.any(Time.class));
                }

Reply via email to