zhuzhurk commented on a change in pull request #18050:
URL: https://github.com/apache/flink/pull/18050#discussion_r781808816
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java
##########
@@ -128,6 +130,12 @@ public TaskDeploymentDescriptor createDeploymentDescriptor(
int numConsumers =
resultPartition.getConsumerVertexGroups().get(0).size();
+ // sanity check. Number of consumers should be equal to number of
subpartitions. This
+ // check can be removed iff it supports one consumer to consume
multiple subpartitions.
+ checkState(
Review comment:
Why add this check? It looks to me like it will be violated soon by the
upcoming patch.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java
##########
@@ -79,6 +86,60 @@ public ConsumerVertexGroup getConsumerVertexGroup() {
return getEdgeManager().getConsumedPartitionGroupsById(partitionId);
}
+ public int getNumberOfSubpartitions() {
+ if (numberOfSubpartitions == UNKNOWN) {
+ numberOfSubpartitions = getOrComputeNumberOfSubpartitions();
Review comment:
The name `getOrComputeNumberOfSubpartitions` is a bit confusing to me
because it suggests that the method may either directly return
`numberOfSubpartitions` or compute it. However, it never directly returns
`numberOfSubpartitions`.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartitionTest.java
##########
@@ -140,6 +150,126 @@ public void testBlockingPartitionResetting() throws
Exception {
assertFalse(consumedPartitionGroup.areAllPartitionsFinished());
}
+ @Test
+ public void testGetNumberOfSubpartitionsForNonDynamicAllToAllGraph()
throws Exception {
+ testGetNumberOfSubpartitions(7, DistributionPattern.ALL_TO_ALL, false,
Arrays.asList(7, 7));
+ }
+
+ @Test
+ public void testGetNumberOfSubpartitionsForNonDynamicPointWiseGraph()
throws Exception {
+ testGetNumberOfSubpartitions(7, DistributionPattern.POINTWISE, false,
Arrays.asList(4, 3));
+ }
+
+ @Test
+ public void
testGetNumberOfSubpartitionsFromConsumerParallelismForDynamicAllToAllGraph()
+ throws Exception {
+ testGetNumberOfSubpartitions(7, DistributionPattern.ALL_TO_ALL, true,
Arrays.asList(7, 7));
+ }
+
+ @Test
+ public void
testGetNumberOfSubpartitionsFromConsumerParallelismForDynamicPointWiseGraph()
+ throws Exception {
+ testGetNumberOfSubpartitions(7, DistributionPattern.POINTWISE, true,
Arrays.asList(4, 4));
+ }
+
+ @Test
+ public void
testGetNumberOfSubpartitionsFromConsumerMaxParallelismForDynamicAllToAllGraph()
+ throws Exception {
+ testGetNumberOfSubpartitions(
+ -1, DistributionPattern.ALL_TO_ALL, true, Arrays.asList(13,
13));
+ }
+
+ @Test
+ public void
testGetNumberOfSubpartitionsFromConsumerMaxParallelismForDynamicPointWiseGraph()
+ throws Exception {
+ testGetNumberOfSubpartitions(-1, DistributionPattern.POINTWISE, true,
Arrays.asList(7, 7));
+ }
+
+ private void testGetNumberOfSubpartitions(
+ int consumerParallelism,
+ DistributionPattern distributionPattern,
+ boolean isDynamicGraph,
+ Collection<Integer> expectedNumSubpartitions)
+ throws Exception {
+
+ final int producerParallelism = 2;
+ final int consumerMaxParallelism = 13;
+
+ final ExecutionGraph eg =
+ createExecutionGraph(
+ producerParallelism,
+ consumerParallelism,
+ consumerMaxParallelism,
+ distributionPattern,
+ isDynamicGraph);
+
+ final Iterator<ExecutionJobVertex> vertexIterator =
+ eg.getVerticesTopologically().iterator();
+ final ExecutionJobVertex producer = vertexIterator.next();
+
+ if (isDynamicGraph) {
+ ExecutionJobVertexTest.initializeVertex(producer);
+ }
+
+ final IntermediateResult result = producer.getProducedDataSets()[0];
+
+ assertThat(expectedNumSubpartitions.size(), is(producerParallelism));
+ assertThat(
+ Arrays.stream(result.getPartitions())
+
.map(IntermediateResultPartition::getNumberOfSubpartitions)
+ .collect(Collectors.toList()),
+ equalTo(expectedNumSubpartitions));
+ }
+
+ private static ExecutionGraph createExecutionGraph(
+ int producerParallelism,
+ int consumerParallelism,
+ int consumerMaxParallelism,
+ DistributionPattern distributionPattern,
+ boolean isDynamicGraph)
+ throws Exception {
+
+ final JobVertex v1 = new JobVertex("v1");
+ v1.setInvokableClass(NoOpInvokable.class);
+ v1.setParallelism(producerParallelism);
+
+ final JobVertex v2 = new JobVertex("v2");
+ v2.setInvokableClass(NoOpInvokable.class);
+ if (consumerParallelism > 0) {
+ v2.setParallelism(consumerParallelism);
+ }
+ if (consumerMaxParallelism > 0) {
+ v2.setMaxParallelism(consumerMaxParallelism);
+ }
+
+ v2.connectNewDataSetAsInput(v1, distributionPattern,
ResultPartitionType.BLOCKING);
+
+ final JobGraph jobGraph =
+ JobGraphBuilder.newBatchJobGraphBuilder()
+ .addJobVertices(Arrays.asList(v1, v2))
+ .build();
+
+ final Configuration configuration = new Configuration();
+
+ return TestingDefaultExecutionGraphBuilder.newBuilder()
+ .setJobGraph(jobGraph)
+ .setJobMasterConfig(configuration)
+ .setVertexParallelismStore(
+ computeVertexParallelismStoreConsideringDynamicGraph(
+ jobGraph.getVertices(), isDynamicGraph,
consumerMaxParallelism))
+ .buildDynamicGraph();
Review comment:
I guess we should not call `buildDynamicGraph` if `isDynamicGraph` is false?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java
##########
@@ -79,6 +86,60 @@ public ConsumerVertexGroup getConsumerVertexGroup() {
return getEdgeManager().getConsumedPartitionGroupsById(partitionId);
}
+ public int getNumberOfSubpartitions() {
+ if (numberOfSubpartitions == UNKNOWN) {
+ numberOfSubpartitions = getOrComputeNumberOfSubpartitions();
+ }
+ checkState(
Review comment:
I think we can do the check only in the `set` case, i.e. only inside the `if`
block.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartitionTest.java
##########
@@ -140,6 +150,126 @@ public void testBlockingPartitionResetting() throws
Exception {
assertFalse(consumedPartitionGroup.areAllPartitionsFinished());
}
+ @Test
+ public void testGetNumberOfSubpartitionsForNonDynamicAllToAllGraph()
throws Exception {
+ testGetNumberOfSubpartitions(7, DistributionPattern.ALL_TO_ALL, false,
Arrays.asList(7, 7));
+ }
+
+ @Test
+ public void testGetNumberOfSubpartitionsForNonDynamicPointWiseGraph()
throws Exception {
+ testGetNumberOfSubpartitions(7, DistributionPattern.POINTWISE, false,
Arrays.asList(4, 3));
+ }
+
+ @Test
+ public void
testGetNumberOfSubpartitionsFromConsumerParallelismForDynamicAllToAllGraph()
+ throws Exception {
+ testGetNumberOfSubpartitions(7, DistributionPattern.ALL_TO_ALL, true,
Arrays.asList(7, 7));
+ }
+
+ @Test
+ public void
testGetNumberOfSubpartitionsFromConsumerParallelismForDynamicPointWiseGraph()
+ throws Exception {
+ testGetNumberOfSubpartitions(7, DistributionPattern.POINTWISE, true,
Arrays.asList(4, 4));
+ }
+
+ @Test
+ public void
testGetNumberOfSubpartitionsFromConsumerMaxParallelismForDynamicAllToAllGraph()
+ throws Exception {
+ testGetNumberOfSubpartitions(
+ -1, DistributionPattern.ALL_TO_ALL, true, Arrays.asList(13,
13));
+ }
+
+ @Test
+ public void
testGetNumberOfSubpartitionsFromConsumerMaxParallelismForDynamicPointWiseGraph()
+ throws Exception {
+ testGetNumberOfSubpartitions(-1, DistributionPattern.POINTWISE, true,
Arrays.asList(7, 7));
+ }
+
+ private void testGetNumberOfSubpartitions(
+ int consumerParallelism,
+ DistributionPattern distributionPattern,
+ boolean isDynamicGraph,
+ Collection<Integer> expectedNumSubpartitions)
Review comment:
NIT: `Collection` -> `List`, because we always expect it to be a `List`.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartitionTest.java
##########
@@ -140,6 +150,126 @@ public void testBlockingPartitionResetting() throws
Exception {
assertFalse(consumedPartitionGroup.areAllPartitionsFinished());
}
+ @Test
+ public void testGetNumberOfSubpartitionsForNonDynamicAllToAllGraph()
throws Exception {
+ testGetNumberOfSubpartitions(7, DistributionPattern.ALL_TO_ALL, false,
Arrays.asList(7, 7));
+ }
+
+ @Test
+ public void testGetNumberOfSubpartitionsForNonDynamicPointWiseGraph()
throws Exception {
Review comment:
NIT: PointWise -> Pointwise
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputGateDeploymentDescriptor.java
##########
@@ -115,7 +118,18 @@ public ResultPartitionType getConsumedPartitionType() {
@Nonnegative
public int getConsumedSubpartitionIndex() {
- return consumedSubpartitionIndex;
+ checkState(
+ consumedSubpartitionIndexRange.startIndex
+ == consumedSubpartitionIndexRange.endIndex);
+ return consumedSubpartitionIndexRange.startIndex;
+ }
+
+ /**
+ * Return the index range of the the consumed subpartitions. This method
will be used by
Review comment:
> This method will be used by adaptive scheduler.
Let's remove this comment because it's not very accurate. By the way, "adaptive
scheduler" currently refers mainly to the adaptive streaming scheduler.
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
##########
@@ -814,6 +814,7 @@ private void connect(Integer headOfChain, StreamEdge edge) {
}
// set strategy name so that web interface can show it.
jobEdge.setShipStrategyName(partitioner.toString());
+ jobEdge.setBroadcast(partitioner.isBroadcast());
Review comment:
I think `setBroadcast` is also needed for dataset jobs, i.e. in
`JobGraphGenerator`.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartitionTest.java
##########
@@ -140,6 +150,126 @@ public void testBlockingPartitionResetting() throws
Exception {
assertFalse(consumedPartitionGroup.areAllPartitionsFinished());
}
+ @Test
+ public void testGetNumberOfSubpartitionsForNonDynamicAllToAllGraph()
throws Exception {
+ testGetNumberOfSubpartitions(7, DistributionPattern.ALL_TO_ALL, false,
Arrays.asList(7, 7));
+ }
+
+ @Test
+ public void testGetNumberOfSubpartitionsForNonDynamicPointWiseGraph()
throws Exception {
+ testGetNumberOfSubpartitions(7, DistributionPattern.POINTWISE, false,
Arrays.asList(4, 3));
+ }
+
+ @Test
+ public void
testGetNumberOfSubpartitionsFromConsumerParallelismForDynamicAllToAllGraph()
+ throws Exception {
+ testGetNumberOfSubpartitions(7, DistributionPattern.ALL_TO_ALL, true,
Arrays.asList(7, 7));
+ }
+
+ @Test
+ public void
testGetNumberOfSubpartitionsFromConsumerParallelismForDynamicPointWiseGraph()
+ throws Exception {
+ testGetNumberOfSubpartitions(7, DistributionPattern.POINTWISE, true,
Arrays.asList(4, 4));
+ }
+
+ @Test
+ public void
testGetNumberOfSubpartitionsFromConsumerMaxParallelismForDynamicAllToAllGraph()
+ throws Exception {
+ testGetNumberOfSubpartitions(
+ -1, DistributionPattern.ALL_TO_ALL, true, Arrays.asList(13,
13));
+ }
+
+ @Test
+ public void
testGetNumberOfSubpartitionsFromConsumerMaxParallelismForDynamicPointWiseGraph()
+ throws Exception {
+ testGetNumberOfSubpartitions(-1, DistributionPattern.POINTWISE, true,
Arrays.asList(7, 7));
+ }
+
+ private void testGetNumberOfSubpartitions(
+ int consumerParallelism,
+ DistributionPattern distributionPattern,
+ boolean isDynamicGraph,
+ Collection<Integer> expectedNumSubpartitions)
+ throws Exception {
+
+ final int producerParallelism = 2;
+ final int consumerMaxParallelism = 13;
+
+ final ExecutionGraph eg =
+ createExecutionGraph(
+ producerParallelism,
+ consumerParallelism,
+ consumerMaxParallelism,
+ distributionPattern,
+ isDynamicGraph);
+
+ final Iterator<ExecutionJobVertex> vertexIterator =
+ eg.getVerticesTopologically().iterator();
+ final ExecutionJobVertex producer = vertexIterator.next();
+
+ if (isDynamicGraph) {
+ ExecutionJobVertexTest.initializeVertex(producer);
+ }
+
+ final IntermediateResult result = producer.getProducedDataSets()[0];
+
+ assertThat(expectedNumSubpartitions.size(), is(producerParallelism));
+ assertThat(
+ Arrays.stream(result.getPartitions())
+
.map(IntermediateResultPartition::getNumberOfSubpartitions)
+ .collect(Collectors.toList()),
+ equalTo(expectedNumSubpartitions));
+ }
+
+ private static ExecutionGraph createExecutionGraph(
+ int producerParallelism,
+ int consumerParallelism,
+ int consumerMaxParallelism,
+ DistributionPattern distributionPattern,
+ boolean isDynamicGraph)
+ throws Exception {
+
+ final JobVertex v1 = new JobVertex("v1");
+ v1.setInvokableClass(NoOpInvokable.class);
+ v1.setParallelism(producerParallelism);
+
+ final JobVertex v2 = new JobVertex("v2");
+ v2.setInvokableClass(NoOpInvokable.class);
+ if (consumerParallelism > 0) {
+ v2.setParallelism(consumerParallelism);
+ }
+ if (consumerMaxParallelism > 0) {
+ v2.setMaxParallelism(consumerMaxParallelism);
+ }
+
+ v2.connectNewDataSetAsInput(v1, distributionPattern,
ResultPartitionType.BLOCKING);
+
+ final JobGraph jobGraph =
+ JobGraphBuilder.newBatchJobGraphBuilder()
+ .addJobVertices(Arrays.asList(v1, v2))
+ .build();
+
+ final Configuration configuration = new Configuration();
+
+ return TestingDefaultExecutionGraphBuilder.newBuilder()
+ .setJobGraph(jobGraph)
+ .setJobMasterConfig(configuration)
+ .setVertexParallelismStore(
+ computeVertexParallelismStoreConsideringDynamicGraph(
+ jobGraph.getVertices(), isDynamicGraph,
consumerMaxParallelism))
+ .buildDynamicGraph();
Review comment:
This also raises a question for me: why do the above tests pass?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]