This is an automated email from the ASF dual-hosted git repository. xtsong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 022f7adeb864f32780f80d7113a0af6ddb3e10d5 Author: Weijie Guo <[email protected]> AuthorDate: Mon Oct 31 15:30:13 2022 +0800 [FLINK-29767] Let consumerVertexGroup also know the type of result partition. --- .../executiongraph/EdgeManagerBuildUtil.java | 12 ++++++++---- .../scheduler/strategy/ConsumerVertexGroup.java | 22 +++++++++++++++++----- .../adapter/DefaultResultPartitionTest.java | 4 +++- .../strategy/TestingSchedulingResultPartition.java | 7 +++++-- .../strategy/TestingSchedulingTopology.java | 8 +++++--- 5 files changed, 38 insertions(+), 15 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/EdgeManagerBuildUtil.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/EdgeManagerBuildUtil.java index 8ac55b7b7a2..a0f6520843d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/EdgeManagerBuildUtil.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/EdgeManagerBuildUtil.java @@ -99,7 +99,8 @@ public class EdgeManagerBuildUtil { .map(ExecutionVertex::getID) .collect(Collectors.toList()); ConsumerVertexGroup consumerVertexGroup = - ConsumerVertexGroup.fromMultipleVertices(consumerVertices); + ConsumerVertexGroup.fromMultipleVertices( + consumerVertices, intermediateResult.getResultType()); for (IntermediateResultPartition partition : intermediateResult.getPartitions()) { partition.addConsumers(consumerVertexGroup); } @@ -117,7 +118,8 @@ public class EdgeManagerBuildUtil { IntermediateResultPartition partition = intermediateResult.getPartitions()[i]; ConsumerVertexGroup consumerVertexGroup = - ConsumerVertexGroup.fromSingleVertex(executionVertex.getID()); + ConsumerVertexGroup.fromSingleVertex( + executionVertex.getID(), intermediateResult.getResultType()); partition.addConsumers(consumerVertexGroup); ConsumedPartitionGroup consumedPartitionGroup = @@ -132,7 +134,8 @@ public class EdgeManagerBuildUtil { ExecutionVertex executionVertex = taskVertices[index]; ConsumerVertexGroup consumerVertexGroup = - ConsumerVertexGroup.fromSingleVertex(executionVertex.getID()); + ConsumerVertexGroup.fromSingleVertex( + executionVertex.getID(), intermediateResult.getResultType()); int start = index * sourceCount / targetCount; int end = (index + 1) * sourceCount / targetCount; @@ -173,7 +176,8 @@ public class EdgeManagerBuildUtil { } ConsumerVertexGroup consumerVertexGroup = - ConsumerVertexGroup.fromMultipleVertices(consumers); + ConsumerVertexGroup.fromMultipleVertices( + consumers, intermediateResult.getResultType()); partition.addConsumers(consumerVertexGroup); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/ConsumerVertexGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/ConsumerVertexGroup.java index 10c0bdcc1db..fb8b3f1951c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/ConsumerVertexGroup.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/ConsumerVertexGroup.java @@ -18,6 +18,8 @@ package org.apache.flink.runtime.scheduler.strategy; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; + import java.util.Collections; import java.util.Iterator; import java.util.List; @@ -26,16 +28,26 @@ import java.util.List; public class ConsumerVertexGroup implements Iterable<ExecutionVertexID> { private final List<ExecutionVertexID> vertices; - private ConsumerVertexGroup(List<ExecutionVertexID> vertices) { + private final ResultPartitionType resultPartitionType; + + private ConsumerVertexGroup( + List<ExecutionVertexID> vertices, ResultPartitionType resultPartitionType) { this.vertices = vertices; + this.resultPartitionType = resultPartitionType; + } + + public static ConsumerVertexGroup fromMultipleVertices( + List<ExecutionVertexID> vertices, ResultPartitionType resultPartitionType) { + return new ConsumerVertexGroup(vertices, resultPartitionType); } - public static ConsumerVertexGroup fromMultipleVertices(List<ExecutionVertexID> vertices) { - return new ConsumerVertexGroup(vertices); + public static ConsumerVertexGroup fromSingleVertex( + ExecutionVertexID vertex, ResultPartitionType resultPartitionType) { + return new ConsumerVertexGroup(Collections.singletonList(vertex), resultPartitionType); } - public static ConsumerVertexGroup fromSingleVertex(ExecutionVertexID vertex) { - return new ConsumerVertexGroup(Collections.singletonList(vertex)); + public ResultPartitionType getResultPartitionType() { + return resultPartitionType; } @Override diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultResultPartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultResultPartitionTest.java index 9f8c58400e3..9a1f6e7e45d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultResultPartitionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultResultPartitionTest.java @@ -86,7 +86,9 @@ class DefaultResultPartitionTest { ExecutionVertexID executionVertexId = new ExecutionVertexID(new JobVertexID(), 0); consumerVertexGroups.put( resultPartition.getId(), - Collections.singletonList(ConsumerVertexGroup.fromSingleVertex(executionVertexId))); + Collections.singletonList( + ConsumerVertexGroup.fromSingleVertex( + executionVertexId, resultPartition.getResultType()))); assertThat(resultPartition.getConsumerVertexGroups()).isNotEmpty(); assertThat(resultPartition.getConsumerVertexGroups().get(0)).contains(executionVertexId); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingResultPartition.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingResultPartition.java index 6274eecb285..9e1fdc77ed6 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingResultPartition.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingResultPartition.java @@ -102,14 +102,17 @@ public class TestingSchedulingResultPartition implements SchedulingResultPartiti return Collections.unmodifiableList(consumedPartitionGroups); } - void addConsumerGroup(Collection<TestingSchedulingExecutionVertex> consumerVertices) { + void addConsumerGroup( + Collection<TestingSchedulingExecutionVertex> consumerVertices, + ResultPartitionType resultPartitionType) { checkState(this.consumerVertexGroup == null); final ConsumerVertexGroup consumerVertexGroup = ConsumerVertexGroup.fromMultipleVertices( consumerVertices.stream() .map(TestingSchedulingExecutionVertex::getId) - .collect(Collectors.toList())); + .collect(Collectors.toList()), + resultPartitionType); this.consumerVertexGroup = consumerVertexGroup; } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingTopology.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingTopology.java index 95596369e33..c681d32dc4d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingTopology.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingTopology.java @@ -194,7 +194,8 @@ public class TestingSchedulingTopology implements SchedulingTopology { .withResultPartitionType(resultPartitionType) .build(); - resultPartition.addConsumerGroup(Collections.singleton(consumer)); + resultPartition.addConsumerGroup( + Collections.singleton(consumer), resultPartition.getResultType()); resultPartition.setProducer(producer); producer.addProducedPartition(resultPartition); @@ -304,7 +305,8 @@ public class TestingSchedulingTopology implements SchedulingTopology { resultPartition.setProducer(producer); producer.addProducedPartition(resultPartition); consumer.addConsumedPartition(resultPartition); - resultPartition.addConsumerGroup(Collections.singleton(consumer)); + resultPartition.addConsumerGroup( + Collections.singleton(consumer), resultPartitionType); resultPartitions.add(resultPartition); } @@ -344,7 +346,7 @@ public class TestingSchedulingTopology implements SchedulingTopology { resultPartition.setProducer(producer); producer.addProducedPartition(resultPartition); - resultPartition.addConsumerGroup(consumers); + resultPartition.addConsumerGroup(consumers, resultPartitionType); resultPartitions.add(resultPartition); }
