xiangqiao123 commented on a change in pull request #18050:
URL: https://github.com/apache/flink/pull/18050#discussion_r770484017
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartitionTest.java
##########
@@ -140,6 +150,127 @@ public void testBlockingPartitionResetting() throws Exception {
         assertFalse(consumedPartitionGroup.areAllPartitionsFinished());
     }
+    @Test
+    public void testGetNumberOfSubpartitionsForNonDynamicAllToAllGraph() throws Exception {
+        testGetNumberOfSubpartitions(7, DistributionPattern.ALL_TO_ALL, false, Arrays.asList(7, 7));
+    }
+
+    @Test
+    public void testGetNumberOfSubpartitionsForNonDynamicPointWiseGraph() throws Exception {
+        testGetNumberOfSubpartitions(7, DistributionPattern.POINTWISE, false, Arrays.asList(4, 3));
+    }
+
+    @Test
+    public void testGetNumberOfSubpartitionsFromConsumerParallelismForDynamicAllToAllGraph()
+            throws Exception {
+        testGetNumberOfSubpartitions(7, DistributionPattern.ALL_TO_ALL, true, Arrays.asList(7, 7));
+    }
+
+    @Test
+    public void testGetNumberOfSubpartitionsFromConsumerParallelismForDynamicPointWiseGraph()
+            throws Exception {
+        testGetNumberOfSubpartitions(7, DistributionPattern.POINTWISE, true, Arrays.asList(4, 4));
Review comment:
Will this scenario exist in a real job?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/InternalExecutionGraphAccessor.java
##########
@@ -109,4 +110,8 @@ void notifySchedulerNgAboutInternalTaskFailure(
     void deleteBlobs(List<PermanentBlobKey> blobKeys);
     void initializeJobVertex(ExecutionJobVertex ejv, long createTimestamp) throws JobException;
+
+    ExecutionJobVertex getJobVertex(JobVertexID id);
Review comment:
Would `getExecutionJobVertex` be a better name for this method?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java
##########
@@ -77,6 +85,69 @@ public ResultPartitionType getResultType() {
         return getEdgeManager().getConsumedPartitionGroupsById(partitionId);
     }
+    public int getNumberOfSubpartitions() {
+        if (numberOfSubpartitions == UNKNOWN) {
+            numberOfSubpartitions = getOrComputeNumberOfSubpartitions();
+        }
+        checkState(
+                numberOfSubpartitions > 0,
+                "Number of subpartitions is an unexpected value: " + numberOfSubpartitions);
+
+        return numberOfSubpartitions;
+    }
+
+    private int getOrComputeNumberOfSubpartitions() {
+        if (!getProducer().getExecutionGraphAccessor().isDynamic()) {
+            // The produced data is partitioned among a number of subpartitions.
+            //
+            // If no consumers are known at this point, we use a single subpartition, otherwise we
+            // have one for each consuming sub task.
+            int numberOfSubpartitions = 1;
+            List<ConsumerVertexGroup> consumerVertexGroups = getConsumerVertexGroups();
+            if (!consumerVertexGroups.isEmpty() && !consumerVertexGroups.get(0).isEmpty()) {
+                if (consumerVertexGroups.size() > 1) {
+                    throw new IllegalStateException(
+                            "Currently, only a single consumer group per partition is supported.");
+                }
+                numberOfSubpartitions = consumerVertexGroups.get(0).size();
+            }
+
+            return numberOfSubpartitions;
+        } else {
+            if (totalResult.isBroadcast()) {
+                // for dynamic graph and broadcast result, we only produced one subpartition,
+                // and all the downstream vertices should consume this subpartition.
+                return 1;
+            } else {
+                return computeNumberOfMaxPossiblePartitionConsumers();
+            }
+        }
+    }
+
+    private int computeNumberOfMaxPossiblePartitionConsumers() {
+        final ExecutionJobVertex consumerJobVertex =
+                getIntermediateResult().getConsumerExecutionJobVertex();
+        final DistributionPattern distributionPattern =
+                getIntermediateResult().getConsumingDistributionPattern();
+
+        // decide the max possible consumer job vertex parallelism
+        int maxConsumerJobVertexParallelism = consumerJobVertex.getParallelism();
Review comment:
This doesn't seem necessary, and it may cause `return (int)
Math.ceil(((double) maxConsumerJobVertexParallelism) / numberOfPartitions);`
to return 0.
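
To make the concern concrete, here is a minimal sketch of the quoted expression in isolation. The class and method names are hypothetical and not part of the PR, and the inputs are assumptions: 7 consumers over 2 partitions mirrors the test values, while -1 and 0 stand in for a consumer parallelism that has not been decided yet.

```java
public class SubpartitionCeilSketch {

    // Hypothetical helper that only mirrors the quoted return expression;
    // it is not the PR's actual method.
    static int maxConsumersPerPartition(int maxConsumerJobVertexParallelism, int numberOfPartitions) {
        return (int) Math.ceil(((double) maxConsumerJobVertexParallelism) / numberOfPartitions);
    }

    public static void main(String[] args) {
        // A decided parallelism rounds up as expected: ceil(7 / 2.0) = 4.
        System.out.println(maxConsumersPerPartition(7, 2));

        // Assumed "not yet decided" value of -1: ceil(-0.5) is -0.0, which casts to 0.
        System.out.println(maxConsumersPerPartition(-1, 2));

        // A parallelism of 0 also yields 0.
        System.out.println(maxConsumersPerPartition(0, 2));
    }
}
```

If the value can reach this expression while it is zero or negative, the result is 0 and the `checkState(numberOfSubpartitions > 0, ...)` in `getNumberOfSubpartitions()` would fail.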
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartitionTest.java
##########
@@ -140,6 +150,127 @@ public void testBlockingPartitionResetting() throws Exception {
         assertFalse(consumedPartitionGroup.areAllPartitionsFinished());
     }
+    @Test
+    public void testGetNumberOfSubpartitionsForNonDynamicAllToAllGraph() throws Exception {
+        testGetNumberOfSubpartitions(7, DistributionPattern.ALL_TO_ALL, false, Arrays.asList(7, 7));
+    }
+
+    @Test
+    public void testGetNumberOfSubpartitionsForNonDynamicPointWiseGraph() throws Exception {
+        testGetNumberOfSubpartitions(7, DistributionPattern.POINTWISE, false, Arrays.asList(4, 3));
+    }
+
+    @Test
+    public void testGetNumberOfSubpartitionsFromConsumerParallelismForDynamicAllToAllGraph()
+            throws Exception {
+        testGetNumberOfSubpartitions(7, DistributionPattern.ALL_TO_ALL, true, Arrays.asList(7, 7));
Review comment:
Will this scenario exist in a real job?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]