This is an automated email from the ASF dual-hosted git repository.
mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 94007ff679e [FLINK-37258][Runtime] Return Ordered Job list on
Dispatcher#requestMultipleJobDetails
94007ff679e is described below
commit 94007ff679ed544c29bde0a321267bb4aeb748b6
Author: Ahmed Hamdy <[email protected]>
AuthorDate: Wed Feb 5 14:04:54 2025 +0000
[FLINK-37258][Runtime] Return Ordered Job list on
Dispatcher#requestMultipleJobDetails
---
.../flink/runtime/dispatcher/Dispatcher.java | 15 +++-
.../flink/runtime/dispatcher/DispatcherTest.java | 95 ++++++++++++++++++++++
2 files changed, 108 insertions(+), 2 deletions(-)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 266281134fb..5c9930be711 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -878,8 +878,19 @@ public abstract class Dispatcher extends
FencedRpcEndpoint<DispatcherId>
completedJobDetails.forEach(job ->
deduplicatedJobs.put(job.getJobId(), job));
runningJobDetails.forEach(job ->
deduplicatedJobs.put(job.getJobId(), job));
-
- return new MultipleJobsDetails(new
HashSet<>(deduplicatedJobs.values()));
+ Collection<JobDetails> orderedDeduplicatedJobs =
+ deduplicatedJobs.values().stream()
+ .sorted(
+ (jd1, jd2) ->
+ jd1.getStartTime() ==
jd2.getStartTime()
+ ? jd1.getJobId()
+
.compareTo(jd2.getJobId())
+ : Long.compare(
+
jd2.getStartTime(),
+
jd1.getStartTime()))
+ .collect(Collectors.toList());
+
+ return new MultipleJobsDetails(orderedDeduplicatedJobs);
});
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index a1d44b3712e..e08f7ff8ac1 100755
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -127,6 +127,8 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
import java.util.Optional;
import java.util.Queue;
import java.util.UUID;
@@ -140,6 +142,8 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture;
import static org.assertj.core.api.Assertions.assertThat;
@@ -1158,6 +1162,49 @@ public class DispatcherTest extends
AbstractDispatcherTest {
JobStatus.FINISHED,
dispatcherGateway.requestMultipleJobDetails(TIMEOUT).get());
}
+ @Test
+ public void
testRequestMultipleJobDetails_returnsJobsOfSameStateOrderedByStartTimeInDecOrder()
+ throws Exception {
+ final JobID secondJobID = new JobID();
+ JobGraph secondJobGraph = JobGraphTestUtils.streamingJobGraph();
+ secondJobGraph.setJobID(secondJobID);
+ final JobManagerRunnerFactory blockingJobMaster =
+ new QueuedJobManagerRunnerFactory(
+ runningJobManagerRunnerWithJobStatus(
+ JobStatus.RUNNING, jobId, 0L, 100L, 110L),
+ runningJobManagerRunnerWithJobStatus(
+ JobStatus.RUNNING, secondJobID, 10L, 11L,
12L));
+
+ DispatcherGateway dispatcherGateway =
+ createDispatcherAndStartJobs(
+ blockingJobMaster, Arrays.asList(jobGraph,
secondJobGraph));
+
+ assertOnlyContainsRunningJobsWithOrder(
+ dispatcherGateway.requestMultipleJobDetails(TIMEOUT).get(),
+ Arrays.asList(secondJobID, jobId));
+ }
+
+ @Test
+ public void
+
testRequestMultipleJobDetails_returnsJobsOfSameStateOrderedByJobIdWhenSameStartTime()
+ throws Exception {
+ final JobID secondJobID = new JobID();
+ JobGraph secondJobGraph = JobGraphTestUtils.streamingJobGraph();
+ secondJobGraph.setJobID(secondJobID);
+ final JobManagerRunnerFactory blockingJobMaster =
+ new QueuedJobManagerRunnerFactory(
+
runningJobManagerRunnerWithJobStatus(JobStatus.RUNNING, jobId, 10L),
+
runningJobManagerRunnerWithJobStatus(JobStatus.RUNNING, secondJobID, 10L));
+
+ DispatcherGateway dispatcherGateway =
+ createDispatcherAndStartJobs(
+ blockingJobMaster, Arrays.asList(jobGraph,
secondJobGraph));
+
+ assertOnlyContainsRunningJobsWithOrder(
+ dispatcherGateway.requestMultipleJobDetails(TIMEOUT).get(),
+ Stream.of(jobId,
secondJobID).sorted().collect(Collectors.toList()));
+ }
+
@Test
public void testRequestMultipleJobDetails_isSerializable() throws
Exception {
final JobManagerRunnerFactory blockingJobMaster =
@@ -1225,7 +1272,26 @@ public class DispatcherTest extends
AbstractDispatcherTest {
private JobManagerRunner runningJobManagerRunnerWithJobStatus(
final JobStatus currentJobStatus) {
+ return runningJobManagerRunnerWithJobStatus(currentJobStatus, jobId,
0L);
+ }
+
+ private JobManagerRunner runningJobManagerRunnerWithJobStatus(
+ final JobStatus currentJobStatus, final JobID jobId, long
startTime) {
+ return runningJobManagerRunnerWithJobStatus(
+ currentJobStatus, jobId, startTime, startTime, startTime);
+ }
+
+ private JobManagerRunner runningJobManagerRunnerWithJobStatus(
+ final JobStatus currentJobStatus,
+ final JobID jobId,
+ long startTime,
+ long transitionToCreatedTimestamp,
+ long transitionToRunningTimestamp) {
Preconditions.checkArgument(!currentJobStatus.isTerminalState());
+ long[] stateTimeStampsForRunningJob = new
long[JobStatus.values().length];
+ stateTimeStampsForRunningJob[JobStatus.INITIALIZING.ordinal()] =
startTime;
+ stateTimeStampsForRunningJob[JobStatus.CREATED.ordinal()] =
transitionToCreatedTimestamp;
+ stateTimeStampsForRunningJob[JobStatus.RUNNING.ordinal()] =
transitionToRunningTimestamp;
return TestingJobManagerRunner.newBuilder()
.setJobId(jobId)
@@ -1235,6 +1301,7 @@ public class DispatcherTest extends
AbstractDispatcherTest {
new ArchivedExecutionGraphBuilder()
.setJobID(jobId)
.setState(currentJobStatus)
+
.setStateTimestamps(stateTimeStampsForRunningJob)
.build()))
.build();
}
@@ -1256,6 +1323,20 @@ public class DispatcherTest extends
AbstractDispatcherTest {
.build();
}
+ private DispatcherGateway createDispatcherAndStartJobs(
+ final JobManagerRunnerFactory jobManagerRunnerFactory, final
List<JobGraph> jobGraphs)
+ throws Exception {
+ dispatcher =
+ createAndStartDispatcher(heartbeatServices, haServices,
jobManagerRunnerFactory);
+ DispatcherGateway dispatcherGateway =
dispatcher.getSelfGateway(DispatcherGateway.class);
+ jobMasterLeaderElection.isLeader(UUID.randomUUID());
+ for (JobGraph jobGraph : jobGraphs) {
+ dispatcherGateway.submitJob(jobGraph, TIMEOUT).get();
+ }
+
+ return dispatcherGateway;
+ }
+
private static void assertOnlyContainsSingleJobWithState(
final JobStatus expectedJobStatus, final MultipleJobsDetails
multipleJobsDetails) {
final Collection<JobDetails> finishedJobDetails =
multipleJobsDetails.getJobs();
@@ -1263,6 +1344,20 @@ public class DispatcherTest extends
AbstractDispatcherTest {
assertThat(finishedJobDetails.iterator().next().getStatus()).isEqualTo(expectedJobStatus);
}
+ private static void assertOnlyContainsRunningJobsWithOrder(
+ final MultipleJobsDetails multipleJobsDetails,
+ final List<JobID> expectedOrderedJobIDs) {
+ final Collection<JobDetails> finishedJobDetails =
multipleJobsDetails.getJobs();
+ assertThat(finishedJobDetails).isInstanceOf(List.class);
+ assertThat(finishedJobDetails).hasSize(expectedOrderedJobIDs.size());
+ Iterator<JobDetails> jobDetailsIterator =
finishedJobDetails.iterator();
+ for (final JobID nextExpectedJobId : expectedOrderedJobIDs) {
+ final JobDetails jobDetails = jobDetailsIterator.next();
+ assertThat(jobDetails.getStatus()).isEqualTo(JobStatus.RUNNING);
+ assertThat(jobDetails.getJobId()).isEqualTo(nextExpectedJobId);
+ }
+ }
+
@Test
public void testOnlyRecoveredJobsAreRetainedInTheBlobServer() throws
Exception {
final JobID jobId1 = new JobID();