zhuzhurk commented on code in PR #25551:
URL: https://github.com/apache/flink/pull/25551#discussion_r1895666240


##########
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ConsumedSubpartitionContext.java:
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.deployment;
+
+import org.apache.flink.runtime.executiongraph.IndexRange;
+import org.apache.flink.runtime.executiongraph.IntermediateResult;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/*
+ * Helper class used to track and manage the relationships between shuffle 
descriptors and their
+ * associated subpartitions.
+ */
+class ConsumedSubpartitionContext implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    /** The number of consumed shuffle descriptors. */
+    private final int numConsumedShuffleDescriptors;
+
+    /**
+     * A mapping between ranges of consumed shuffle descriptors and their 
corresponding subpartition
+     * ranges.
+     */
+    private final Map<IndexRange, IndexRange> 
consumedShuffleDescriptorToSubpartitionRangeMap;
+
+    private ConsumedSubpartitionContext(
+            int numConsumedShuffleDescriptors,
+            Map<IndexRange, IndexRange> 
consumedShuffleDescriptorToSubpartitionRangeMap) {
+        this.numConsumedShuffleDescriptors = numConsumedShuffleDescriptors;
+        this.consumedShuffleDescriptorToSubpartitionRangeMap =
+                checkNotNull(consumedShuffleDescriptorToSubpartitionRangeMap);
+    }
+
+    public int getNumConsumedShuffleDescriptors() {
+        return numConsumedShuffleDescriptors;
+    }
+
+    public Collection<IndexRange> getConsumedShuffleDescriptorRanges() {
+        return Collections.unmodifiableCollection(
+                consumedShuffleDescriptorToSubpartitionRangeMap.keySet());
+    }
+
+    public IndexRange getConsumedSubpartitionRange(Integer 
shuffleDescriptorIndex) {

Review Comment:
   Integer -> int



##########
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java:
##########
@@ -398,6 +403,35 @@ private static boolean 
isProducerFailedOrCanceled(ExecutionState producerState)
                 || producerState == ExecutionState.FAILED;
     }
 
