This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 94007ff679e  [FLINK-37258][Runtime] Return Ordered Job list on 
Dispatcher#requestMultipleJobDetails
94007ff679e is described below

commit 94007ff679ed544c29bde0a321267bb4aeb748b6
Author: Ahmed Hamdy <[email protected]>
AuthorDate: Wed Feb 5 14:04:54 2025 +0000

     [FLINK-37258][Runtime] Return Ordered Job list on 
Dispatcher#requestMultipleJobDetails
---
 .../flink/runtime/dispatcher/Dispatcher.java       | 15 +++-
 .../flink/runtime/dispatcher/DispatcherTest.java   | 95 ++++++++++++++++++++++
 2 files changed, 108 insertions(+), 2 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 266281134fb..5c9930be711 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -878,8 +878,19 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId>
 
                     completedJobDetails.forEach(job -> 
deduplicatedJobs.put(job.getJobId(), job));
                     runningJobDetails.forEach(job -> 
deduplicatedJobs.put(job.getJobId(), job));
-
-                    return new MultipleJobsDetails(new 
HashSet<>(deduplicatedJobs.values()));
+                    Collection<JobDetails> orderedDeduplicatedJobs =
+                            deduplicatedJobs.values().stream()
+                                    .sorted(
+                                            (jd1, jd2) ->
+                                                    jd1.getStartTime() == 
jd2.getStartTime()
+                                                            ? jd1.getJobId()
+                                                                    
.compareTo(jd2.getJobId())
+                                                            : Long.compare(
+                                                                    
jd2.getStartTime(),
+                                                                    
jd1.getStartTime()))
+                                    .collect(Collectors.toList());
+
+                    return new MultipleJobsDetails(orderedDeduplicatedJobs);
                 });
     }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index a1d44b3712e..e08f7ff8ac1 100755
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -127,6 +127,8 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Optional;
 import java.util.Queue;
 import java.util.UUID;
@@ -140,6 +142,8 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiConsumer;
 import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -1158,6 +1162,49 @@ public class DispatcherTest extends 
AbstractDispatcherTest {
                 JobStatus.FINISHED, 
dispatcherGateway.requestMultipleJobDetails(TIMEOUT).get());
     }
 
+    @Test
+    public void 
testRequestMultipleJobDetails_returnsJobsOfSameStateOrderedByStartTimeInDecOrder()
+            throws Exception {
+        final JobID secondJobID = new JobID();
+        JobGraph secondJobGraph = JobGraphTestUtils.streamingJobGraph();
+        secondJobGraph.setJobID(secondJobID);
+        final JobManagerRunnerFactory blockingJobMaster =
+                new QueuedJobManagerRunnerFactory(
+                        runningJobManagerRunnerWithJobStatus(
+                                JobStatus.RUNNING, jobId, 0L, 100L, 110L),
+                        runningJobManagerRunnerWithJobStatus(
+                                JobStatus.RUNNING, secondJobID, 10L, 11L, 
12L));
+
+        DispatcherGateway dispatcherGateway =
+                createDispatcherAndStartJobs(
+                        blockingJobMaster, Arrays.asList(jobGraph, 
secondJobGraph));
+
+        assertOnlyContainsRunningJobsWithOrder(
+                dispatcherGateway.requestMultipleJobDetails(TIMEOUT).get(),
+                Arrays.asList(secondJobID, jobId));
+    }
+
+    @Test
+    public void
+            
testRequestMultipleJobDetails_returnsJobsOfSameStateOrderedByJobIdWhenSameStartTime()
+                    throws Exception {
+        final JobID secondJobID = new JobID();
+        JobGraph secondJobGraph = JobGraphTestUtils.streamingJobGraph();
+        secondJobGraph.setJobID(secondJobID);
+        final JobManagerRunnerFactory blockingJobMaster =
+                new QueuedJobManagerRunnerFactory(
+                        
runningJobManagerRunnerWithJobStatus(JobStatus.RUNNING, jobId, 10L),
+                        
runningJobManagerRunnerWithJobStatus(JobStatus.RUNNING, secondJobID, 10L));
+
+        DispatcherGateway dispatcherGateway =
+                createDispatcherAndStartJobs(
+                        blockingJobMaster, Arrays.asList(jobGraph, 
secondJobGraph));
+
+        assertOnlyContainsRunningJobsWithOrder(
+                dispatcherGateway.requestMultipleJobDetails(TIMEOUT).get(),
+                Stream.of(jobId, 
secondJobID).sorted().collect(Collectors.toList()));
+    }
+
     @Test
     public void testRequestMultipleJobDetails_isSerializable() throws 
