Thesharing commented on a change in pull request #15088:
URL: https://github.com/apache/flink/pull/15088#discussion_r595231410
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingResultPartition.java
##########
@@ -44,4 +46,14 @@
* @return result partition state
*/
ResultPartitionState getState();
+
+ /**
+ * Get the grouped {@link ExecutionVertexID}.
+ *
+ * @return List of grouped Execution Vertex IDs.
+ */
+ List<ConsumerVertexGroup> getConsumerVertexGroups();
+
+ /** Get {@link SchedulingExecutionVertex} by {@link ExecutionVertexID}. */
+ SchedulingExecutionVertex getVertexOrThrow(ExecutionVertexID id);
Review comment:
Removed.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingExecutionVertex.java
##########
@@ -37,4 +39,14 @@
* @return state of the execution vertex
*/
ExecutionState getState();
+
+ /**
+ * Get the grouped consumed result partitions.
+ *
+ * @return list of grouped intermediate result partition IDs
+ */
+ List<ConsumedPartitionGroup> getConsumerPartitionGroups();
+
+    /** Get {@link SchedulingResultPartition} by {@link IntermediateResultPartitionID}. */
+    SchedulingResultPartition getResultPartitionOrThrow(IntermediateResultPartitionID id);
Review comment:
Yes, I think we can remove this method. Resolved.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingExecutionVertex.java
##########
@@ -37,4 +39,14 @@
* @return state of the execution vertex
*/
ExecutionState getState();
+
+ /**
+ * Get the grouped consumed result partitions.
Review comment:
Resolved.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultResultPartition.java
##########
@@ -43,18 +47,38 @@
private DefaultExecutionVertex producer;
- private final List<DefaultExecutionVertex> consumers;
+ private final List<ConsumerVertexGroup> consumerVertexGroups;
+
+    private final Map<ExecutionVertexID, DefaultExecutionVertex> executionVertexById;
Review comment:
Same as above.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionVertex.java
##########
@@ -18,35 +18,52 @@
package org.apache.flink.runtime.scheduler.adapter;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
-import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.function.Supplier;
+import static org.apache.flink.runtime.scheduler.SchedulerUtils.createFlattenIterator;
import static org.apache.flink.util.Preconditions.checkNotNull;
/** Default implementation of {@link SchedulingExecutionVertex}. */
class DefaultExecutionVertex implements SchedulingExecutionVertex {
private final ExecutionVertexID executionVertexId;
- private final List<DefaultResultPartition> consumedResults;
-
private final List<DefaultResultPartition> producedResults;
private final Supplier<ExecutionState> stateSupplier;
+ private final List<ConsumedPartitionGroup> consumedPartitionGroups;
+
+    private final Map<IntermediateResultPartitionID, DefaultResultPartition> resultPartitionsById;
Review comment:
I'm wondering: since we initialize the `Map<IntermediateResultPartitionID, DefaultResultPartition>` in `DefaultExecutionTopology#computeExecutionGraphIndex`, would the map be garbage collected if we only pass the `Function` as the parameter?
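
For illustration, a minimal standalone sketch (made-up names, not the actual Flink classes) of the retention behaviour in question: a method reference such as `map::get` holds a strong reference to the map, so passing only the `Function` around would not by itself make the map collectible.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Minimal sketch with hypothetical names; not the Flink code under review.
class ClosureRetentionSketch {

    static Function<String, Integer> buildLookup() {
        Map<String, Integer> index = new HashMap<>();
        index.put("a", 1);
        // The returned method reference captures 'index' as its receiver,
        // so the map stays strongly reachable even after this method returns.
        return index::get;
    }

    public static void main(String[] args) {
        Function<String, Integer> lookup = buildLookup();
        // The backing map is still alive, reachable only through the Function.
        System.out.println(lookup.apply("a")); // prints 1
    }
}
```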
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionVertexTest.java
##########
@@ -65,12 +68,20 @@ public void setUp() throws Exception {
Collections.singletonList(schedulingResultPartition),
stateSupplier);
schedulingResultPartition.setProducer(producerVertex);
+
+ List<ConsumedPartitionGroup> consumedPartitionIds =
Review comment:
Sorry for being careless about it. T-T Resolved.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingResultPartition.java
##########
@@ -44,4 +46,14 @@
* @return result partition state
*/
ResultPartitionState getState();
+
+ /**
+ * Get the grouped {@link ExecutionVertexID}.
Review comment:
Resolved.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerUtils.java
##########
@@ -112,4 +117,33 @@ private static CheckpointIDCounter createCheckpointIdCounter(
             CheckpointRecoveryFactory recoveryFactory, JobID jobId) throws Exception {
return recoveryFactory.createCheckpointIDCounter(jobId);
}
+
+    public static <ID, GROUP extends Iterable<ID>, RESULT> Iterator<RESULT> createFlattenIterator(
Review comment:
Thank you for providing a better solution! I've also replaced all the references.
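
For context, a rough sketch of how such a flattening utility could look; the actual `SchedulerUtils#createFlattenIterator` added in this PR may differ, and the parameter types below are only inferred from the call site `createFlattenIterator(this::getConsumerPartitionGroups, this::getResultPartitionOrThrow)`.

```java
import java.util.Iterator;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.StreamSupport;

// Illustrative sketch only, not the implementation from the PR.
final class FlattenIteratorSketch {

    // Flattens groups of IDs and resolves each ID to a result via the lookup function.
    static <ID, GROUP extends Iterable<ID>, RESULT> Iterator<RESULT> createFlattenIterator(
            Supplier<? extends Iterable<GROUP>> groupsSupplier,
            Function<ID, RESULT> resultById) {
        return StreamSupport.stream(groupsSupplier.get().spliterator(), false)
                // flatten every group into a single stream of IDs
                .flatMap(group -> StreamSupport.stream(group.spliterator(), false))
                // resolve each ID to its corresponding result
                .map(resultById)
                .iterator();
    }
}
```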
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionVertexTest.java
##########
@@ -97,9 +108,8 @@ public void testGetProducedResultPartitions() {
@Test
public void testGetConsumedResultPartitions() {
IntermediateResultPartitionID partitionIds1 =
- IterableUtils.toStream(consumerVertex.getConsumedResults())
+                IterableUtils.toStream(consumerVertex.getConsumerPartitionGroups().get(0))
Review comment:
Totally agreed. Sorry that I forgot to revert this when I reverted the merging of FLINK-21328 and FLINK-21330.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingExecutionVertex.java
##########
@@ -66,16 +76,39 @@ public void setState(ExecutionState state) {
@Override
public Iterable<TestingSchedulingResultPartition> getConsumedResults() {
- return consumedPartitions;
+ return () ->
+ createFlattenIterator(
+                        this::getConsumerPartitionGroups, this::getResultPartitionOrThrow);
}
@Override
public Iterable<TestingSchedulingResultPartition> getProducedResults() {
return producedPartitions;
}
- void addConsumedPartition(TestingSchedulingResultPartition partition) {
- consumedPartitions.add(partition);
+ @Override
+ public List<ConsumedPartitionGroup> getConsumerPartitionGroups() {
+ return consumedPartitionGroups;
+ }
+
+ @Override
+ public TestingSchedulingResultPartition getResultPartitionOrThrow(
+ IntermediateResultPartitionID id) {
+ return resultPartitionsById.get(id);
+ }
+
+    void addConsumedPartition(TestingSchedulingResultPartition consumedPartition) {
+        this.consumedPartitionGroups.add(
+                ConsumedPartitionGroup.fromSinglePartition(consumedPartition.getId()));
+        this.resultPartitionsById.putIfAbsent(consumedPartition.getId(), consumedPartition);
+ }
+
+ void addConsumedPartitionGroup(
+ ConsumedPartitionGroup consumedResultIdGroup,
Review comment:
Sorry for being careless. Resolved.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingExecutionVertex.java
##########
@@ -104,8 +139,20 @@ Builder withExecutionVertexID(JobVertexID jobVertexId, int subtaskIndex) {
return this;
}
-    public Builder withConsumedPartitions(List<TestingSchedulingResultPartition> partitions) {
- this.partitions = partitions;
+ public Builder withConsumedPartitions(
+ List<ConsumedPartitionGroup> partitions,
Review comment:
Sorry for being careless. Resolved.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingExecutionVertex.java
##########
@@ -104,8 +139,20 @@ Builder withExecutionVertexID(JobVertexID jobVertexId, int subtaskIndex) {
return this;
}
-    public Builder withConsumedPartitions(List<TestingSchedulingResultPartition> partitions) {
- this.partitions = partitions;
+ public Builder withConsumedPartitions(
Review comment:
Agreed and resolved.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingExecutionVertex.java
##########
@@ -95,7 +128,9 @@ public static TestingSchedulingExecutionVertex withExecutionVertexID(
public static class Builder {
private JobVertexID jobVertexId = new JobVertexID();
private int subtaskIndex = 0;
-        private List<TestingSchedulingResultPartition> partitions = new ArrayList<>();
+        private final List<ConsumedPartitionGroup> partitions = new ArrayList<>();
Review comment:
Agreed and resolved.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingResultPartition.java
##########
@@ -38,17 +41,24 @@
private TestingSchedulingExecutionVertex producer;
- private Collection<TestingSchedulingExecutionVertex> consumers;
+ private final List<ConsumerVertexGroup> consumerVertexGroups;
+
+    private final Map<ExecutionVertexID, TestingSchedulingExecutionVertex> executionVerticesById;
private ResultPartitionState state;
TestingSchedulingResultPartition(
-            IntermediateDataSetID dataSetID, ResultPartitionType type, ResultPartitionState state) {
+ IntermediateDataSetID dataSetID,
+ int partitionNum,
Review comment:
I personally think it's better to create the `IntermediateResultPartitionID` from the `IntermediateDataSetID` and `partitionNum`. Let's try to revert it.
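
For illustration, a small sketch of the suggested construction, assuming the `IntermediateResultPartitionID(IntermediateDataSetID, int)` constructor is available (the helper name is made up):

```java
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;

// Hypothetical helper, not part of the PR; it only illustrates deriving the
// partition ID from the data set ID and the partition index.
class PartitionIdSketch {

    static IntermediateResultPartitionID createPartitionId(
            IntermediateDataSetID dataSetID, int partitionNum) {
        // Tie the partition ID to its data set instead of generating a random ID.
        return new IntermediateResultPartitionID(dataSetID, partitionNum);
    }
}
```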
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultSchedulingPipelinedRegionTest.java
##########
@@ -123,7 +123,7 @@ public void returnsIncidentBlockingPartitions() throws Exception {
final DefaultExecutionVertex vertexB0 =
topology.getVertex(new ExecutionVertexID(b.getID(), 0));
final IntermediateResultPartitionID b0ConsumedResultPartition =
-                Iterables.getOnlyElement(vertexB0.getConsumedResults()).getId();
Review comment:
Same as above. Sorry that I forgot to revert this when I reverted the merging of FLINK-21328 and FLINK-21330.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingTopology.java
##########
@@ -140,6 +143,20 @@ private void addSchedulingResultPartitions(
}
}
+ private void addSchedulingResultPartitions(
+ final Iterable<ConsumedPartitionGroup> resultPartitionGroups,
+            final Function<IntermediateResultPartitionID, SchedulingResultPartition>
+                    resultPartitionById) {
+        for (ConsumedPartitionGroup consumedPartitionGroup : resultPartitionGroups) {
+            for (IntermediateResultPartitionID consumedPartitionId : consumedPartitionGroup) {
+ schedulingResultPartitions.put(
+ consumedPartitionId,
+ (TestingSchedulingResultPartition)
Review comment:
We can remove this method since there is no need for it.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]