zhuzhurk commented on code in PR #25887:
URL: https://github.com/apache/flink/pull/25887#discussion_r1901541146
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/ConsumedPartitionGroup.java:
##########
@@ -40,7 +40,7 @@
*/
public class ConsumedPartitionGroup implements
Iterable<IntermediateResultPartitionID> {
- private final List<IntermediateResultPartitionID> resultPartitions;
+ private final Map<IntermediateResultPartitionID, Integer>
resultPartitionsInOrdered;
Review Comment:
-> partitionIdToIndexInOrder
A comment is also needed to explain it, e.g. what the index stands for.
##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingTopology.java:
##########
@@ -314,8 +314,13 @@ private static ConsumedPartitionGroup
createConsumedPartitionGroup(
final int numConsumers,
final List<IntermediateResultPartitionID> consumedPartitions,
final ResultPartitionType resultPartitionType) {
+ final Map<IntermediateResultPartitionID, Integer>
partitionIdToIndexInOrdered =
Review Comment:
partitionIdToIndexInOrdered -> partitionIdToIndexInOrder
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/ConsumedPartitionGroup.java:
##########
@@ -55,29 +55,30 @@ public class ConsumedPartitionGroup implements
Iterable<IntermediateResultPartit
private ConsumedPartitionGroup(
int numConsumers,
- List<IntermediateResultPartitionID> resultPartitions,
Review Comment:
For readability and maintainability, it's better to keep accepting the list
and construct the map from it here.
That keeps the map under this class's control and better reflects its meaning.
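A minimal sketch of this suggestion, under stated assumptions:
`PartitionGroupSketch` is a hypothetical stand-in for `ConsumedPartitionGroup`,
`String` stands in for `IntermediateResultPartitionID`, and rejecting duplicate
IDs is an assumption rather than something taken from the PR. The constructor
still takes the ordered list and derives the ID-to-index map itself, so callers
cannot hand in a map that disagrees with the partition order.

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for ConsumedPartitionGroup; String replaces
// IntermediateResultPartitionID to keep the sketch self-contained.
final class PartitionGroupSketch implements Iterable<String> {

    // Maps each partition ID to its position in the list given to the
    // constructor. LinkedHashMap preserves insertion order, so iterating
    // the key set yields the partitions in their original order.
    private final Map<String, Integer> partitionIdToIndexInOrder;

    PartitionGroupSketch(List<String> resultPartitions) {
        Map<String, Integer> indexMap = new LinkedHashMap<>();
        for (int i = 0; i < resultPartitions.size(); i++) {
            String partitionId = resultPartitions.get(i);
            // Assumption: IDs are unique within a group; fail fast otherwise.
            if (indexMap.put(partitionId, i) != null) {
                throw new IllegalArgumentException(
                        "Duplicate partition ID: " + partitionId);
            }
        }
        this.partitionIdToIndexInOrder = Collections.unmodifiableMap(indexMap);
    }

    // Returns the position of the given partition within this group.
    int indexOf(String partitionId) {
        Integer index = partitionIdToIndexInOrder.get(partitionId);
        if (index == null) {
            throw new IllegalArgumentException(
                    "Unknown partition ID: " + partitionId);
        }
        return index;
    }

    @Override
    public Iterator<String> iterator() {
        return partitionIdToIndexInOrder.keySet().iterator();
    }
}
```

With this shape, test helpers such as `createConsumedPartitionGroup` would not
need to build the map themselves; they could keep passing the list.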
##########
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ConsumedSubpartitionContext.java:
##########
@@ -113,23 +111,23 @@ public IndexRange getConsumedSubpartitionRange(int
shuffleDescriptorIndex) {
*
* @param consumedSubpartitionGroups a mapping of consumed partition index
ranges to
* subpartition ranges.
- * @param consumedResultPartitions an iterator of {@link
IntermediateResultPartitionID} for the
- * consumed result partitions.
- * @param partitions all partition ids of consumed {@link
IntermediateResult}.
+ * @param partitionIdToShuffleDescriptorIndexMap a map that associates
each {@link
+ * IntermediateResultPartitionID} with its corresponding shuffle
descriptor index.
+ * @param partitionIdRetriever a function that retrieves the {@link
+ * IntermediateResultPartitionID} for a given index.
* @return a {@link ConsumedSubpartitionContext} instance constructed from
the input parameters.
*/
public static ConsumedSubpartitionContext buildConsumedSubpartitionContext(
Map<IndexRange, IndexRange> consumedSubpartitionGroups,
- Iterator<IntermediateResultPartitionID> consumedResultPartitions,
- IntermediateResultPartitionID[] partitions) {
- Map<IntermediateResultPartitionID, Integer>
partitionIdToShuffleDescriptorIndexMap =
- new HashMap<>();
- while (consumedResultPartitions.hasNext()) {
- IntermediateResultPartitionID partitionId =
consumedResultPartitions.next();
- partitionIdToShuffleDescriptorIndexMap.put(
- partitionId,
partitionIdToShuffleDescriptorIndexMap.size());
+ Map<IntermediateResultPartitionID, Integer>
partitionIdToShuffleDescriptorIndexMap,
+ Function<Integer, IntermediateResultPartitionID>
partitionIdRetriever) {
+ if (consumedSubpartitionGroups.size() == 1
Review Comment:
It's better to add some comments to explain this case.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ConsumedSubpartitionContext.java:
##########
@@ -113,23 +111,23 @@ public IndexRange getConsumedSubpartitionRange(int
shuffleDescriptorIndex) {
*
* @param consumedSubpartitionGroups a mapping of consumed partition index
ranges to
* subpartition ranges.
- * @param consumedResultPartitions an iterator of {@link
IntermediateResultPartitionID} for the
- * consumed result partitions.
- * @param partitions all partition ids of consumed {@link
IntermediateResult}.
+ * @param partitionIdToShuffleDescriptorIndexMap a map that associates
each {@link
+ * IntermediateResultPartitionID} with its corresponding shuffle
descriptor index.
+ * @param partitionIdRetriever a function that retrieves the {@link
+ * IntermediateResultPartitionID} for a given index.
* @return a {@link ConsumedSubpartitionContext} instance constructed from
the input parameters.
*/
public static ConsumedSubpartitionContext buildConsumedSubpartitionContext(
Map<IndexRange, IndexRange> consumedSubpartitionGroups,
- Iterator<IntermediateResultPartitionID> consumedResultPartitions,
- IntermediateResultPartitionID[] partitions) {
- Map<IntermediateResultPartitionID, Integer>
partitionIdToShuffleDescriptorIndexMap =
- new HashMap<>();
- while (consumedResultPartitions.hasNext()) {
- IntermediateResultPartitionID partitionId =
consumedResultPartitions.next();
- partitionIdToShuffleDescriptorIndexMap.put(
- partitionId,
partitionIdToShuffleDescriptorIndexMap.size());
+ Map<IntermediateResultPartitionID, Integer>
partitionIdToShuffleDescriptorIndexMap,
Review Comment:
How about just passing in a `ConsumedPartitionGroup`?
It would better reflect the relationship between the context and the consumed
partition group.