Exception {
         final JobManagerRunnerFactory blockingJobMaster =
@@ -1225,7 +1272,26 @@ public class DispatcherTest extends 
AbstractDispatcherTest {
 
     private JobManagerRunner runningJobManagerRunnerWithJobStatus(
             final JobStatus currentJobStatus) {
+        return runningJobManagerRunnerWithJobStatus(currentJobStatus, jobId, 
0L);
+    }
+
+    private JobManagerRunner runningJobManagerRunnerWithJobStatus(
+            final JobStatus currentJobStatus, final JobID jobId, long 
startTime) {
+        return runningJobManagerRunnerWithJobStatus(
+                currentJobStatus, jobId, startTime, startTime, startTime);
+    }
+
+    private JobManagerRunner runningJobManagerRunnerWithJobStatus(
+            final JobStatus currentJobStatus,
+            final JobID jobId,
+            long startTime,
+            long transitionToCreatedTimestamp,
+            long transitionToRunningTimestamp) {
         Preconditions.checkArgument(!currentJobStatus.isTerminalState());
+        long[] stateTimeStampsForRunningJob = new 
long[JobStatus.values().length];
+        stateTimeStampsForRunningJob[JobStatus.INITIALIZING.ordinal()] = 
startTime;
+        stateTimeStampsForRunningJob[JobStatus.CREATED.ordinal()] = 
transitionToCreatedTimestamp;
+        stateTimeStampsForRunningJob[JobStatus.RUNNING.ordinal()] = 
transitionToRunningTimestamp;
 
         return TestingJobManagerRunner.newBuilder()
                 .setJobId(jobId)
@@ -1235,6 +1301,7 @@ public class DispatcherTest extends 
AbstractDispatcherTest {
                                         new ArchivedExecutionGraphBuilder()
                                                 .setJobID(jobId)
                                                 .setState(currentJobStatus)
+                                                
.setStateTimestamps(stateTimeStampsForRunningJob)
                                                 .build()))
                 .build();
     }
@@ -1256,6 +1323,20 @@ public class DispatcherTest extends 
AbstractDispatcherTest {
                 .build();
     }
 
+    private DispatcherGateway createDispatcherAndStartJobs(
+            final JobManagerRunnerFactory jobManagerRunnerFactory, final 
List<JobGraph> jobGraphs)
+            throws Exception {
+        dispatcher =
+                createAndStartDispatcher(heartbeatServices, haServices, 
jobManagerRunnerFactory);
+        DispatcherGateway dispatcherGateway = 
dispatcher.getSelfGateway(DispatcherGateway.class);
+        jobMasterLeaderElection.isLeader(UUID.randomUUID());
+        for (JobGraph jobGraph : jobGraphs) {
+            dispatcherGateway.submitJob(jobGraph, TIMEOUT).get();
+        }
+
+        return dispatcherGateway;
+    }
+
     private static void assertOnlyContainsSingleJobWithState(
             final JobStatus expectedJobStatus, final MultipleJobsDetails 
multipleJobsDetails) {
         final Collection<JobDetails> finishedJobDetails = 
multipleJobsDetails.getJobs();
@@ -1263,6 +1344,20 @@ public class DispatcherTest extends 
AbstractDispatcherTest {
         
assertThat(finishedJobDetails.iterator().next().getStatus()).isEqualTo(expectedJobStatus);
     }
 
+    private static void assertOnlyContainsRunningJobsWithOrder(
+            final MultipleJobsDetails multipleJobsDetails,
+            final List<JobID> expectedOrderedJobIDs) {
+        final Collection<JobDetails> finishedJobDetails = 
multipleJobsDetails.getJobs();
+        assertThat(finishedJobDetails).isInstanceOf(List.class);
+        assertThat(finishedJobDetails).hasSize(expectedOrderedJobIDs.size());
+        Iterator<JobDetails> jobDetailsIterator = 
finishedJobDetails.iterator();
+        for (final JobID nextExpectedJobId : expectedOrderedJobIDs) {
+            final JobDetails jobDetails = jobDetailsIterator.next();
+            assertThat(jobDetails.getStatus()).isEqualTo(JobStatus.RUNNING);
+            assertThat(jobDetails.getJobId()).isEqualTo(nextExpectedJobId);
+        }
+    }
+
     @Test
     public void testOnlyRecoveredJobsAreRetainedInTheBlobServer() throws 
Exception {
         final JobID jobId1 = new JobID();

Reply via email to