zhuzhurk commented on code in PR #20350:
URL: https://github.com/apache/flink/pull/20350#discussion_r935529490


##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/ConsumedPartitionGroup.java:
##########
@@ -88,6 +95,10 @@ public boolean isEmpty() {
         return resultPartitions.isEmpty();
     }
 
+    public int getNumConsumers() {

Review Comment:
   It's better to add a comment explaining that, in dynamic graph cases, the 
number of consumers of two `ConsumedPartitionGroup`s can differ even if they 
contain the same `IntermediateResultPartition`.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SsgNetworkMemoryCalculationUtils.java:
##########
@@ -145,13 +144,17 @@ private static Map<IntermediateDataSetID, Integer> 
getMaxSubpartitionNums(
 
         for (int i = 0; i < producedDataSets.size(); i++) {
             IntermediateDataSet producedDataSet = producedDataSets.get(i);
-            JobEdge outputEdge = checkNotNull(producedDataSet.getConsumer());
-            ExecutionJobVertex consumerJobVertex = 
ejvs.apply(outputEdge.getTarget().getID());
-            int maxNum =
-                    
EdgeManagerBuildUtil.computeMaxEdgesToTargetExecutionVertex(
-                            ejv.getParallelism(),
-                            consumerJobVertex.getParallelism(),
-                            outputEdge.getDistributionPattern());
+            int maxNum = 0;
+            for (JobEdge outputEdge : producedDataSet.getConsumers()) {
+                ExecutionJobVertex consumerJobVertex = 
ejvs.apply(outputEdge.getTarget().getID());
+                maxNum =
+                        Math.max(
+                                maxNum,
+                                
EdgeManagerBuildUtil.computeMaxEdgesToTargetExecutionVertex(
+                                        ejv.getParallelism(),
+                                        consumerJobVertex.getParallelism(),
+                                        outputEdge.getDistributionPattern()));

Review Comment:
   Maybe just compute it from the first consumer group? This method is only 
used for non-dynamic graphs, so the consumer vertices' parallelisms and 
distribution patterns must all be the same. It's also better to add a comment 
explaining this assumption.
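
   A rough sketch of what I mean, under stated assumptions: `Edge`, `Pattern`, 
`maxEdges` and `maxSubpartitionNum` are made-up stand-ins for illustration, not 
the real `JobEdge` or `EdgeManagerBuildUtil` code, and the formula in 
`maxEdges` is a simplification that may differ from the real utility.

```java
import java.util.List;

/** Sketch only: stand-in types, not Flink's JobEdge or EdgeManagerBuildUtil. */
final class FirstConsumerSketch {

    enum Pattern {
        ALL_TO_ALL,
        POINTWISE
    }

    /** Hypothetical stand-in for a JobEdge: consumer parallelism plus pattern. */
    static final class Edge {
        final int consumerParallelism;
        final Pattern pattern;

        Edge(int consumerParallelism, Pattern pattern) {
            this.consumerParallelism = consumerParallelism;
            this.pattern = pattern;
        }
    }

    /** Simplified stand-in for the max-edges computation (an assumption). */
    static int maxEdges(int producerParallelism, int consumerParallelism, Pattern pattern) {
        if (pattern == Pattern.ALL_TO_ALL) {
            return consumerParallelism;
        }
        // POINTWISE: ceiling of consumerParallelism / producerParallelism.
        return (consumerParallelism + producerParallelism - 1) / producerParallelism;
    }

    static int maxSubpartitionNum(int producerParallelism, List<Edge> consumers) {
        if (consumers.isEmpty()) {
            throw new IllegalStateException("A produced data set needs at least one consumer.");
        }
        Edge first = consumers.get(0);
        // Assumption: this is a non-dynamic graph, so every consumer has the
        // same parallelism and distribution pattern as the first one.
        for (Edge edge : consumers) {
            if (edge.consumerParallelism != first.consumerParallelism
                    || edge.pattern != first.pattern) {
                throw new IllegalStateException("Consumers must share parallelism and pattern.");
            }
        }
        return maxEdges(producerParallelism, first.consumerParallelism, first.pattern);
    }
}
```

   The explicit check documents the assumption and fails fast if it is ever 
violated, instead of silently taking a maximum.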



##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java:
##########
@@ -56,6 +67,26 @@ public IntermediateResultPartition(
         this.producer = producer;
         this.partitionId = new 
IntermediateResultPartitionID(totalResult.getId(), partitionNumber);
         this.edgeManager = edgeManager;
+        this.consumerVertices = totalResult.getConsumerVertices();
+    }
+
+    public void releaseConsumedPartitionGroup(ConsumedPartitionGroup 
partitionGroup) {
+        releasedPartitionGroups.add(partitionGroup);
+    }
+
+    public boolean canBeReleased() {
+        if (releasedPartitionGroups.size()
+                != 
edgeManager.getNumberOfConsumedPartitionGroupsById(partitionId)) {
+            return false;
+        }
+        for (JobVertexID jobVertexID : consumerVertices) {

Review Comment:
   nit: jobVertexID -> jobVertexId



##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java:
##########
@@ -162,20 +176,52 @@ int getNumParallelProducers() {
         return numParallelProducers;
     }
 
-    ExecutionJobVertex getConsumerExecutionJobVertex() {
-        final JobEdge consumer = 
checkNotNull(intermediateDataSet.getConsumer());
-        final JobVertexID consumerJobVertexId = consumer.getTarget().getID();
-        return 
checkNotNull(getProducer().getGraph().getJobVertex(consumerJobVertexId));
+    int getConsumersParallelism() {
+        List<JobEdge> consumers = intermediateDataSet.getConsumers();
+        checkState(!consumers.isEmpty());
+
+        InternalExecutionGraphAccessor graph = getProducer().getGraph();
+        int consumersParallelism =
+                
graph.getJobVertex(consumers.get(0).getTarget().getID()).getParallelism();
+        if (consumers.size() == 1) {
+            return consumersParallelism;
+        }
+
+        // sanity check, all consumer vertices must have the same parallelism
+        for (JobVertexID jobVertexID : consumerVertices) {

Review Comment:
   It would be better to add some comments explaining the assumptions that
   - the parallelisms will all be -1 for vertices that are not assigned a 
parallelism initially (this also means the parallelisms of the downstream 
vertices are not decided yet at this point)
   - the parallelisms must be the same for vertices that are initially assigned 
a parallelism
   
   The above is just my understanding, so you may need to organize it more 
clearly.
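
   To make the two cases concrete, here is a minimal sketch. It works on plain 
integers rather than job vertices, and `consumersParallelism` and `UNDECIDED` 
are hypothetical names used only for illustration.

```java
import java.util.List;

/** Sketch only: models the assumption on plain ints, not on Flink vertices. */
final class ConsumersParallelismSketch {

    /** Hypothetical marker for a parallelism that has not been decided yet. */
    static final int UNDECIDED = -1;

    static int consumersParallelism(List<Integer> consumerParallelisms) {
        if (consumerParallelisms.isEmpty()) {
            throw new IllegalStateException("There must be at least one consumer.");
        }
        int first = consumerParallelisms.get(0);
        // Sanity check covering both assumed cases with one equality test:
        // - all values are UNDECIDED (-1): downstream parallelisms not decided yet
        // - all values are the same initially assigned parallelism
        for (int parallelism : consumerParallelisms) {
            if (parallelism != first) {
                throw new IllegalStateException(
                        "Consumers must have the same parallelism, got "
                                + consumerParallelisms);
            }
        }
        return first;
    }
}
```

   A mix of -1 and an assigned value is rejected, which is the situation the 
comment should say cannot happen.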



##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java:
##########
@@ -528,4 +546,150 @@ public static void verifyGeneratedExecutionJobVertex(
             subtaskIndex++;
         }
     }
+
+    public static SchedulerBase createSchedulerAndDeploy(

Review Comment:
   This method should be added to `SchedulerTestingUtils`.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartitionTest.java:
##########
@@ -251,11 +279,24 @@ public static ExecutionGraph createExecutionGraph(
             v2.setMaxParallelism(consumerMaxParallelism);
         }
 
-        v2.connectNewDataSetAsInput(v1, distributionPattern, 
ResultPartitionType.BLOCKING);
+        final JobVertex v3 = new JobVertex("v3");

Review Comment:
   To double-check: does this change help verify that the previous tests still 
pass without exceptions when there are multiple consumer job vertices?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java:
##########
@@ -56,6 +67,26 @@ public IntermediateResultPartition(
         this.producer = producer;
         this.partitionId = new 
IntermediateResultPartitionID(totalResult.getId(), partitionNumber);
         this.edgeManager = edgeManager;
+        this.consumerVertices = totalResult.getConsumerVertices();
+    }
+
+    public void releaseConsumedPartitionGroup(ConsumedPartitionGroup 
partitionGroup) {
+        releasedPartitionGroups.add(partitionGroup);
+    }
+
+    public boolean canBeReleased() {
+        if (releasedPartitionGroups.size()
+                != 
edgeManager.getNumberOfConsumedPartitionGroupsById(partitionId)) {
+            return false;
+        }
+        for (JobVertexID jobVertexID : consumerVertices) {
+            // for adaptive scheduler, if any consumer vertex is still not 
initialized, this result

Review Comment:
   adaptive scheduler -> dynamic graph
   
   This is more accurate.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobEdge.java:
##########
@@ -74,36 +74,21 @@ public class JobEdge implements java.io.Serializable {
      * @param source The data set that is at the source of this edge.
      * @param target The operation that is at the target of this edge.
      * @param distributionPattern The pattern that defines how the connection 
behaves in parallel.
+     * @param isBroadcast Whether the source broadcasts data to the target.
      */
     public JobEdge(
-            IntermediateDataSet source, JobVertex target, DistributionPattern 
distributionPattern) {
+            IntermediateDataSet source,
+            JobVertex target,
+            DistributionPattern distributionPattern,
+            boolean isBroadcast) {

Review Comment:
   With this change, `source` can be final and is never null. `sourceId` is no 
longer needed and should be retrieved from the `source`. `isIdReference` can be 
removed, as well as its usages.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java:
##########
@@ -47,6 +49,15 @@ public class IntermediateResultPartition {
     /** Whether this partition has produced some data. */
     private boolean hasDataProduced = false;
 
+    /**
+     * Released {@link ConsumedPartitionGroup}s for this result partition. 
This result partition can
+     * be released if all {@link ConsumedPartitionGroup}s are released.
+     */
+    private final Set<ConsumedPartitionGroup> releasedPartitionGroups = new 
HashSet<>();
+
+    /** All consumer job vertex ids of the corresponding {@link 
IntermediateResult}. */
+    private final List<JobVertexID> consumerVertices;

Review Comment:
   It's better to add a `getConsumerJobVertices` method that returns 
`totalResult.getConsumerVertices()`, instead of maintaining the list itself.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java:
##########
@@ -718,31 +715,34 @@ public CompletableFuture<?> suspend() {
     }
 
     private void updatePartitionConsumers(final IntermediateResultPartition 
partition) {
-        final Optional<ConsumerVertexGroup> consumerVertexGroup =
-                partition.getConsumerVertexGroupOptional();
-        if (!consumerVertexGroup.isPresent()) {
+        final List<ConsumerVertexGroup> consumerVertexGroups = 
partition.getConsumerVertexGroups();
+        if (consumerVertexGroups.isEmpty()) {
             return;
         }
-        for (ExecutionVertexID consumerVertexId : consumerVertexGroup.get()) {
-            final ExecutionVertex consumerVertex =
-                    
vertex.getExecutionGraphAccessor().getExecutionVertexOrThrow(consumerVertexId);
-            final Execution consumer = 
consumerVertex.getCurrentExecutionAttempt();
-            final ExecutionState consumerState = consumer.getState();
-
-            // ----------------------------------------------------------------
-            // Consumer is recovering or running => send update message now
-            // Consumer is deploying => cache the partition info which would be
-            // sent after switching to running
-            // ----------------------------------------------------------------
-            if (consumerState == DEPLOYING
-                    || consumerState == RUNNING
-                    || consumerState == INITIALIZING) {
-                final PartitionInfo partitionInfo = 
createPartitionInfo(partition);
-
-                if (consumerState == DEPLOYING) {
-                    consumerVertex.cachePartitionInfo(partitionInfo);
-                } else {
-                    
consumer.sendUpdatePartitionInfoRpcCall(Collections.singleton(partitionInfo));
+        for (ConsumerVertexGroup consumerVertexGroup : consumerVertexGroups) {
+            for (ExecutionVertexID consumerVertexId : consumerVertexGroup) {
+                final ExecutionVertex consumerVertex =

Review Comment:
   The `consumerVertex` can be visited multiple times. It's better to compute a 
set of vertices and iterate over them later.
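
   Something along these lines, as a rough sketch: plain strings stand in for 
`ExecutionVertexID`, nested lists stand in for the `ConsumerVertexGroup`s, and 
`distinctConsumers` is a made-up helper name rather than existing Flink API.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Sketch only: strings stand in for ExecutionVertexID. */
final class DistinctConsumersSketch {

    /** Collects every consumer id once, keeping first-seen order. */
    static Set<String> distinctConsumers(List<List<String>> consumerVertexGroups) {
        Set<String> distinct = new LinkedHashSet<>();
        for (List<String> group : consumerVertexGroups) {
            // A vertex that appears in several groups is added only once.
            distinct.addAll(group);
        }
        return distinct;
    }
}
```

   The update logic can then run once per element of the returned set, so a 
consumer that belongs to several groups is not notified more than once.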



##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/BlockingResultPartitionReleaseTest.java:
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.blob.TestingBlobWriter;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import 
org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutorService;
+import 
org.apache.flink.runtime.io.network.partition.NoOpJobMasterPartitionTracker;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.scheduler.SchedulerBase;
+import org.apache.flink.testutils.TestingUtils;
+import org.apache.flink.testutils.executor.TestExecutorExtension;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createSchedulerAndDeploy;
+import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.transitionTasksToFinished;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests that blocking result partitions are properly released. */
+class BlockingResultPartitionReleaseTest {
+
+    @RegisterExtension
+    static final TestExecutorExtension<ScheduledExecutorService> 
EXECUTOR_RESOURCE =
+            TestingUtils.defaultExecutorExtension();
+
+    private ScheduledExecutorService scheduledExecutorService;
+    private ComponentMainThreadExecutor mainThreadExecutor;
+    private ManuallyTriggeredScheduledExecutorService ioExecutor;
+
+    @BeforeEach
+    void setup() {
+        scheduledExecutorService = 
Executors.newSingleThreadScheduledExecutor();
+        mainThreadExecutor =
+                
ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(
+                        scheduledExecutorService);
+        ioExecutor = new ManuallyTriggeredScheduledExecutorService();
+    }
+
+    @AfterEach
+    void teardown() {
+        if (scheduledExecutorService != null) {
+            scheduledExecutorService.shutdownNow();
+        }
+    }
+
+    @Test
+    void testMultipleConsumersForAdaptiveScheduler() throws Exception {
+        testResultPartitionConsumedByMultiConsumers(true);
+    }
+
+    @Test
+    void testMultipleConsumersForDefaultScheduler() throws Exception {
+        testResultPartitionConsumedByMultiConsumers(false);
+    }
+
+    private void testResultPartitionConsumedByMultiConsumers(boolean 
isAdaptive) throws Exception {
+        int parallelism = 2;
+        JobID jobId = new JobID();
+        JobVertex producer = 
ExecutionGraphTestUtils.createNoOpVertex("producer", parallelism);
+        JobVertex consumer1 = 
ExecutionGraphTestUtils.createNoOpVertex("consumer1", parallelism);
+        JobVertex consumer2 = 
ExecutionGraphTestUtils.createNoOpVertex("consumer2", parallelism);
+
+        TestingPartitionTracker partitionTracker = new 
TestingPartitionTracker();
+        SchedulerBase scheduler =
+                createSchedulerAndDeploy(
+                        isAdaptive,
+                        jobId,
+                        producer,
+                        new JobVertex[] {consumer1, consumer2},
+                        DistributionPattern.ALL_TO_ALL,
+                        new TestingBlobWriter(Integer.MAX_VALUE),
+                        mainThreadExecutor,
+                        ioExecutor,
+                        partitionTracker,
+                        EXECUTOR_RESOURCE.getExecutor());
+        ExecutionGraph executionGraph = scheduler.getExecutionGraph();
+
+        assertThat(partitionTracker.releasedPartitions).isEmpty();
+
+        CompletableFuture.runAsync(
+                        () -> transitionTasksToFinished(executionGraph, 
consumer1.getID()),
+                        mainThreadExecutor)
+                .join();
+        ioExecutor.triggerAll();
+
+        assertThat(partitionTracker.releasedPartitions).isEmpty();
+
+        CompletableFuture.runAsync(
+                        () -> transitionTasksToFinished(executionGraph, 
consumer2.getID()),
+                        mainThreadExecutor)
+                .join();
+        ioExecutor.triggerAll();
+
+        
assertThat(partitionTracker.releasedPartitions.size()).isEqualTo(parallelism);
+        for (int i = 0; i < parallelism; ++i) {
+            ExecutionJobVertex ejv = 
checkNotNull(executionGraph.getJobVertex(producer.getID()));
+            assertThat(

Review Comment:
   It looks to me like what should be verified here is that `releasedPartitions` 
includes all the partitions produced by the producer. It can be written as:
   ```
           ExecutionJobVertex ejv = checkNotNull(executionGraph.getJobVertex(producer.getID()));
           assertThat(
                           partitionTracker.releasedPartitions.stream()
                                   .map(ResultPartitionID::getPartitionId))
                   .containsExactlyInAnyOrder(
                           Arrays.stream(ejv.getProducedDataSets()[0].getPartitions())
                                   .map(IntermediateResultPartition::getPartitionId)
                                   .toArray(IntermediateResultPartitionID[]::new));
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/EdgeManagerTest.java:
##########
@@ -85,13 +76,72 @@ public void testGetConsumedPartitionGroup() throws 
Exception {
         assertEquals(groupRetrievedByDownstreamVertex, 
groupRetrievedByIntermediateResultPartition);
 
         ConsumedPartitionGroup groupRetrievedByScheduledResultPartition =
-                scheduler
-                        .getExecutionGraph()
-                        .getSchedulingTopology()
+                eg.getSchedulingTopology()
                         .getResultPartition(consumedPartition.getPartitionId())
                         .getConsumedPartitionGroups()
                         .get(0);
 
         assertEquals(groupRetrievedByDownstreamVertex, 
groupRetrievedByScheduledResultPartition);
     }
+
+    @Test
+    public void testCalculateNumberOfConsumers() throws Exception {
+        testCalculateNumberOfConsumers(5, 2, ALL_TO_ALL, new int[] {2, 2});
+        testCalculateNumberOfConsumers(5, 2, POINTWISE, new int[] {1, 1});
+        testCalculateNumberOfConsumers(2, 5, ALL_TO_ALL, new int[] {5, 5, 5, 
5, 5});
+        testCalculateNumberOfConsumers(2, 5, POINTWISE, new int[] {3, 3, 3, 2, 
2});
+        testCalculateNumberOfConsumers(5, 5, ALL_TO_ALL, new int[] {5, 5, 5, 
5, 5});
+        testCalculateNumberOfConsumers(5, 5, POINTWISE, new int[] {1, 1, 1, 1, 
1});
+    }
+
+    private void testCalculateNumberOfConsumers(
+            int producerParallelism,
+            int consumerParallelism,
+            DistributionPattern distributionPattern,
+            int[] expectedConsumers)
+            throws Exception {
+        JobVertex producer = new JobVertex("producer");
+        JobVertex consumer = new JobVertex("consumer");
+        ExecutionGraph eg =
+                buildExecutionGraph(
+                        producer,
+                        consumer,
+                        producerParallelism,
+                        consumerParallelism,
+                        distributionPattern);
+        List<ConsumedPartitionGroup> partitionGroups =
+                
Arrays.stream(checkNotNull(eg.getJobVertex(consumer.getID())).getTaskVertices())
+                        .flatMap(ev -> 
ev.getAllConsumedPartitionGroups().stream())
+                        .collect(Collectors.toList());
+        int index = 0;
+        for (ConsumedPartitionGroup partitionGroup : partitionGroups) {
+            Assertions.assertThat(partitionGroup.getNumConsumers())
+                    .isEqualTo(expectedConsumers[index++]);
+        }
+    }
+
+    private DefaultExecutionGraph buildExecutionGraph(
+            JobVertex producer,
+            JobVertex consumer,
+            int producerParallelism,
+            int consumerParallelism,
+            DistributionPattern distributionPattern)
+            throws Exception {
+        producer.setParallelism(producerParallelism);
+        consumer.setParallelism(consumerParallelism);
+
+        producer.setInvokableClass(NoOpInvokable.class);
+        consumer.setInvokableClass(NoOpInvokable.class);
+
+        consumer.connectNewDataSetAsInput(
+                producer, distributionPattern, ResultPartitionType.BLOCKING);
+
+        JobGraph jobGraph = JobGraphTestUtils.batchJobGraph(producer, 
consumer);
+        SchedulerBase scheduler =
+                SchedulerTestingUtils.createScheduler(
+                        jobGraph,
+                        
ComponentMainThreadExecutorServiceAdapter.forMainThread(),
+                        EXECUTOR_RESOURCE.getExecutor());
+        return (DefaultExecutionGraph) scheduler.getExecutionGraph();

Review Comment:
   There seems to be no need to cast it to a `DefaultExecutionGraph`.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java:
##########
@@ -47,6 +49,15 @@ public class IntermediateResultPartition {
     /** Whether this partition has produced some data. */
     private boolean hasDataProduced = false;
 
+    /**
+     * Released {@link ConsumedPartitionGroup}s for this result partition. 
This result partition can
+     * be released if all {@link ConsumedPartitionGroup}s are released.
+     */
+    private final Set<ConsumedPartitionGroup> releasedPartitionGroups = new 
HashSet<>();

Review Comment:
   It looks to me like recording the number of releasable groups is enough?
   
   I also think it's better to name it `releasablePartitionGroups` and to 
rename `releaseConsumedPartitionGroup()` to `markPartitionGroupReleasable()`. 
The comments should be updated as well.
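
   A minimal sketch of the counter-based variant, under the assumption that 
each group is marked releasable at most once. The class name and the 
`totalGroups` constructor argument are hypothetical. In the real class the 
total would come from the edge manager.

```java
/** Sketch only: counts releasable groups instead of storing them in a set. */
final class ReleasableCounterSketch {

    private final int totalGroups;
    private int numReleasableGroups;

    ReleasableCounterSketch(int totalGroups) {
        this.totalGroups = totalGroups;
    }

    /** Assumption: called at most once per consumed partition group. */
    void markPartitionGroupReleasable() {
        if (numReleasableGroups >= totalGroups) {
            throw new IllegalStateException("More groups marked than exist.");
        }
        numReleasableGroups++;
    }

    /** The partition is releasable once every group has been marked. */
    boolean canBeReleased() {
        return numReleasableGroups == totalGroups;
    }
}
```

   The trade-off is that a counter cannot detect the same group being marked 
twice, whereas a set would absorb the duplicate. If that can happen, the set is 
the safer choice.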



##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/BlockingResultPartitionReleaseTest.java:
##########
@@ -0,0 +1,152 @@
+    @Test
+    void testMultipleConsumersForAdaptiveScheduler() throws Exception {

Review Comment:
   -> testMultipleConsumersForAdaptiveBatchScheduler



##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java:
##########
@@ -528,4 +546,150 @@ public static void verifyGeneratedExecutionJobVertex(
             subtaskIndex++;
         }
     }
+
+    public static SchedulerBase createSchedulerAndDeploy(
+            boolean isAdaptive,
+            JobID jobId,
+            JobVertex producer,
+            JobVertex[] consumers,
+            DistributionPattern distributionPattern,
+            BlobWriter blobWriter,
+            ComponentMainThreadExecutor mainThreadExecutor,
+            ScheduledExecutorService ioExecutor,
+            JobMasterPartitionTracker partitionTracker,
+            ScheduledExecutorService scheduledExecutor)
+            throws Exception {
+        final List<JobVertex> vertices = new 
ArrayList<>(Collections.singletonList(producer));
+        IntermediateDataSetID dataSetId = new IntermediateDataSetID();
+        for (JobVertex consumer : consumers) {
+            consumer.connectNewDataSetAsInput(
+                    producer, distributionPattern, 
ResultPartitionType.BLOCKING, dataSetId, false);
+            vertices.add(consumer);
+        }
+
+        final SchedulerBase scheduler =
+                createScheduler(
+                        isAdaptive,
+                        jobId,
+                        vertices,
+                        blobWriter,
+                        mainThreadExecutor,
+                        ioExecutor,
+                        partitionTracker,
+                        scheduledExecutor);
+        final ExecutionGraph executionGraph = scheduler.getExecutionGraph();
+        if (isAdaptive) {
+            initializeExecutionJobVertex(producer.getID(), executionGraph, 
mainThreadExecutor);
+        }
+        final TestingLogicalSlotBuilder slotBuilder = new 
TestingLogicalSlotBuilder();
+
+        CompletableFuture.runAsync(
+                        () -> {
+                            try {
+                                // Deploy upstream source vertices
+                                deployTasks(executionGraph, producer.getID(), 
slotBuilder);
+                                // Transition upstream vertices into FINISHED
+                                transitionTasksToFinished(executionGraph, 
producer.getID());
+                                // Deploy downstream sink vertices
+                                for (JobVertex consumer : consumers) {
+                                    if (isAdaptive) {
+                                        initializeExecutionJobVertex(
+                                                consumer.getID(), 
executionGraph);
+                                    }
+                                    deployTasks(executionGraph, 
consumer.getID(), slotBuilder);
+                                }
+                            } catch (Exception e) {
+                                throw new RuntimeException("Exceptions 
shouldn't happen here.", e);
+                            }
+                        },
+                        mainThreadExecutor)
+                .join();
+        return scheduler;
+    }
+
+    private static void initializeExecutionJobVertex(
+            JobVertexID jobVertex,
+            ExecutionGraph executionGraph,
+            ComponentMainThreadExecutor mainThreadExecutor) {
+        CompletableFuture.runAsync(
+                        () -> initializeExecutionJobVertex(jobVertex, 
executionGraph),
+                        mainThreadExecutor)
+                .join();
+    }
+
+    private static void initializeExecutionJobVertex(
+            JobVertexID jobVertex, ExecutionGraph executionGraph) {
+        try {
+            executionGraph.initializeJobVertex(
+                    executionGraph.getJobVertex(jobVertex), 
System.currentTimeMillis());
+            executionGraph.notifyNewlyInitializedJobVertices(
+                    
Collections.singletonList(executionGraph.getJobVertex(jobVertex)));
+        } catch (JobException exception) {
+            throw new RuntimeException(exception);
+        }
+    }
+
+    private static DefaultScheduler createScheduler(
+            boolean isAdaptive,
+            JobID jobId,
+            List<JobVertex> jobVertices,
+            BlobWriter blobWriter,
+            ComponentMainThreadExecutor mainThreadExecutor,
+            ScheduledExecutorService ioExecutor,
+            JobMasterPartitionTracker partitionTracker,
+            ScheduledExecutorService scheduledExecutor)
+            throws Exception {
+        final JobGraph jobGraph =
+                JobGraphBuilder.newBatchJobGraphBuilder()
+                        .setJobId(jobId)
+                        .addJobVertices(jobVertices)
+                        .build();
+
+        final DefaultSchedulerBuilder builder =
+                new DefaultSchedulerBuilder(jobGraph, mainThreadExecutor, 
scheduledExecutor)
+                        .setRestartBackoffTimeStrategy(new 
TestRestartBackoffTimeStrategy(true, 0))
+                        .setBlobWriter(blobWriter)
+                        .setIoExecutor(ioExecutor)
+                        .setPartitionTracker(partitionTracker);
+        return isAdaptive ? builder.buildAdaptiveBatchJobScheduler() : 
builder.build();
+    }
+
+    private static void deployTasks(
+            ExecutionGraph executionGraph,
+            JobVertexID jobVertexID,
+            TestingLogicalSlotBuilder slotBuilder)
+            throws JobException, ExecutionException, InterruptedException {
+
+        for (ExecutionVertex vertex :
+                
Objects.requireNonNull(executionGraph.getJobVertex(jobVertexID))
+                        .getTaskVertices()) {
+            LogicalSlot slot = slotBuilder.createTestingLogicalSlot();
+
+            Execution execution = vertex.getCurrentExecutionAttempt();
+            
execution.registerProducedPartitions(slot.getTaskManagerLocation()).get();
+            execution.transitionState(ExecutionState.SCHEDULED);
+
+            vertex.tryAssignResource(slot);
+            vertex.deploy();
+        }
+    }
+
+    public static void transitionTasksToFinished(

Review Comment:
   It would be better to move these two methods next to
   `ExecutionGraphTestUtils#finishAllVertices()`, rename them, and, if
   possible, implement them in a similar way.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java:
##########
@@ -528,4 +546,150 @@ public static void verifyGeneratedExecutionJobVertex(
             subtaskIndex++;
         }
     }
+
+    public static SchedulerBase createSchedulerAndDeploy(
+            boolean isAdaptive,
+            JobID jobId,
+            JobVertex producer,
+            JobVertex[] consumers,
+            DistributionPattern distributionPattern,
+            BlobWriter blobWriter,
+            ComponentMainThreadExecutor mainThreadExecutor,
+            ScheduledExecutorService ioExecutor,
+            JobMasterPartitionTracker partitionTracker,
+            ScheduledExecutorService scheduledExecutor)
+            throws Exception {
+        final List<JobVertex> vertices = new ArrayList<>(Collections.singletonList(producer));
+        IntermediateDataSetID dataSetId = new IntermediateDataSetID();
+        for (JobVertex consumer : consumers) {
+            consumer.connectNewDataSetAsInput(
+                    producer, distributionPattern, ResultPartitionType.BLOCKING, dataSetId, false);
+            vertices.add(consumer);
+        }
+
+        final SchedulerBase scheduler =
+                createScheduler(
+                        isAdaptive,
+                        jobId,
+                        vertices,
+                        blobWriter,
+                        mainThreadExecutor,
+                        ioExecutor,
+                        partitionTracker,
+                        scheduledExecutor);
+        final ExecutionGraph executionGraph = scheduler.getExecutionGraph();
+        if (isAdaptive) {
+            initializeExecutionJobVertex(producer.getID(), executionGraph, mainThreadExecutor);

Review Comment:
   Can we move it into the `runAsync` below? Then we can remove
   `initializeExecutionJobVertex(JobVertexID jobVertex, ExecutionGraph executionGraph,
   ComponentMainThreadExecutor mainThreadExecutor)`.
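
   A minimal sketch of that shape, assuming the `runAsync` in question is a
   `CompletableFuture.runAsync(...)` call on the main-thread executor.
   `RunAsyncSketch`, `initializeAndDeploy`, and the recorded step strings are
   hypothetical stand-ins, not Flink code:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;

final class RunAsyncSketch {

    /**
     * Runs the optional initialization and the deployment as one task on the
     * given executor (a stand-in for the main-thread executor) and waits for it.
     */
    static List<String> initializeAndDeploy(
            boolean isAdaptive, ExecutorService mainThreadExecutor) {
        final List<String> steps = new ArrayList<>();
        CompletableFuture.runAsync(
                        () -> {
                            if (isAdaptive) {
                                // Stand-in for initializing the producer job vertex.
                                steps.add("initialize producer vertex");
                            }
                            // Stand-in for deploying the tasks of each vertex.
                            steps.add("deploy tasks");
                        },
                        mainThreadExecutor)
                .join();
        return steps;
    }
}
```

   With the initialization inside the same task, a separate helper that takes
   the executor as a parameter is no longer needed.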



##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/RemoveCachedShuffleDescriptorTest.java:
##########
@@ -294,42 +324,7 @@ private void testRemoveCacheForPointwiseEdgeAfterFailover(
         assertEquals(expectedAfter, blobWriter.numberOfBlobs());
     }
 
-    private DefaultScheduler createSchedulerAndDeploy(

Review Comment:
   It would be better to keep this method for the convenience of the tests in
   this class. It can simply delegate to `createSchedulerAndDeploy` in the util
   class.
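
   A minimal sketch of such a delegating wrapper. The class names, the
   simplified signatures, and the `String` return value are hypothetical
   stand-ins and do not reflect the real Flink types:

```java
import java.util.Arrays;
import java.util.List;

// Stand-in for the shared helper in the util class (hypothetical, simplified signature).
final class SchedulerTestUtilsSketch {
    static String createSchedulerAndDeploy(
            boolean isAdaptive, String producer, List<String> consumers) {
        return (isAdaptive ? "adaptive" : "default") + ":" + producer + "->" + consumers;
    }
}

// Stand-in for the test class: it keeps a short wrapper that supplies the
// fixture's shared arguments and forwards everything else to the util class.
final class RemoveCacheTestSketch {
    private final boolean isAdaptive;

    RemoveCacheTestSketch(boolean isAdaptive) {
        this.isAdaptive = isAdaptive;
    }

    String createSchedulerAndDeploy(String producer, String... consumers) {
        return SchedulerTestUtilsSketch.createSchedulerAndDeploy(
                isAdaptive, producer, Arrays.asList(consumers));
    }
}
```

   Tests in the class then keep their short call sites, while the setup logic
   lives in one place.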



##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java:
##########
@@ -47,6 +49,15 @@ public class IntermediateResultPartition {
     /** Whether this partition has produced some data. */
     private boolean hasDataProduced = false;
 
+    /**
+     * Released {@link ConsumedPartitionGroup}s for this result partition. This result partition can
+     * be released if all {@link ConsumedPartitionGroup}s are released.
+     */
+    private final Set<ConsumedPartitionGroup> releasedPartitionGroups = new HashSet<>();
+
+    /** All consumer job vertex ids of the corresponding {@link IntermediateResult}. */
+    private final List<JobVertexID> consumerVertices;

Review Comment:
   Or maybe we do not need this method because it is used only once, in 
`canBeReleased()`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
