This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new cefebf8 [FLINK-12344][coordination] Remove legacy runtime messages
cefebf8 is described below
commit cefebf80410821704cbc153615c551f1117d2872
Author: Zili Chen <[email protected]>
AuthorDate: Sun Apr 28 00:22:14 2019 +0800
[FLINK-12344][coordination] Remove legacy runtime messages
---
.../org/apache/flink/client/cli/CliFrontend.java | 6 +-
.../messages/CheckAndAllocateContainers.java | 68 ---
.../messages/FatalErrorOccurred.java | 65 ---
.../messages/NewLeaderAvailable.java | 56 ---
.../messages/NotifyResourceStarted.java | 47 --
.../messages/ReconnectResourceManager.java | 59 ---
.../RegisterInfoMessageListenerSuccessful.java | 69 ---
.../messages/RegisterResourceManager.java | 45 --
.../RegisterResourceManagerSuccessful.java | 78 ---
.../clusterframework/messages/RemoveResource.java | 64 ---
.../clusterframework/messages/ResourceRemoved.java | 78 ---
.../messages/SetWorkerPoolSize.java | 63 ---
.../messages/ShutdownClusterAfterJob.java | 45 --
.../clusterframework/messages/StopCluster.java | 58 ---
.../messages/StopClusterSuccessful.java | 45 --
.../messages/TriggerRegistrationAtJobManager.java | 50 --
.../clusterframework/messages/package-info.java | 24 -
.../PartitionProducerDisposedException.java | 3 +-
.../messages/LeaderSessionMessageDecorator.java | 51 --
.../flink/runtime/messages/MessageDecorator.java | 33 --
.../runtime/messages/RequiresLeaderSessionID.java | 25 -
.../apache/flink/runtime/messages/StackTrace.java | 45 --
.../flink/runtime/messages/ArchiveMessages.scala | 76 ---
.../runtime/messages/ExecutionGraphMessages.scala | 87 ----
.../runtime/messages/JobManagerMessages.scala | 559 ---------------------
.../apache/flink/runtime/messages/Messages.scala | 40 --
.../runtime/messages/RegistrationMessages.scala | 105 ----
.../messages/StackTraceSampleMessages.scala | 79 ---
.../runtime/messages/TaskControlMessages.scala | 160 ------
.../runtime/messages/TaskManagerMessages.scala | 183 -------
.../accumulators/AccumulatorMessages.scala | 96 ----
.../StackTraceSampleCoordinatorTest.java | 11 -
32 files changed, 3 insertions(+), 2470 deletions(-)
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
index 1073936..cdd7616 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
@@ -52,7 +52,6 @@ import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.security.SecurityConfiguration;
import org.apache.flink.runtime.security.SecurityUtils;
import org.apache.flink.runtime.util.EnvironmentInformation;
@@ -691,8 +690,7 @@ public class CliFrontend {
}
/**
- * Sends a {@link org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepoint}
- * message to the job manager.
+ * Sends a SavepointTriggerMessage to the job manager.
*/
private String triggerSavepoint(ClusterClient<?> clusterClient, JobID jobId, String savepointDirectory) throws FlinkException {
logAndSysout("Triggering savepoint for job " + jobId + '.');
@@ -717,7 +715,7 @@ public class CliFrontend {
}
/**
- * Sends a {@link JobManagerMessages.DisposeSavepoint} message to the job manager.
+ * Sends a SavepointDisposalRequest to the job manager.
*/
private void disposeSavepoint(ClusterClient<?> clusterClient, String savepointPath) throws FlinkException {
Preconditions.checkNotNull(savepointPath, "Missing required argument: savepoint path. " +
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/CheckAndAllocateContainers.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/CheckAndAllocateContainers.java
deleted file mode 100644
index 3380939..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/CheckAndAllocateContainers.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.clusterframework.messages;
-
-/**
- * This message signals the resource master to check how many TaskManagers are
- * desired, how many are available, and to trigger adjustments if needed.
- */
-public class CheckAndAllocateContainers implements java.io.Serializable {
-
- private static final long serialVersionUID = 7808628311617273755L;
-
- /** The singleton instance */
- private static final CheckAndAllocateContainers INSTANCE = new
CheckAndAllocateContainers();
-
- /**
- * Gets the singleton instance.
- * @return The singleton instance.
- */
- public static CheckAndAllocateContainers get() {
- return INSTANCE;
- }
-
- //
------------------------------------------------------------------------
-
- /** Private constructor to prevent instantiation */
- private CheckAndAllocateContainers() {}
-
- //
------------------------------------------------------------------------
-
- @Override
- public boolean equals(Object obj) {
- return obj != null && obj.getClass() ==
CheckAndAllocateContainers.class;
- }
-
- @Override
- public int hashCode() {
- return 1725602876;
- }
-
- @Override
- public String toString() {
- return getClass().getSimpleName();
- }
-
- /**
- * Read resolve to preserve the singleton object property.
- */
- private Object readResolve() throws java.io.ObjectStreamException {
- return INSTANCE;
- }
-}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/FatalErrorOccurred.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/FatalErrorOccurred.java
deleted file mode 100644
index ae6bb39..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/FatalErrorOccurred.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.clusterframework.messages;
-
-import org.apache.flink.util.SerializedThrowable;
-
-import java.io.Serializable;
-
-import static java.util.Objects.requireNonNull;
-
-/**
- * Message sent to the Flink's resource manager in case of a fatal error that
- * cannot be recovered in the running process.
- *
- * When master high-availability is enabled, this message should fail the
resource
- * manager (for example process kill) such that it gets recovered (restarted or
- * another process takes over).
- */
-public class FatalErrorOccurred implements Serializable {
-
- private static final long serialVersionUID = -2246792138413563536L;
-
- private final String message;
-
- private final SerializedThrowable error;
-
- public FatalErrorOccurred(String message, Throwable error) {
- this.message = requireNonNull(message);
- this.error = new SerializedThrowable(requireNonNull(error));
- }
-
- //
------------------------------------------------------------------------
-
- public String message() {
- return message;
- }
-
- public Throwable error() {
- return error.deserializeError(getClass().getClassLoader());
- }
-
- //
------------------------------------------------------------------------
-
- @Override
- public String toString() {
- return "FatalErrorOccurred { message: '" + message + "', error:
'"
- + error.getMessage() + "' }";
- }
-}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/NewLeaderAvailable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/NewLeaderAvailable.java
deleted file mode 100644
index f751718..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/NewLeaderAvailable.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.clusterframework.messages;
-
-import java.util.UUID;
-
-/**
- * Message sent to the Flink resource manager to indicate that a new leader is
available.
- */
-public class NewLeaderAvailable implements java.io.Serializable {
- private static final long serialVersionUID = 1L;
-
- /** The leader Akka URL */
- private final String leaderAddress;
-
- /** The session ID for the leadership status */
- private final UUID leaderSessionId;
-
- public NewLeaderAvailable(String leaderAddress, UUID leaderSessionId) {
- this.leaderAddress = leaderAddress;
- this.leaderSessionId = leaderSessionId;
- }
-
- //
------------------------------------------------------------------------
-
- public String leaderAddress() {
- return leaderAddress;
- }
-
- public UUID leaderSessionId() {
- return leaderSessionId;
- }
-
- //
------------------------------------------------------------------------
-
- @Override
- public String toString() {
- return "NewLeaderAvailable (" + leaderAddress + " / " +
leaderSessionId + ')';
- }
-}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/NotifyResourceStarted.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/NotifyResourceStarted.java
deleted file mode 100644
index 1427ba8..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/NotifyResourceStarted.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.clusterframework.messages;
-
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.messages.RequiresLeaderSessionID;
-
-/**
- * Notifies the ResourceManager that a TaskManager has been started in a
container with the given
- * resource id.
- */
-public class NotifyResourceStarted implements RequiresLeaderSessionID,
java.io.Serializable {
- private static final long serialVersionUID = 1L;
-
- private final ResourceID resourceID;
-
- public NotifyResourceStarted(ResourceID resourceID) {
- this.resourceID = resourceID;
- }
-
- public ResourceID getResourceID() {
- return resourceID;
- }
-
- @Override
- public String toString() {
- return "NotifyResourceStarted{" +
- ", resourceID=" + resourceID +
- '}';
- }
-}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/ReconnectResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/ReconnectResourceManager.java
deleted file mode 100644
index 1f852e8..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/ReconnectResourceManager.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.clusterframework.messages;
-
-import akka.actor.ActorRef;
-import org.apache.flink.runtime.messages.RequiresLeaderSessionID;
-import org.apache.flink.util.Preconditions;
-
-import java.io.Serializable;
-
-/**
- * This message signals that the ResourceManager should reconnect to the
JobManager. It is processed
- * by the JobManager if it fails to register resources with the
ResourceManager. The JobManager wants
- * the ResourceManager to go through the reconciliation phase to sync up with
the JobManager bookkeeping.
- * This is done by forcing the ResourceManager to reconnect.
- */
-public class ReconnectResourceManager implements RequiresLeaderSessionID,
Serializable {
- private static final long serialVersionUID = 1L;
-
- private final ActorRef resourceManager;
-
- private final long connectionId;
-
- public ReconnectResourceManager(ActorRef resourceManager, long
connectionId) {
- this.resourceManager =
Preconditions.checkNotNull(resourceManager);
- this.connectionId = Preconditions.checkNotNull(connectionId);
- }
-
- public ActorRef resourceManager() {
- return resourceManager;
- }
-
- public long getConnectionId() {
- return connectionId;
- }
-
- @Override
- public String toString() {
- return "ReconnectResourceManager(" +
- resourceManager.path() + ", " +
- connectionId + ')';
- }
-}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/RegisterInfoMessageListenerSuccessful.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/RegisterInfoMessageListenerSuccessful.java
deleted file mode 100644
index 79e5618..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/RegisterInfoMessageListenerSuccessful.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.clusterframework.messages;
-
-import org.apache.flink.runtime.messages.RequiresLeaderSessionID;
-
-/**
- * This message signals to the application client that the registration was
successful.
- */
-public class RegisterInfoMessageListenerSuccessful implements
RequiresLeaderSessionID, java.io.Serializable {
-
- private static final long serialVersionUID = 7808628311617273755L;
-
- /** The singleton instance */
- private static final RegisterInfoMessageListenerSuccessful INSTANCE =
new RegisterInfoMessageListenerSuccessful();
-
- /**
- * Gets the singleton instance.
- * @return The singleton instance.
- */
- public static RegisterInfoMessageListenerSuccessful get() {
- return INSTANCE;
- }
-
- //
------------------------------------------------------------------------
-
- /** Private constructor to prevent instantiation */
- private RegisterInfoMessageListenerSuccessful() {}
-
- //
------------------------------------------------------------------------
-
- @Override
- public boolean equals(Object obj) {
- return obj != null && obj.getClass() ==
RegisterInfoMessageListenerSuccessful.class;
- }
-
- @Override
- public int hashCode() {
- return 2018741656;
- }
-
- @Override
- public String toString() {
- return getClass().getSimpleName();
- }
-
- /**
- * Read resolve to preserve the singleton object property.
- */
- private Object readResolve() throws java.io.ObjectStreamException {
- return INSTANCE;
- }
-}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/RegisterResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/RegisterResourceManager.java
deleted file mode 100644
index fce2b87..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/RegisterResourceManager.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.clusterframework.messages;
-
-import akka.actor.ActorRef;
-import org.apache.flink.runtime.messages.RequiresLeaderSessionID;
-import org.apache.flink.util.Preconditions;
-
-/**
- * This message signals that the resource manager wants to register at the
JobManager leader.
- */
-public class RegisterResourceManager implements RequiresLeaderSessionID,
java.io.Serializable {
- private static final long serialVersionUID = 1L;
-
- private final ActorRef resourceManager;
-
- public RegisterResourceManager(ActorRef resourceManager) {
- this.resourceManager =
Preconditions.checkNotNull(resourceManager);
- }
-
- public ActorRef resourceManager() {
- return resourceManager;
- }
-
- @Override
- public String toString() {
- return "RegisterResourceManager " + resourceManager.path();
- }
-}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/RegisterResourceManagerSuccessful.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/RegisterResourceManagerSuccessful.java
deleted file mode 100644
index df72195..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/RegisterResourceManagerSuccessful.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.clusterframework.messages;
-
-import akka.actor.ActorRef;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.messages.RequiresLeaderSessionID;
-
-import java.io.Serializable;
-import java.util.Collection;
-
-import static java.util.Objects.requireNonNull;
-
-/**
- * Message that informs the resource manager that the JobManager accepted its
registration.
- * Carries information about the JobManager, and about the TaskManagers that
the JobManager
- * still has registered.
- */
-public class RegisterResourceManagerSuccessful implements
RequiresLeaderSessionID, Serializable {
-
- private static final long serialVersionUID = 817011779310941753L;
-
- /** The JobManager which we registered with. */
- private final ActorRef jobManager;
-
- /** The list of registered TaskManagers that the JobManager currently
knows */
- private final Collection<ResourceID> currentlyRegisteredTaskManagers;
-
- /**
- * Creates a new message with a list of existing known TaskManagers.
- *
- * @param currentlyRegisteredTaskManagers
- * The list of TaskManagers that the JobManager currently
knows.
- */
- public RegisterResourceManagerSuccessful(ActorRef jobManager,
- Collection<ResourceID> currentlyRegisteredTaskManagers)
- {
- this.jobManager = jobManager;
- this.currentlyRegisteredTaskManagers =
requireNonNull(currentlyRegisteredTaskManagers);
- }
-
- //
------------------------------------------------------------------------
-
-
- public ActorRef jobManager() {
- return jobManager;
- }
-
- public Collection<ResourceID> currentlyRegisteredTaskManagers() {
- return currentlyRegisteredTaskManagers;
- }
-
- //
------------------------------------------------------------------------
-
- @Override
- public String toString() {
- return "RegisterResourceManagerSuccessful{" +
- "jobManager=" + jobManager +
- ", currentlyRegisteredTaskManagers=" +
currentlyRegisteredTaskManagers +
- '}';
- }
-}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/RemoveResource.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/RemoveResource.java
deleted file mode 100644
index 08fa90e..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/RemoveResource.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.clusterframework.messages;
-
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.messages.RequiresLeaderSessionID;
-
-import java.io.Serializable;
-
-import static java.util.Objects.requireNonNull;
-
-/**
- * Message sent to the ResourceManager by the JobManager to instruct to remove
a resource.
- */
-public class RemoveResource implements RequiresLeaderSessionID, Serializable {
- private static final long serialVersionUID = 1L;
-
- /** The ID under which the resource is registered (for example
container ID) */
- private final ResourceID resourceId;
-
- /**
- * Constructor for a shutdown of a registered resource.
- * @param resourceId The ID under which the resource is registered (for
example container ID).
- */
- public RemoveResource(ResourceID resourceId) {
- this.resourceId = requireNonNull(resourceId);
- }
-
- //
------------------------------------------------------------------------
-
- /**
- * Gets the ID under which the resource is registered (for example
container ID).
- * @return The resource ID
- */
- public ResourceID resourceId() {
- return resourceId;
- }
-
-
- //
------------------------------------------------------------------------
-
- @Override
- public String toString() {
- return "RemoveResource{" +
- "resourceId=" + resourceId +
- '}';
- }
-}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/ResourceRemoved.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/ResourceRemoved.java
deleted file mode 100644
index 1dfa0eb..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/ResourceRemoved.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.clusterframework.messages;
-
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.messages.RequiresLeaderSessionID;
-
-import java.io.Serializable;
-
-import static java.util.Objects.requireNonNull;
-
-/**
- * Message sent to the JobManager by the Resource Manager to inform
- * about the removal of a resource.
- */
-public class ResourceRemoved implements RequiresLeaderSessionID, Serializable {
- private static final long serialVersionUID = 1L;
-
- /** The ID under which the resource is registered (for example
container ID) */
- private final ResourceID resourceId;
-
- /** Optional message with details, for logging and debugging */
- private final String message;
-
- /**
- * Constructor for a shutdown of a registered resource.
- * @param resourceId The ID under which the resource is registered (for
example container ID).
- * @param message Optional message with details, for logging and
debugging.
- */
- public ResourceRemoved(ResourceID resourceId, String message) {
- this.resourceId = requireNonNull(resourceId);
- this.message = message;
- }
-
- //
------------------------------------------------------------------------
-
- /**
- * Gets the ID under which the resource is registered (for example
container ID).
- * @return The resource ID
- */
- public ResourceID resourceId() {
- return resourceId;
- }
-
- /**
- * Gets the optional message with details, for logging and debugging.
- * @return Optional message, may be null
- */
- public String message() {
- return message;
- }
-
- //
------------------------------------------------------------------------
-
- @Override
- public String toString() {
- return "ResourceRemoved{" +
- "resourceId=" + resourceId +
- ", message='" + message + '\'' +
- '}';
- }
-}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/SetWorkerPoolSize.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/SetWorkerPoolSize.java
deleted file mode 100644
index 705bf91..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/SetWorkerPoolSize.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.clusterframework.messages;
-
-import org.apache.flink.runtime.messages.RequiresLeaderSessionID;
-
-/**
- * Message sent to the resource master actor to adjust the designated number of
- * workers it maintains.
- */
-public class SetWorkerPoolSize implements RequiresLeaderSessionID,
java.io.Serializable{
-
- private static final long serialVersionUID = -335911350781207609L;
-
- private final int numberOfWorkers;
-
- public SetWorkerPoolSize(int numberOfWorkers) {
- if (numberOfWorkers < 0) {
- throw new IllegalArgumentException("Number of workers
must not be negative.");
- }
- this.numberOfWorkers = numberOfWorkers;
- }
-
- //
------------------------------------------------------------------------
-
- public int numberOfWorkers() {
- return numberOfWorkers;
- }
-
- //
------------------------------------------------------------------------
-
- @Override
- public int hashCode() {
- return numberOfWorkers;
- }
-
- @Override
- public boolean equals(Object obj) {
- return obj != null && obj.getClass() == SetWorkerPoolSize.class
&&
- this.numberOfWorkers ==
((SetWorkerPoolSize) obj).numberOfWorkers;
- }
-
- @Override
- public String toString() {
- return "SetWorkerPoolSize (" + numberOfWorkers + ')';
- }
-}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/ShutdownClusterAfterJob.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/ShutdownClusterAfterJob.java
deleted file mode 100644
index add4d39..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/ShutdownClusterAfterJob.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.clusterframework.messages;
-
-import org.apache.flink.api.common.JobID;
-
-/**
- * Message sent to the cluster framework master to signal it that the cluster
- * should be shut down upon completion of a certain job identified by a job id.
- */
-public class ShutdownClusterAfterJob implements java.io.Serializable {
-
- private static final long serialVersionUID = -2723662264779569704L;
-
- private final JobID jobId;
-
- public ShutdownClusterAfterJob(JobID jobId) {
- this.jobId = jobId;
- }
-
- public JobID jobId() {
- return jobId;
- }
-
- @Override
- public String toString() {
- return "ShutdownClusterAfterJob { jobId=" + jobId + " } ";
- }
-}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/StopCluster.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/StopCluster.java
deleted file mode 100644
index f366d7c..0000000
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/StopCluster.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.clusterframework.messages;
-
-import org.apache.flink.runtime.clusterframework.ApplicationStatus;
-import org.apache.flink.runtime.messages.RequiresLeaderSessionID;
-
-import static java.util.Objects.requireNonNull;
-
-/**
- * Generic message to signal the cluster framework to shut the cluster down.
- */
-public class StopCluster implements RequiresLeaderSessionID,
java.io.Serializable {
-
- private static final long serialVersionUID = -8957259342982181684L;
-
- private final ApplicationStatus finalStatus;
-
- private final String message;
-
- public StopCluster(ApplicationStatus finalStatus, String message) {
- this.finalStatus = requireNonNull(finalStatus);
- this.message = message == null ? "" : message;
- }
-
- //
------------------------------------------------------------------------
-
- public ApplicationStatus finalStatus() {
- return finalStatus;
- }
-
- public String message() {
- return message;
- }
-
- //
------------------------------------------------------------------------
-
- @Override
- public String toString() {
- return "StopApplication { message='" + message + "' }";
- }
-}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/StopClusterSuccessful.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/StopClusterSuccessful.java
deleted file mode 100644
index 7f9d855..0000000
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/StopClusterSuccessful.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.clusterframework.messages;
-
-import org.apache.flink.runtime.messages.RequiresLeaderSessionID;
-
-/**
- * Generic message to signal the cluster listener that the cluster has been
shut down.
- */
-public class StopClusterSuccessful implements RequiresLeaderSessionID,
java.io.Serializable {
-
- private static final long serialVersionUID = 42L;
-
- private static StopClusterSuccessful INSTANCE = new
StopClusterSuccessful();
-
- /**
- * Private constructor. Initial singleton in get() method.
- */
- private StopClusterSuccessful() {}
-
- public static StopClusterSuccessful getInstance() {
- return INSTANCE;
- }
-
- @Override
- public String toString() {
- return "StopClusterSuccessful{}";
- }
-}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/TriggerRegistrationAtJobManager.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/TriggerRegistrationAtJobManager.java
deleted file mode 100644
index d817d9c..0000000
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/TriggerRegistrationAtJobManager.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.clusterframework.messages;
-
-import akka.actor.ActorRef;
-import org.apache.flink.runtime.messages.RequiresLeaderSessionID;
-
-import java.io.Serializable;
-
-/**
- * Causes the resource manager to try and apply at the leader JobManager.
- */
-public class TriggerRegistrationAtJobManager implements
RequiresLeaderSessionID, Serializable {
- private static final long serialVersionUID = 1L;
-
- private final String jobManagerAddress;
-
- public TriggerRegistrationAtJobManager(ActorRef jobManager) {
- this(jobManager.path().toSerializationFormat());
- }
-
- public TriggerRegistrationAtJobManager(String jobManagerAddress) {
- this.jobManagerAddress = jobManagerAddress;
- }
-
- public String jobManagerAddress() {
- return jobManagerAddress;
- }
-
- @Override
- public String toString() {
- return "TriggerRegistrationAtJobManager " + jobManagerAddress;
- }
-}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/package-info.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/package-info.java
deleted file mode 100644
index d647dea..0000000
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/package-info.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * This package contains the actor messages that are sent between the
- * cluster resource framework and the JobManager, as well as the generic
- * messages sent between the cluster resource framework and the client.
- */
-package org.apache.flink.runtime.clusterframework.messages;
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/PartitionProducerDisposedException.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/PartitionProducerDisposedException.java
index 12f2433..1cf77d7 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/PartitionProducerDisposedException.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/PartitionProducerDisposedException.java
@@ -19,10 +19,9 @@
package org.apache.flink.runtime.jobmanager;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
-import
org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionProducerState;
/**
- * Exception returned to a TaskManager on {@link RequestPartitionProducerState}
+ * Exception returned to a TaskManager on JobMaster requesting partition state,
* if the producer of a partition has been disposed.
*/
public class PartitionProducerDisposedException extends Exception {
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/LeaderSessionMessageDecorator.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/LeaderSessionMessageDecorator.java
deleted file mode 100644
index 6e49f44..0000000
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/LeaderSessionMessageDecorator.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.messages;
-
-import java.util.UUID;
-
-/**
- * Message decorator which wraps message which implement {@link
RequiresLeaderSessionID} into
- * a {@link
org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage}.
- */
-public class LeaderSessionMessageDecorator implements MessageDecorator {
-
- private static final long serialVersionUID = 5359618147408392706L;
-
- /** Leader session ID with which the RequiresLeaderSessionID messages
will be decorated */
- private final UUID leaderSessionID;
-
- /**
- * Sets the leader session ID with which the messages will be decorated.
- *
- * @param leaderSessionID Leader session ID to be used for decoration
- */
- public LeaderSessionMessageDecorator(UUID leaderSessionID) {
- this.leaderSessionID = leaderSessionID;
- }
-
- @Override
- public Object decorate(Object message) {
- if (message instanceof RequiresLeaderSessionID) {
- return new
JobManagerMessages.LeaderSessionMessage(leaderSessionID, message);
- } else {
- return message;
- }
- }
-}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/MessageDecorator.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/MessageDecorator.java
deleted file mode 100644
index 8bbab73..0000000
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/MessageDecorator.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.messages;
-
-/**
- * Interface for message decorators
- */
-public interface MessageDecorator extends java.io.Serializable {
-
- /**
- * Decorates a message
- *
- * @param message Message to decorate
- * @return Decorated message
- */
- Object decorate(Object message);
-}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/RequiresLeaderSessionID.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/RequiresLeaderSessionID.java
deleted file mode 100644
index 7e6a473..0000000
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/RequiresLeaderSessionID.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.messages;
-
-/**
- * Marks messages to be sent wrapped in a
- * {@link
org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage}
- */
-public interface RequiresLeaderSessionID {}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/StackTrace.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/StackTrace.java
deleted file mode 100644
index c041655..0000000
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/StackTrace.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.messages;
-
-import org.apache.flink.runtime.instance.InstanceID;
-import org.apache.flink.util.Preconditions;
-
-import java.io.Serializable;
-
-public class StackTrace implements Serializable {
-
- private static final long serialVersionUID = -899464298250067416L;
-
- private final InstanceID instanceId;
- private final String stackTrace;
-
- public StackTrace(InstanceID instanceId, String stackTrace) {
- this.instanceId = Preconditions.checkNotNull(instanceId);
- this.stackTrace = Preconditions.checkNotNull(stackTrace);
- }
-
- public InstanceID getInstanceId() {
- return instanceId;
- }
-
- public String getStackTrace() {
- return stackTrace;
- }
-}
diff --git
a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ArchiveMessages.scala
b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ArchiveMessages.scala
deleted file mode 100644
index c1227dc..0000000
---
a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ArchiveMessages.scala
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.messages
-
-import org.apache.flink.api.common.JobID
-import org.apache.flink.runtime.executiongraph.{ArchivedExecutionGraph,
ExecutionGraph}
-
-/**
- * This object contains the archive specific messages.
- */
-object ArchiveMessages {
-
- case class ArchiveExecutionGraph(jobID: JobID, graph: ArchivedExecutionGraph)
-
- /**
- * Request the currently archived jobs in the archiver. The resulting
response is [[ArchivedJobs]]
- */
- case object RequestArchivedJobs
-
- /**
- * Requests the number of finished, canceled, and failed jobs
- */
- case object RequestJobCounts
-
- /**
- * Request a specific ExecutionGraph by JobID. The response is
[[RequestArchivedJob]]
- * @param jobID
- */
- case class RequestArchivedJob(jobID: JobID)
-
- case class ArchivedJob(job: Option[ArchivedExecutionGraph])
-
- /**
- * Response to [[RequestArchivedJobs]] message. The response contains the
archived jobs.
- * @param jobs
- */
- case class ArchivedJobs(jobs: Iterable[ArchivedExecutionGraph]){
- def asJavaIterable: java.lang.Iterable[ArchivedExecutionGraph] = {
- import scala.collection.JavaConverters._
- jobs.asJava
- }
-
- def asJavaCollection: java.util.Collection[ArchivedExecutionGraph] = {
- import scala.collection.JavaConverters._
- jobs.asJavaCollection
- }
- }
-
- // --------------------------------------------------------------------------
- // Utility methods to allow simpler case object access from Java
- // --------------------------------------------------------------------------
-
- def getRequestArchivedJobs : AnyRef = {
- RequestArchivedJobs
- }
-
- def getRequestJobCounts : AnyRef = {
- RequestJobCounts
- }
-}
diff --git
a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ExecutionGraphMessages.scala
b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ExecutionGraphMessages.scala
deleted file mode 100644
index 59efa73..0000000
---
a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ExecutionGraphMessages.scala
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.messages
-
-import java.text.SimpleDateFormat
-import java.util.{UUID, Date}
-
-import org.apache.flink.api.common.JobID
-import org.apache.flink.runtime.execution.ExecutionState
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
-import org.apache.flink.runtime.jobgraph.{JobStatus, JobVertexID}
-
-/**
- * This object contains the execution graph specific messages.
- */
-object ExecutionGraphMessages {
-
- // --------------------------------------------------------------------------
- // Messages
- // --------------------------------------------------------------------------
-
- /**
- * Denotes the execution state change of an
- * [[org.apache.flink.runtime.executiongraph.ExecutionVertex]]
- *
- * @param jobID to which the vertex belongs
- * @param vertexID of the ExecutionJobVertex to which the ExecutionVertex
belongs
- * @param taskName
- * @param totalNumberOfSubTasks denotes the number of parallel subtasks
- * @param subtaskIndex denotes the index of the ExecutionVertex
- * @param executionID
- * @param newExecutionState
- * @param timestamp of the execution state change
- * @param optionalMessage
- */
- case class ExecutionStateChanged(
- jobID: JobID,
- vertexID: JobVertexID,
- taskName: String,
- totalNumberOfSubTasks: Int,
- subtaskIndex: Int,
- executionID: ExecutionAttemptID,
- newExecutionState: ExecutionState,
- timestamp: Long,
- optionalMessage: String)
- extends RequiresLeaderSessionID {
-
- override def toString: String = {
- val oMsg = if (optionalMessage != null) {
- s"\n$optionalMessage"
- } else {
- ""
- }
-
- s"${timestampToString(timestamp)}\t$taskName(${subtaskIndex +
- 1}/$totalNumberOfSubTasks) switched to $newExecutionState $oMsg"
- }
- }
-
- // --------------------------------------------------------------------------
- // Utilities
- // --------------------------------------------------------------------------
-
- private val DATE_FORMATTER: SimpleDateFormat = new
SimpleDateFormat("MM/dd/yyyy HH:mm:ss")
-
- private def timestampToString(timestamp: Long): String = {
- DATE_FORMATTER.synchronized {
- DATE_FORMATTER.format(new Date(timestamp))
- }
- }
-}
diff --git
a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
deleted file mode 100644
index 0206872..0000000
---
a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
+++ /dev/null
@@ -1,559 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.messages
-
-import java.net.URL
-import java.util.UUID
-
-import akka.actor.ActorRef
-import org.apache.flink.api.common.JobID
-import org.apache.flink.runtime.akka.ListeningBehaviour
-import org.apache.flink.runtime.blob.PermanentBlobKey
-import org.apache.flink.runtime.client.{JobStatusMessage,
SerializedJobExecutionResult}
-import org.apache.flink.runtime.clusterframework.types.ResourceID
-import org.apache.flink.runtime.executiongraph.{AccessExecutionGraph,
ExecutionAttemptID, ExecutionGraph}
-import org.apache.flink.runtime.instance.{Instance, InstanceID}
-import org.apache.flink.runtime.io.network.partition.ResultPartitionID
-import org.apache.flink.runtime.jobgraph.{IntermediateDataSetID, JobGraph,
JobStatus, JobVertexID}
-import org.apache.flink.runtime.jobmanager.SubmittedJobGraph
-import org.apache.flink.runtime.messages.checkpoint.AbstractCheckpointMessage
-import org.apache.flink.util.SerializedThrowable
-
-import scala.collection.JavaConverters._
-
-/**
- * The job manager specific actor messages
- */
-object JobManagerMessages {
-
- /** Wrapper class for leader session messages. Leader session messages
implement the
- * [[RequiresLeaderSessionID]] interface and have to be wrapped in a
[[LeaderSessionMessage]],
- * which also contains the current leader session ID.
- *
- * @param leaderSessionID Current leader session ID
- * @param message [[RequiresLeaderSessionID]] message to be wrapped in a
[[LeaderSessionMessage]]
- */
- case class LeaderSessionMessage(leaderSessionID: UUID, message: Any)
-
- /**
- * Submits a job to the job manager. Depending on the [[listeningBehaviour]],
- * the sender registers for different messages. If
[[ListeningBehaviour.DETACHED]], then
- * it will only be informed whether the submission was successful or not. If
- * [[ListeningBehaviour.EXECUTION_RESULT]], then it will additionally
receive the execution
- * result. If [[ListeningBehaviour.EXECUTION_RESULT_AND_STATE_CHANGES]],
then it will additionally
- * receive the job status change notifications.
- *
- * The submission result will be sent back to the sender as a success
message.
- *
- * @param jobGraph The job to be submitted to the JobManager
- * @param listeningBehaviour Specifies to what the sender wants to listen
(detached, execution
- * result, execution result and state changes)
- */
- case class SubmitJob(
- jobGraph: JobGraph,
- listeningBehaviour: ListeningBehaviour)
- extends RequiresLeaderSessionID
-
- /**
- * Registers the sender of the message as the client for the provided job
identifier.
- * This message is acknowledged by the JobManager with
[[RegisterJobClientSuccess]]
- * or [[JobNotFound]] if the job was not running.
- * @param jobID The job id of the job
- * @param listeningBehaviour The types of updates which will be sent to the
sender
- * after registration
- */
- case class RegisterJobClient(
- jobID: JobID,
- listeningBehaviour: ListeningBehaviour)
- extends RequiresLeaderSessionID
-
- /**
- * Triggers the recovery of the job with the given ID.
- *
- * @param jobId ID of the job to recover
- */
- case class RecoverJob(jobId: JobID) extends RequiresLeaderSessionID
-
- /**
- * Triggers the submission of the recovered job
- *
- * @param submittedJobGraph Contains the submitted JobGraph
- */
- case class RecoverSubmittedJob(submittedJobGraph: SubmittedJobGraph)
- extends RequiresLeaderSessionID
-
- /**
- * Triggers recovery of all available jobs.
- */
- case object RecoverAllJobs extends RequiresLeaderSessionID
-
- /**
- * Cancels a job with the given [[jobID]] at the JobManager. The result of
the cancellation is
- * sent back to the sender as a [[CancellationResponse]] message.
- *
- * @param jobID
- */
- case class CancelJob(jobID: JobID) extends RequiresLeaderSessionID
-
- /**
- * Cancels the job with the given [[jobID]] at the JobManager. Before
cancellation a savepoint
- * is triggered without any other checkpoints in between. The result of the
cancellation is
- * the path of the triggered savepoint on success or an exception.
- *
- * @param jobID ID of the job to cancel
- * @param savepointDirectory Optional target directory for the savepoint.
- * If no target directory is specified here, the
- * cluster default is used.
- */
- case class CancelJobWithSavepoint(
- jobID: JobID,
- savepointDirectory: String = null)
- extends RequiresLeaderSessionID
-
- /**
- * Requesting next input split for the
- * [[org.apache.flink.runtime.executiongraph.ExecutionJobVertex]]
- * of the job specified by [[jobID]]. The next input split is sent back to
the sender as a
- * [[NextInputSplit]] message.
- *
- * @param jobID
- * @param vertexID
- */
- case class RequestNextInputSplit(
- jobID: JobID,
- vertexID: JobVertexID,
- executionAttempt: ExecutionAttemptID)
- extends RequiresLeaderSessionID
-
- /**
- * Contains the next input split for a task. This message is a response to
- *
[[org.apache.flink.runtime.messages.JobManagerMessages.RequestNextInputSplit]].
- *
- * @param splitData
- */
- case class NextInputSplit(splitData: Array[Byte])
-
- /**
- * Requests the execution state of the execution producing a result
partition.
- *
- * @param jobId ID of the job the partition belongs to.
- * @param intermediateDataSetId ID of the parent intermediate data set.
- * @param resultPartitionId ID of the result partition to check. This
- * identifies the producing execution and
- * partition.
- */
- case class RequestPartitionProducerState(
- jobId: JobID,
- intermediateDataSetId: IntermediateDataSetID,
- resultPartitionId: ResultPartitionID)
- extends RequiresLeaderSessionID
-
- /**
- * Notifies the org.apache.flink.runtime.jobmanager.JobManager about
available data for a
- * produced partition.
- * <p>
- * There is a call to this method for each
- * [[org.apache.flink.runtime.executiongraph.ExecutionVertex]] instance once
per produced
- * [[org.apache.flink.runtime.io.network.partition.ResultPartition]]
instance,
- * either when first producing data (for pipelined executions) or when all
data has been produced
- * (for staged executions).
- * <p>
- * The org.apache.flink.runtime.jobmanager.JobManager then can decide when
to schedule the
- * partition consumers of the given session.
- *
- * @see [[org.apache.flink.runtime.io.network.partition.ResultPartition]]
- */
- case class ScheduleOrUpdateConsumers(jobId: JobID, partitionId:
ResultPartitionID)
- extends RequiresLeaderSessionID
-
- /**
- * Requests the current [[JobStatus]] of the job identified by [[jobID]].
This message triggers
- * as response a [[JobStatusResponse]] message.
- *
- * @param jobID
- */
- case class RequestJobStatus(jobID: JobID)
-
- sealed trait JobStatusResponse {
- def jobID: JobID
- }
-
- /**
- * Denotes the current [[JobStatus]] of the job with [[jobID]].
- *
- * @param jobID
- * @param status
- */
- case class CurrentJobStatus(jobID: JobID, status: JobStatus) extends
JobStatusResponse
-
- /**
- * Requests the number of currently registered task manager at the job
manager. The result is
- * sent back to the sender as an [[Int]].
- */
- case object RequestNumberRegisteredTaskManager
-
- /**
- * Requests the maximum number of slots available to the job manager. The
result is sent back
- * to the sender as an [[Int]].
- */
- case object RequestTotalNumberOfSlots
-
- /**
- * Requests all entities necessary for reconstructing a job class loader
- * May respond with [[ClassloadingProps]] or [[JobNotFound]]
- * @param jobId The job id of the registered job
- */
- case class RequestClassloadingProps(jobId: JobID)
-
- /**
- * Response to [[RequestClassloadingProps]]
- * @param blobManagerPort The port of the blobManager
- * @param requiredJarFiles The blob keys of the required jar files
- * @param requiredClasspaths The urls of the required classpaths
- */
- case class ClassloadingProps(blobManagerPort: Integer,
- requiredJarFiles:
java.util.Collection[PermanentBlobKey],
- requiredClasspaths: java.util.Collection[URL])
-
- /**
- * Requests the port of the blob manager from the job manager. The result is
sent back to the
- * sender as an [[Int]].
- */
- case object RequestBlobManagerPort
-
- /** Requests the current leader session ID of the job manager. The result is
sent back to the
- * sender as an [[ResponseLeaderSessionID]]
- */
- case object RequestLeaderSessionID
-
- /** Response to the [[RequestLeaderSessionID]] message.
- *
- * @param leaderSessionID
- */
- case class ResponseLeaderSessionID(leaderSessionID: UUID)
-
- /**
- * Denotes a successful job submission.
- * @param jobId Ths job's ID.
- */
- case class JobSubmitSuccess(jobId: JobID)
-
- /**
- * Denotes a successful registration of a JobClientActor for a running job
- * @param jobId The job id of the registered job
- */
- case class RegisterJobClientSuccess(jobId: JobID)
-
- /**
- * Denotes messages which contain the result of a completed job execution
- */
- sealed trait JobResultMessage
-
- /**
- * Denotes a successful job execution.
- * @param result The result of the job execution, in serialized form.
- */
- case class JobResultSuccess(result: SerializedJobExecutionResult) extends
JobResultMessage
-
- /**
- * Denotes an unsuccessful job execution.
- * @param cause The exception that caused the job to fail, in serialized
form.
- */
- case class JobResultFailure(cause: SerializedThrowable) extends
JobResultMessage
-
-
- sealed trait CancellationResponse{
- def jobID: JobID
- }
-
- /**
- * Denotes a successful job cancellation
- * @param jobID
- */
- case class CancellationSuccess(
- jobID: JobID,
- savepointPath: String = null) extends CancellationResponse
-
- /**
- * Denotes a failed job cancellation
- * @param jobID
- * @param cause
- */
- case class CancellationFailure(jobID: JobID, cause: Throwable) extends
CancellationResponse
-
- /**
- * Requests all currently running jobs from the job manager. This message
triggers a
- * [[RunningJobs]] response.
- */
- case object RequestRunningJobs
-
- /**
- * This message is the response to the [[RequestRunningJobs]] message. It
contains all
- * execution graphs of the currently running jobs.
- */
- case class RunningJobs(runningJobs: Iterable[ExecutionGraph]) {
- def this() = this(Seq())
- def asJavaIterable: java.lang.Iterable[ExecutionGraph] = {
- runningJobs.asJava
- }
- }
-
- /**
- * Requests the status of all currently running jobs from the job manager.
- * This message triggers a [[RunningJobsStatus]] response.
- */
- case object RequestRunningJobsStatus
-
- case class RunningJobsStatus(runningJobs: Iterable[JobStatusMessage]) {
- def this() = this(Seq())
-
- def getStatusMessages(): java.util.List[JobStatusMessage] = {
- new java.util.ArrayList[JobStatusMessage](runningJobs.asJavaCollection)
- }
- }
-
- /**
- * Requests the execution graph of a specific job identified by [[jobID]].
- * The result is sent back to the sender as a [[JobResponse]].
- */
- case class RequestJob(jobID: JobID)
-
- sealed trait JobResponse{
- def jobID: JobID
- }
-
- /**
- * Contains the [[executionGraph]] of a job with [[jobID]]. This is the
response to
- * [[RequestJob]] if the job runs or is archived.
- *
- * @param jobID
- * @param executionGraph
- */
- case class JobFound(jobID: JobID, executionGraph: AccessExecutionGraph)
extends JobResponse
-
- /**
- * Denotes that there is no job with [[jobID]] retrievable. This message can
be the response of
- * [[RequestJob]], [[RequestJobStatus]] or [[RegisterJobClient]].
- *
- * @param jobID
- */
- case class JobNotFound(jobID: JobID) extends JobResponse with
JobStatusResponse
-
- /** Triggers the removal of the job with the given job ID
- *
- * @param jobID
- * @param removeJobFromStateBackend true if the job has properly finished
- */
- case class RemoveJob(jobID: JobID, removeJobFromStateBackend: Boolean = true)
- extends RequiresLeaderSessionID
-
- /**
- * Removes the job belonging to the job identifier from the job manager and
archives it.
- * @param jobID The job identifier
- */
- case class RemoveCachedJob(jobID: JobID)
-
- /**
- * Requests the instances of all registered task managers.
- */
- case object RequestRegisteredTaskManagers
-
- /**
- * Contains the [[Instance]] objects of all registered task managers. It is
the response to the
- * message [[RequestRegisteredTaskManagers]].
- *
- * @param taskManagers
- */
- case class RegisteredTaskManagers(taskManagers: Iterable[Instance]){
- def asJavaIterable: java.lang.Iterable[Instance] = {
- import scala.collection.JavaConverters._
- taskManagers.asJava
- }
-
- def asJavaCollection: java.util.Collection[Instance] = {
- import scala.collection.JavaConverters._
- taskManagers.asJavaCollection
- }
- }
-
- /**
- * Requests the [[Instance]] object of the task manager with the given
instance ID
- *
- * @param resourceId identifying the TaskManager which shall be retrieved
- */
- case class RequestTaskManagerInstance(resourceId: ResourceID)
-
- /**
- * Returns the [[Instance]] object of the requested task manager. This is in
response to
- * [[RequestTaskManagerInstance]]
- */
- case class TaskManagerInstance(instance: Option[Instance])
-
- /**
- * Requests stack trace messages of the task manager
- *
- * @param instanceID Instance ID of the task manager
- */
- case class RequestStackTrace(instanceID: InstanceID)
-
- /**
- * Requests the current state of the job manager
- */
- case object RequestJobManagerStatus
-
- /**
- * Response to RequestJobManagerStatus
- */
- sealed trait JobManagerStatus
-
- case object JobManagerStatusAlive extends JobManagerStatus
-
- /** Grants leadership to the receiver. The message contains the new leader
session id.
- *
- * @param leaderSessionID
- */
- case class GrantLeadership(leaderSessionID: Option[UUID])
-
- /** Revokes leadership of the receiver.
- */
- case object RevokeLeadership
-
- /** Requests the ActorRef of the archiver */
- case object RequestArchive
-
- /** Response containing the ActorRef of the archiver */
- case class ResponseArchive(actor: ActorRef)
-
- /** Request for the JobManager's REST endpoint address */
- case object RequestRestAddress
-
- /**
- * Triggers a savepoint for the specified job.
- *
- * This is not a subtype of [[AbstractCheckpointMessage]], because it is a
- * control-flow message, which is *not* part of the checkpointing mechanism
- * of triggering and acknowledging checkpoints.
- *
- * @param jobId The JobID of the job to trigger the savepoint for.
- * @param savepointDirectory Optional target directory
- */
- case class TriggerSavepoint(
- jobId: JobID,
- savepointDirectory : Option[String] = Option.empty) extends
RequiresLeaderSessionID
-
- /**
- * Response after a successful savepoint trigger containing the savepoint
path.
- *
- * @param jobId The job ID for which the savepoint was triggered.
- * @param savepointPath The path of the savepoint.
- */
- case class TriggerSavepointSuccess(
- jobId: JobID,
- checkpointId: Long,
- savepointPath: String,
- triggerTime: Long
- )
-
- /**
- * Response after a failed savepoint trigger containing the failure cause.
- *
- * @param jobId The job ID for which the savepoint was triggered.
- * @param cause The cause of the failure.
- */
- case class TriggerSavepointFailure(jobId: JobID, cause: Throwable)
-
- /**
- * Disposes a savepoint.
- *
- * @param savepointPath The path of the savepoint to dispose.
- */
- case class DisposeSavepoint(
- savepointPath: String)
- extends RequiresLeaderSessionID
-
- /** Response after a successful savepoint dispose. */
- case object DisposeSavepointSuccess
-
- /**
- * Response after a failed savepoint dispose containing the failure cause.
- *
- * @param cause The cause of the failure.
- */
- case class DisposeSavepointFailure(cause: Throwable)
-
- // --------------------------------------------------------------------------
- // Utility methods to allow simpler case object access from Java
- // --------------------------------------------------------------------------
-
- def getRequestJobStatus(jobId : JobID) : AnyRef = {
- RequestJobStatus(jobId)
- }
-
- def getRequestNumberRegisteredTaskManager : AnyRef = {
- RequestNumberRegisteredTaskManager
- }
-
- def getRequestTotalNumberOfSlots : AnyRef = {
- RequestTotalNumberOfSlots
- }
-
- def getRequestBlobManagerPort : AnyRef = {
- RequestBlobManagerPort
- }
-
- def getRequestRunningJobs : AnyRef = {
- RequestRunningJobs
- }
-
- def getRequestRunningJobsStatus : AnyRef = {
- RequestRunningJobsStatus
- }
-
- def getRequestRegisteredTaskManagers : AnyRef = {
- RequestRegisteredTaskManagers
- }
-
- def getRequestJobManagerStatus : AnyRef = {
- RequestJobManagerStatus
- }
-
- def getJobManagerStatusAlive : AnyRef = {
- JobManagerStatusAlive
- }
-
- def getRequestLeaderSessionID: AnyRef = {
- RequestLeaderSessionID
- }
-
- def getRequestArchive: AnyRef = {
- RequestArchive
- }
-
- def getRecoverAllJobs: AnyRef = {
- RecoverAllJobs
- }
-
- def getRequestRestAddress: AnyRef = {
- RequestRestAddress
- }
-
- def getDisposeSavepointSuccess: AnyRef = {
- DisposeSavepointSuccess
- }
-}
diff --git
a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/Messages.scala
b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/Messages.scala
deleted file mode 100644
index 1103a69..0000000
---
a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/Messages.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.messages
-
-import org.apache.flink.runtime.instance.InstanceID
-
-/**
- * Generic messages between JobManager, TaskManager, JobClient.
- */
-object Messages {
-
- /**
- * Signals that the receiver (JobManager/TaskManager) shall disconnect the
sender.
- *
- * The TaskManager may send this on shutdown to let the JobManager realize
the TaskManager
- * loss more quickly.
- *
- * The JobManager may send this message to its TaskManagers to let them
clean up their
- * tasks that depend on the JobManager and go into a clean state.
- *
- * @param cause The reason for disconnecting, to be displayed in log and
error messages.
- */
- case class Disconnect(instanceId: InstanceID, cause: Exception) extends
RequiresLeaderSessionID
-}
diff --git
a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/RegistrationMessages.scala
b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/RegistrationMessages.scala
deleted file mode 100644
index 5648bc6..0000000
---
a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/RegistrationMessages.scala
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.messages
-
-import java.util.UUID
-
-import org.apache.flink.runtime.clusterframework.types.ResourceID
-import org.apache.flink.runtime.instance.{HardwareDescription, InstanceID}
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation
-
-import scala.concurrent.duration.{Deadline, FiniteDuration}
-
-/**
- * A set of messages between TaskManager and JobManager to handle the
- * registration of the TaskManager at the JobManager.
- */
-object RegistrationMessages {
-
- /**
- * Marker trait for registration messages.
- */
- trait RegistrationMessage extends RequiresLeaderSessionID {}
-
- /**
- * Triggers the TaskManager to attempt a registration at the JobManager.
- *
- * @param jobManagerURL Akka URL to the JobManager
- * @param timeout The timeout for the message. The next retry will double
this timeout.
- * @param deadline Optional deadline until when the registration must be
completed.
- * @param attempt The attempt number, for logging.
- * @param registrationRun UUID of the current registration run to filter out
outdated runs
- */
- case class TriggerTaskManagerRegistration(
- jobManagerURL: String,
- timeout: FiniteDuration,
- deadline: Option[Deadline],
- attempt: Int,
- registrationRun: UUID)
- extends RegistrationMessage
-
- /**
- * Registers a task manager at the JobManager. A successful registration is
acknowledged by
- * [[AcknowledgeRegistration]].
- *
- * @param connectionInfo The TaskManagers connection information.
- * @param resources The TaskManagers resources.
- * @param numberOfSlots The number of processing slots offered by the
TaskManager.
- */
- case class RegisterTaskManager(
- resourceId: ResourceID,
- connectionInfo: TaskManagerLocation,
- resources: HardwareDescription,
- numberOfSlots: Int)
- extends RegistrationMessage
-
- /**
- * Denotes the successful registration of a task manager at the JobManager.
This is the
- * response triggered by the [[RegisterTaskManager]] message when the
JobManager has registered
- * the task manager with the resource manager.
- *
- * @param instanceID The instance ID under which the TaskManager is
registered at the
- * JobManager.
- * @param blobPort The server port where the JobManager's BLOB service runs.
- */
- case class AcknowledgeRegistration(
- instanceID: InstanceID,
- blobPort: Int)
- extends RegistrationMessage
-
- /**
- * Denotes that the TaskManager has already been registered at the
JobManager.
- *
- * @param instanceID The instance ID under which the TaskManager is
registered.
- * @param blobPort The server port where the JobManager's BLOB service runs.
- */
- case class AlreadyRegistered(
- instanceID: InstanceID,
- blobPort: Int)
- extends RegistrationMessage
-
- /**
- * Denotes the unsuccessful registration of a task manager at the
JobManager. This is the
- * response triggered by the [[RegisterTaskManager]] message.
- *
- * @param reason Reason why the task manager registration was refused
- */
- case class RefuseRegistration(reason: Throwable) extends RegistrationMessage
-
-}
diff --git
a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/StackTraceSampleMessages.scala
b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/StackTraceSampleMessages.scala
deleted file mode 100644
index 3a7c057..0000000
---
a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/StackTraceSampleMessages.scala
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.messages
-
-import akka.actor.ActorRef
-import org.apache.flink.api.common.time.Time
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
-
-import scala.concurrent.duration.FiniteDuration
-
-/**
- * A set of messages exchanged with task manager instances in order to sample
- * the stack traces of running tasks.
- */
-object StackTraceSampleMessages {
-
- trait StackTraceSampleMessages
-
- /**
- * Triggers the sampling of a running task (sent by the job manager to the
- * task managers).
- *
- * @param sampleId ID of this sample.
- * @param executionId ID of the task to sample.
- * @param numSamples Number of stack trace samples to collect.
- * @param delayBetweenSamples Delay between consecutive samples.
- * @param maxStackTraceDepth Maximum depth of the stack trace. 0 indicates
- * no maximum and collects the complete stack
- * trace.
- */
- case class TriggerStackTraceSample(
- sampleId: Int,
- executionId: ExecutionAttemptID,
- numSamples: Int,
- delayBetweenSamples: Time,
- maxStackTraceDepth: Int = 0)
- extends StackTraceSampleMessages with java.io.Serializable
-
- /**
- * Task manager internal sample message.
- *
- * @param sampleId ID of the this sample.
- * @param executionId ID of the task to sample.
- * @param delayBetweenSamples Delay between consecutive samples.
- * @param maxStackTraceDepth Maximum depth of the stack trace. 0 indicates
- * no maximum and collects the complete stack
- * trace.
- * @param numRemainingSamples Number of remaining samples before this
- * sample is finished.
- * @param currentTraces The current list of gathered stack traces.
- * @param sender Actor triggering this sample (receiver of result).
- */
- case class SampleTaskStackTrace(
- sampleId: Int,
- executionId: ExecutionAttemptID,
- delayBetweenSamples: Time,
- maxStackTraceDepth: Int,
- numRemainingSamples: Int,
- currentTraces: java.util.List[Array[StackTraceElement]],
- sender: ActorRef)
- extends StackTraceSampleMessages
-
-}
diff --git
a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskControlMessages.scala
b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskControlMessages.scala
deleted file mode 100644
index d984618..0000000
---
a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskControlMessages.scala
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.messages
-
-import java.util
-
-import org.apache.flink.runtime.deployment.{InputChannelDeploymentDescriptor,
TaskDeploymentDescriptor}
-import org.apache.flink.runtime.executiongraph.{ExecutionAttemptID,
PartitionInfo}
-import org.apache.flink.runtime.jobgraph.IntermediateDataSetID
-import org.apache.flink.runtime.taskmanager.TaskExecutionState
-
-/**
- * A set of messages that control the deployment and the state of Tasks
executed
- * on the TaskManager.
- */
-object TaskMessages {
-
- /**
- * Marker trait for task messages.
- */
- trait TaskMessage
-
- // --------------------------------------------------------------------------
- // Starting and stopping Tasks
- // --------------------------------------------------------------------------
-
- /**
- * Submits a task to the task manager. The result is to this message is a
- * [[TaskOperationResult]] message.
- *
- * @param tasks Descriptor which contains the information to start the task.
- */
- case class SubmitTask(tasks: TaskDeploymentDescriptor)
- extends TaskMessage with RequiresLeaderSessionID
-
- /**
- * Cancels the task associated with [[attemptID]]. The result is sent back
to the sender as a
- * [[TaskOperationResult]] message.
- *
- * @param attemptID The task's execution attempt ID.
- */
- case class CancelTask(attemptID: ExecutionAttemptID)
- extends TaskMessage with RequiresLeaderSessionID
-
- /**
- * Triggers a fail of specified task from the outside (as opposed to the
task throwing
- * an exception itself) with the given exception as the cause.
- *
- * @param executionID The task's execution attempt ID.
- * @param cause The reason for the external failure.
- */
- case class FailTask(executionID: ExecutionAttemptID, cause: Throwable)
- extends TaskMessage
-
- /**
- * Notifies the TaskManager that the task has reached its final state,
- * either FINISHED, CANCELED, or FAILED.
- *
- * @param executionID The task's execution attempt ID.
- */
- case class TaskInFinalState(executionID: ExecutionAttemptID)
- extends TaskMessage
-
-
- // --------------------------------------------------------------------------
- // Updates to Intermediate Results
- // --------------------------------------------------------------------------
-
- /**
- * Base class for messages that update the information about location of
input partitions
- */
- abstract sealed class UpdatePartitionInfo extends TaskMessage with
RequiresLeaderSessionID {
- def executionID: ExecutionAttemptID
- }
-
- /**
- *
- * @param executionID The task's execution attempt ID.
- * @param resultId The input reader to update.
- * @param partitionInfo The partition info update.
- */
- case class UpdateTaskSinglePartitionInfo(
- executionID: ExecutionAttemptID,
- resultId: IntermediateDataSetID,
- partitionInfo: InputChannelDeploymentDescriptor)
- extends UpdatePartitionInfo
-
- /**
- *
- * @param executionID The task's execution attempt ID.
- * @param partitionInfos List of input gates with channel descriptors to
update.
- */
- case class UpdateTaskMultiplePartitionInfos(
- executionID: ExecutionAttemptID,
- partitionInfos: java.lang.Iterable[PartitionInfo])
- extends UpdatePartitionInfo
-
- /**
- * Fails (and releases) all intermediate result partitions identified by
- * [[executionID]] from the task manager.
- *
- * @param executionID The task's execution attempt ID.
- */
- case class FailIntermediateResultPartitions(executionID: ExecutionAttemptID)
- extends TaskMessage with RequiresLeaderSessionID
-
-
- // --------------------------------------------------------------------------
- // Report Messages
- // --------------------------------------------------------------------------
-
- /**
- * Denotes a state change of a task at the JobManager. The update success is
acknowledged by a
- * boolean value which is sent back to the sender.
- *
- * @param taskExecutionState The changed task state
- */
- case class UpdateTaskExecutionState(taskExecutionState: TaskExecutionState)
- extends TaskMessage with RequiresLeaderSessionID
-
- // --------------------------------------------------------------------------
- // Utility Functions
- // --------------------------------------------------------------------------
-
- def createUpdateTaskMultiplePartitionInfos(
- executionID: ExecutionAttemptID,
- resultIDs: java.util.List[IntermediateDataSetID],
- partitionInfos: java.util.List[InputChannelDeploymentDescriptor])
- : UpdateTaskMultiplePartitionInfos = {
-
- require(resultIDs.size() == partitionInfos.size(),
- "ResultIDs must have the same length as partitionInfos.")
-
- val partitionInfoList = new util.ArrayList[PartitionInfo](resultIDs.size())
-
- for (i <- 0 until resultIDs.size()) {
- partitionInfoList.add(new PartitionInfo(resultIDs.get(i),
partitionInfos.get(i)))
- }
-
- new UpdateTaskMultiplePartitionInfos(
- executionID,
- partitionInfoList)
- }
-}
diff --git
a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala
b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala
deleted file mode 100644
index acccd4e..0000000
---
a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala
+++ /dev/null
@@ -1,183 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.messages
-
-import java.util.UUID
-import org.apache.flink.runtime.accumulators.AccumulatorSnapshot
-import org.apache.flink.runtime.instance.InstanceID
-
-/**
- * Miscellaneous actor messages exchanged with the TaskManager.
- */
-object TaskManagerMessages {
-
- /**
- * This message informs the TaskManager about a fatal error that prevents
- * it from continuing.
- *
- * @param description The description of the problem
- */
- case class FatalError(description: String, cause: Throwable)
-
- /**
- * Tells the task manager to send a heartbeat message to the job manager.
- */
- case object SendHeartbeat {
-
- /**
- * Accessor for the case object instance, to simplify Java
interoperability.
- *
- * @return The SendHeartbeat case object instance.
- */
- def get() : SendHeartbeat.type = SendHeartbeat
- }
-
- /**
- * Reports liveliness of the TaskManager instance with the given instance ID
to the
- * This message is sent to the job.
- *
- * @param instanceID The instance ID of the reporting TaskManager.
- * @param accumulators Accumulators of tasks serialized as Tuple2[internal,
user-defined]
- */
- case class Heartbeat(instanceID: InstanceID, accumulators:
Seq[AccumulatorSnapshot])
-
-
- // --------------------------------------------------------------------------
- // Reporting the current TaskManager stack trace
- // --------------------------------------------------------------------------
-
- /**
- * Tells the TaskManager to send a stack trace of all threads to the sender.
- * The response to this message is the [[StackTrace]] message.
- */
- case object SendStackTrace {
-
- /**
- * Accessor for the case object instance, to simplify Java
interoperability.
- *
- * @return The SendStackTrace case object instance.
- */
- def get() : SendStackTrace.type = SendStackTrace
- }
-
- /**
- * Communicates the stack trace of the TaskManager with the given ID.
- * This message is the response to [[SendStackTrace]].
- *
- * @param instanceID The ID of the responding task manager.
- * @param stackTrace The stack trace, as a string.
- */
- case class StackTrace(instanceID: InstanceID, stackTrace: String)
-
-
- // --------------------------------------------------------------------------
- // Utility messages used for notifications during TaskManager startup
- // --------------------------------------------------------------------------
-
- /**
- * Requests a notification from the task manager as soon as the task manager
has been
- * registered at a job manager. Once the task manager is registered at a job
manager a
- * [[RegisteredAtJobManager]] message will be sent to the sender.
- */
- case object NotifyWhenRegisteredAtJobManager
-
- /**
- * Acknowledges that the task manager has been successfully registered at
any job manager. This
- * message is a response to [[NotifyWhenRegisteredAtJobManager]] and
contains the current leader
- * session id.
- */
- case class RegisteredAtJobManager(leaderId: UUID)
-
- /** Tells the address of the new leading
org.apache.flink.runtime.jobmanager.JobManager
- * and the new leader session ID.
- *
- * @param jobManagerAddress Address of the new leading JobManager
- * @param leaderSessionID New leader session ID
- */
- case class JobManagerLeaderAddress(jobManagerAddress: String,
leaderSessionID: UUID)
-
- /** Trait do differentiate which log file is requested */
- sealed trait LogTypeRequest
-
- /** Indicates a request for the .log file */
- case object LogFileRequest extends LogTypeRequest
-
- /** Indicates a request for the .out file */
- case object StdOutFileRequest extends LogTypeRequest
-
- /** Requests the TaskManager to upload either his log/stdout file to the
Blob store
- * param requestType LogTypeRequest indicating which file is requested
- */
- case class RequestTaskManagerLog(requestType : LogTypeRequest)
-
- /** Requests the number of active connections at the ConnectionManager */
- case object RequestNumActiveConnections
-
- case class ResponseNumActiveConnections(number: Int)
-
- /** Requests the number of broadcast variables with references */
- case object RequestBroadcastVariablesWithReferences
-
- case class ResponseBroadcastVariablesWithReferences(number: Int)
-
-
- // --------------------------------------------------------------------------
- // Utility getters for case objects to simplify access from Java
- // --------------------------------------------------------------------------
-
- /**
- * Accessor for the case object instance, to simplify Java interoperability.
- *
- * @return The NotifyWhenRegisteredAtJobManager case object instance.
- */
- def getNotifyWhenRegisteredAtJobManagerMessage:
- NotifyWhenRegisteredAtJobManager.type = NotifyWhenRegisteredAtJobManager
-
- /**
- * Accessor for the case object instance, to simplify Java interoperability.
- * @return The RequestTaskManagerLog case object instance.
- */
- def getRequestTaskManagerLog(): AnyRef = {
- RequestTaskManagerLog(LogFileRequest)
- }
-
- /**
- * Accessor for the case object instance, to simplify Java interoperability.
- * @return The RequestTaskManagerStdout case object instance.
- */
- def getRequestTaskManagerStdout(): AnyRef = {
- RequestTaskManagerLog(StdOutFileRequest)
- }
-
- /**
- * Accessor for the case object instance, to simplify Java interoperability.
- * @return The RequestBroadcastVariablesWithReferences case object instance.
- */
- def getRequestBroadcastVariablesWithReferences():
RequestBroadcastVariablesWithReferences.type = {
- RequestBroadcastVariablesWithReferences
- }
-
- /**
- * Accessor for the case object instance, to simplify Java interoperability.
- * @return The RequestNumActiveConnections case object instance.
- */
- def getRequestNumActiveConnections(): RequestNumActiveConnections.type = {
- RequestNumActiveConnections
- }
-}
diff --git
a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/accumulators/AccumulatorMessages.scala
b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/accumulators/AccumulatorMessages.scala
deleted file mode 100644
index 9ed01aa..0000000
---
a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/accumulators/AccumulatorMessages.scala
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.messages.accumulators
-
-import org.apache.flink.api.common.JobID
-import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult
-import org.apache.flink.util.{OptionalFailure, SerializedValue}
-
-/**
- * Base trait of all accumulator messages
- */
-sealed trait AccumulatorMessage {
-
- /** ID of the job that the accumulator belongs to */
- val jobID: JobID
-}
-
-/**
- * Base trait of responses to [[RequestAccumulatorResults]]
- */
-sealed trait AccumulatorResultsResponse extends AccumulatorMessage
-
-/**
- * Requests the accumulator results of the job identified by [[jobID]] from the job manager.
- * The result is sent back to the sender as a [[AccumulatorResultsResponse]] message.
- *
- * @param jobID Job Id of the job that the accumulator belongs to
- */
-case class RequestAccumulatorResults(jobID: JobID)
- extends AccumulatorMessage
-
-/**
- * Requests the accumulator results of the job as strings. This is used to request accumulators to
- * be displayed for example in the web frontend. Because it transports only
- * Strings, not custom objects, it is safe for user-defined types and class loading.
- *
- * @param jobID Job Id of the job that the accumulator belongs to
- */
-case class RequestAccumulatorResultsStringified(jobID: JobID)
- extends AccumulatorMessage
-
-/**
- * Contains the retrieved accumulator results from the job manager. This response is triggered
- * by [[RequestAccumulatorResults]].
- *
- * @param jobID Job Id of the job that the accumulator belongs to
- * @param result The accumulator result values, in serialized form.
- */
-case class AccumulatorResultsFound(
- jobID: JobID,
- result: java.util.Map[String, SerializedValue[OptionalFailure[Object]]])
- extends AccumulatorResultsResponse
-
-/**
- * Contains the retrieved accumulator result strings from the job manager.
- *
- * @param jobID Job Id of the job that the accumulator belongs to
- * @param result The accumulator result values, in string form.
- */
-case class AccumulatorResultStringsFound(jobID: JobID,
- result: Array[StringifiedAccumulatorResult])
- extends AccumulatorResultsResponse
-
-/**
- * Denotes that no accumulator results for [[jobID]] could be found at the job manager.
- *
- * @param jobID Job Id of the job that the accumulators were requested for.
- */
-case class AccumulatorResultsNotFound(jobID: JobID)
- extends AccumulatorResultsResponse
-
-/**
- * Denotes that the accumulator results for [[jobID]] could be obtained from
- * the JobManager because of an exception.
- *
- * @param jobID Job Id of the job that the accumulators were requested for.
- * @param cause Reason why the accumulators were not found.
- */
-case class AccumulatorResultsErroneous(jobID: JobID, cause: Exception)
- extends AccumulatorResultsResponse
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSampleCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSampleCoordinatorTest.java
index ed98be5..8693b16 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSampleCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSampleCoordinatorTest.java
@@ -26,7 +26,6 @@ import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.messages.StackTraceSampleMessages.TriggerStackTraceSample;
import org.apache.flink.runtime.messages.StackTraceSampleResponse;
import org.apache.flink.util.TestLogger;
@@ -94,16 +93,6 @@ public class StackTraceSampleCoordinatorTest extends TestLogger {
// Verify messages have been sent
for (ExecutionVertex vertex : vertices) {
- ExecutionAttemptID expectedExecutionId = vertex
- .getCurrentExecutionAttempt().getAttemptId();
-
- TriggerStackTraceSample expectedMsg = new TriggerStackTraceSample(
- 0,
- expectedExecutionId,
- numSamples,
- delayBetweenSamples,
- maxStackTraceDepth);
-
Mockito.verify(vertex.getCurrentExecutionAttempt())
.requestStackTraceSample(Matchers.eq(0),
Matchers.eq(numSamples), Matchers.eq(delayBetweenSamples),
Matchers.eq(maxStackTraceDepth), Matchers.any(Time.class));
}