This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.14 by this push:
new e358ac6 [FLINK-25732][coordination] Pass serializable collection
e358ac6 is described below
commit e358ac67d2d619fee0c12bf875c17728224dd2a5
Author: Chesnay Schepler <[email protected]>
AuthorDate: Thu Jan 20 13:40:38 2022 +0100
[FLINK-25732][coordination] Pass serializable collection
---
.../apache/flink/runtime/dispatcher/Dispatcher.java | 2 +-
.../flink/runtime/dispatcher/DispatcherTest.java | 20 ++++++++++++++++++++
2 files changed, 21 insertions(+), 1 deletion(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index a3049dc..121990e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -585,7 +585,7 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
completedJobDetails.forEach(job -> deduplicatedJobs.put(job.getJobId(), job));
runningJobDetails.forEach(job -> deduplicatedJobs.put(job.getJobId(), job));
- return new MultipleJobsDetails(deduplicatedJobs.values());
+ return new MultipleJobsDetails(new HashSet<>(deduplicatedJobs.values()));
});
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index 130be52..68f818c 100755
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -79,6 +79,7 @@ import org.apache.flink.runtime.testutils.TestingJobGraphStore;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TimeUtils;
import org.apache.flink.util.function.ThrowingRunnable;
@@ -919,6 +920,25 @@ public class DispatcherTest extends AbstractDispatcherTest {
JobStatus.FINISHED,
dispatcherGateway.requestMultipleJobDetails(TIMEOUT).get());
}
+ @Test
+ public void testRequestMultipleJobDetails_isSerializable() throws Exception {
+ final JobManagerRunnerFactory blockingJobMaster =
+ new QueuedJobManagerRunnerFactory(
+ completedJobManagerRunnerWithJobStatus(JobStatus.SUSPENDED));
+
+ dispatcher = createAndStartDispatcher(heartbeatServices, haServices, blockingJobMaster);
+ DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);
+ jobMasterLeaderElectionService.isLeader(UUID.randomUUID());
+
+ dispatcherGateway.submitJob(jobGraph, TIMEOUT).get();
+ dispatcherGateway.requestJobResult(jobId, TIMEOUT).get();
+
+ final MultipleJobsDetails multipleJobsDetails =
+ dispatcherGateway.requestMultipleJobDetails(TIMEOUT).get();
+
+ InstantiationUtil.serializeObject(multipleJobsDetails);
+ }
+
private JobManagerRunner runningJobManagerRunnerWithJobStatus(
final JobStatus currentJobStatus) {
Preconditions.checkArgument(!currentJobStatus.isTerminalState());