+    private Map<IndexRange, IndexRange> 
constructConsumedSubpartitionGroupByChannelRange(

Review Comment:
   It's better to add some comments to describe what this method does, and why 
we need to do this conversion.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ConsumedSubpartitionContext.java:
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.deployment;
+
+import org.apache.flink.runtime.executiongraph.IndexRange;
+import org.apache.flink.runtime.executiongraph.IntermediateResult;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/*
+ * Helper class used to track and manage the relationships between shuffle 
descriptors and their
+ * associated subpartitions.
+ */
+class ConsumedSubpartitionContext implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    /** The number of consumed shuffle descriptors. */
+    private final int numConsumedShuffleDescriptors;
+
+    /**
+     * A mapping between ranges of consumed shuffle descriptors and their 
corresponding subpartition
+     * ranges.
+     */
+    private final Map<IndexRange, IndexRange> 
consumedShuffleDescriptorToSubpartitionRangeMap;

Review Comment:
   It's better to give a concrete example to explain this mapping.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ConsumedSubpartitionContext.java:
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.deployment;
+
+import org.apache.flink.runtime.executiongraph.IndexRange;
+import org.apache.flink.runtime.executiongraph.IntermediateResult;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/*
+ * Helper class used to track and manage the relationships between shuffle 
descriptors and their
+ * associated subpartitions.
+ */
+class ConsumedSubpartitionContext implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    /** The number of consumed shuffle descriptors. */
+    private final int numConsumedShuffleDescriptors;
+
+    /**
+     * A mapping between ranges of consumed shuffle descriptors and their 
corresponding subpartition
+     * ranges.
+     */
+    private final Map<IndexRange, IndexRange> 
consumedShuffleDescriptorToSubpartitionRangeMap;

Review Comment:
   Or maybe two: one for the all-to-all case and one for the point-wise case,
because the index mapping can be different.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ConsumedSubpartitionContext.java:
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.deployment;
+
+import org.apache.flink.runtime.executiongraph.IndexRange;
+import org.apache.flink.runtime.executiongraph.IntermediateResult;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/*
+ * Helper class used to track and manage the relationships between shuffle 
descriptors and their
+ * associated subpartitions.
+ */
+class ConsumedSubpartitionContext implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    /** The number of consumed shuffle descriptors. */
+    private final int numConsumedShuffleDescriptors;
+
+    /**
+     * A mapping between ranges of consumed shuffle descriptors and their 
corresponding subpartition
+     * ranges.
+     */
+    private final Map<IndexRange, IndexRange> 
consumedShuffleDescriptorToSubpartitionRangeMap;
+
+    private ConsumedSubpartitionContext(
+            int numConsumedShuffleDescriptors,
+            Map<IndexRange, IndexRange> 
consumedShuffleDescriptorToSubpartitionRangeMap) {
+        this.numConsumedShuffleDescriptors = numConsumedShuffleDescriptors;
+        this.consumedShuffleDescriptorToSubpartitionRangeMap =
+                checkNotNull(consumedShuffleDescriptorToSubpartitionRangeMap);
+    }
+
+    public int getNumConsumedShuffleDescriptors() {
+        return numConsumedShuffleDescriptors;
+    }
+
+    public Collection<IndexRange> getConsumedShuffleDescriptorRanges() {
+        return Collections.unmodifiableCollection(
+                consumedShuffleDescriptorToSubpartitionRangeMap.keySet());
+    }
+
+    public IndexRange getConsumedSubpartitionRange(Integer 
shuffleDescriptorIndex) {
+        for (Map.Entry<IndexRange, IndexRange> entry :
+                consumedShuffleDescriptorToSubpartitionRangeMap.entrySet()) {
+            IndexRange shuffleDescriptorRange = entry.getKey();
+            if (shuffleDescriptorIndex >= 
shuffleDescriptorRange.getStartIndex()
+                    && shuffleDescriptorIndex <= 
shuffleDescriptorRange.getEndIndex()) {
+                return entry.getValue();
+            }
+        }
+        throw new IllegalArgumentException(
+                "Cannot find consumed subpartition range for shuffle 
descriptor index "
+                        + shuffleDescriptorIndex);
+    }
+
+    /**
+     * Builds a {@link ConsumedSubpartitionContext} based on the provided 
inputs.
+     *
+     * <p>Note: The construction is based on subscribing to consecutive 
subpartitions of the same
+     * partition. If this assumption is violated, the calculation of the 
number of consumed
+     * ShuffleDescriptors will be inaccurate.

Review Comment:
   > will be inaccurate
   
   Can we throw an exception if this assumption is violated? It's better to
expose problems than to silently produce wrong results.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/EdgeManagerBuildUtil.java:
##########
@@ -121,9 +105,12 @@ private static void connectPointwise(
         for (ExecutionVertexInputInfo executionVertexInputInfo :
                 jobVertexInputInfo.getExecutionVertexInputInfos()) {
             int consumerIndex = executionVertexInputInfo.getSubtaskIndex();
-            IndexRange range = 
executionVertexInputInfo.getPartitionIndexRange();
+            List<IndexRange> range =

Review Comment:
   -> ranges



##########
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputGateDeploymentDescriptor.java:
##########
@@ -130,19 +144,31 @@ public ResultPartitionType getConsumedPartitionType() {
         return consumedPartitionType;
     }
 
-    @Nonnegative
-    public int getConsumedSubpartitionIndex() {
-        checkState(
-                consumedSubpartitionIndexRange.getStartIndex()
-                        == consumedSubpartitionIndexRange.getEndIndex());
-        return consumedSubpartitionIndexRange.getStartIndex();
+    public int getNumConsumedShuffleDescriptors() {
+        return consumedSubpartitionContext.getNumConsumedShuffleDescriptors();
+    }
+
+    public Collection<IndexRange> getConsumedShuffleDescriptorRanges() {
+        return 
consumedSubpartitionContext.getConsumedShuffleDescriptorRanges();
     }
 
-    /** Return the index range of the consumed subpartitions. */
-    public IndexRange getConsumedSubpartitionIndexRange() {
-        return consumedSubpartitionIndexRange;
+    public IndexRange getConsumedSubpartitionRange(Integer 
shuffleDescriptorIndex) {

Review Comment:
   Integer -> int



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to