wanglijie95 commented on a change in pull request #18050:
URL: https://github.com/apache/flink/pull/18050#discussion_r771163449
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java
##########
@@ -77,6 +85,69 @@ public ResultPartitionType getResultType() {
         return getEdgeManager().getConsumedPartitionGroupsById(partitionId);
     }
+    public int getNumberOfSubpartitions() {
+        if (numberOfSubpartitions == UNKNOWN) {
+            numberOfSubpartitions = getOrComputeNumberOfSubpartitions();
+        }
+        checkState(
+                numberOfSubpartitions > 0,
+                "Number of subpartitions is an unexpected value: " + numberOfSubpartitions);
+
+        return numberOfSubpartitions;
+    }
+
+    private int getOrComputeNumberOfSubpartitions() {
+        if (!getProducer().getExecutionGraphAccessor().isDynamic()) {
+            // The produced data is partitioned among a number of subpartitions.
+            //
+            // If no consumers are known at this point, we use a single subpartition, otherwise we
+            // have one for each consuming sub task.
+            int numberOfSubpartitions = 1;
+            List<ConsumerVertexGroup> consumerVertexGroups = getConsumerVertexGroups();
+            if (!consumerVertexGroups.isEmpty() && !consumerVertexGroups.get(0).isEmpty()) {
+                if (consumerVertexGroups.size() > 1) {
+                    throw new IllegalStateException(
+                            "Currently, only a single consumer group per partition is supported.");
+                }
+                numberOfSubpartitions = consumerVertexGroups.get(0).size();
+            }
+
+            return numberOfSubpartitions;
+        } else {
+            if (totalResult.isBroadcast()) {
+                // for dynamic graph and broadcast result, we only produced one subpartition,
+                // and all the downstream vertices should consume this subpartition.
+                return 1;
+            } else {
+                return computeNumberOfMaxPossiblePartitionConsumers();
+            }
+        }
+    }
+
+    private int computeNumberOfMaxPossiblePartitionConsumers() {
+        final ExecutionJobVertex consumerJobVertex =
+                getIntermediateResult().getConsumerExecutionJobVertex();
+        final DistributionPattern distributionPattern =
+                getIntermediateResult().getConsumingDistributionPattern();
+
+        // decide the max possible consumer job vertex parallelism
+        int maxConsumerJobVertexParallelism = consumerJobVertex.getParallelism();
Review comment:
When the consumer's parallelism is specified (> 0), we can directly set the
number of subpartitions to the consumer's parallelism. Only when the consumer's
parallelism is unknown do we need to fall back to the max parallelism.
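
A minimal sketch of this suggestion, assuming getParallelism() returns a
non-positive value while the consumer's parallelism is still undecided and that
ExecutionJobVertex#getMaxParallelism() is the intended fallback (the fallback
call is an assumption, not taken from the diff above):

    // Sketch only: prefer the decided parallelism, fall back to max parallelism.
    int maxConsumerJobVertexParallelism = consumerJobVertex.getParallelism();
    if (maxConsumerJobVertexParallelism <= 0) {
        maxConsumerJobVertexParallelism = consumerJobVertex.getMaxParallelism();
    }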