zhuzhurk commented on code in PR #25551: URL: https://github.com/apache/flink/pull/25551#discussion_r1895666240
########## flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ConsumedSubpartitionContext.java: ########## @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.deployment; + +import org.apache.flink.runtime.executiongraph.IndexRange; +import org.apache.flink.runtime.executiongraph.IntermediateResult; +import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; + +import java.io.Serializable; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.Map; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/* + * Helper class used to track and manage the relationships between shuffle descriptors and their + * associated subpartitions. + */ +class ConsumedSubpartitionContext implements Serializable { + private static final long serialVersionUID = 1L; + + /** The number of consumed shuffle descriptors. 
*/ + private final int numConsumedShuffleDescriptors; + + /** + * A mapping between ranges of consumed shuffle descriptors and their corresponding subpartition + * ranges. + */ + private final Map<IndexRange, IndexRange> consumedShuffleDescriptorToSubpartitionRangeMap; + + private ConsumedSubpartitionContext( + int numConsumedShuffleDescriptors, + Map<IndexRange, IndexRange> consumedShuffleDescriptorToSubpartitionRangeMap) { + this.numConsumedShuffleDescriptors = numConsumedShuffleDescriptors; + this.consumedShuffleDescriptorToSubpartitionRangeMap = + checkNotNull(consumedShuffleDescriptorToSubpartitionRangeMap); + } + + public int getNumConsumedShuffleDescriptors() { + return numConsumedShuffleDescriptors; + } + + public Collection<IndexRange> getConsumedShuffleDescriptorRanges() { + return Collections.unmodifiableCollection( + consumedShuffleDescriptorToSubpartitionRangeMap.keySet()); + } + + public IndexRange getConsumedSubpartitionRange(Integer shuffleDescriptorIndex) { Review Comment: Integer -> int ########## flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java: ########## @@ -398,6 +403,35 @@ private static boolean isProducerFailedOrCanceled(ExecutionState producerState) || producerState == ExecutionState.FAILED; } + private Map<IndexRange, IndexRange> constructConsumedSubpartitionGroupByChannelRange( Review Comment: It's better to add some comments to describe what this method does, and why we need to do this conversion. ########## flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ConsumedSubpartitionContext.java: ########## @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.deployment; + +import org.apache.flink.runtime.executiongraph.IndexRange; +import org.apache.flink.runtime.executiongraph.IntermediateResult; +import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; + +import java.io.Serializable; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.Map; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/* + * Helper class used to track and manage the relationships between shuffle descriptors and their + * associated subpartitions. + */ +class ConsumedSubpartitionContext implements Serializable { + private static final long serialVersionUID = 1L; + + /** The number of consumed shuffle descriptors. */ + private final int numConsumedShuffleDescriptors; + + /** + * A mapping between ranges of consumed shuffle descriptors and their corresponding subpartition + * ranges. + */ + private final Map<IndexRange, IndexRange> consumedShuffleDescriptorToSubpartitionRangeMap; Review Comment: It's better to give a concrete example to explain this mapping. 
########## flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ConsumedSubpartitionContext.java: ########## @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.deployment; + +import org.apache.flink.runtime.executiongraph.IndexRange; +import org.apache.flink.runtime.executiongraph.IntermediateResult; +import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; + +import java.io.Serializable; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.Map; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/* + * Helper class used to track and manage the relationships between shuffle descriptors and their + * associated subpartitions. + */ +class ConsumedSubpartitionContext implements Serializable { + private static final long serialVersionUID = 1L; + + /** The number of consumed shuffle descriptors. 
*/ + private final int numConsumedShuffleDescriptors; + + /** + * A mapping between ranges of consumed shuffle descriptors and their corresponding subpartition + * ranges. + */ + private final Map<IndexRange, IndexRange> consumedShuffleDescriptorToSubpartitionRangeMap; Review Comment: Or maybe two: one for the all-to-all case and one for the point-wise case, because the index mapping can be different. ########## flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ConsumedSubpartitionContext.java: ########## @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.deployment; + +import org.apache.flink.runtime.executiongraph.IndexRange; +import org.apache.flink.runtime.executiongraph.IntermediateResult; +import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; + +import java.io.Serializable; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.Map; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/* + * Helper class used to track and manage the relationships between shuffle descriptors and their + * associated subpartitions. + */ +class ConsumedSubpartitionContext implements Serializable { + private static final long serialVersionUID = 1L; + + /** The number of consumed shuffle descriptors. */ + private final int numConsumedShuffleDescriptors; + + /** + * A mapping between ranges of consumed shuffle descriptors and their corresponding subpartition + * ranges. 
+ */ + private final Map<IndexRange, IndexRange> consumedShuffleDescriptorToSubpartitionRangeMap; + + private ConsumedSubpartitionContext( + int numConsumedShuffleDescriptors, + Map<IndexRange, IndexRange> consumedShuffleDescriptorToSubpartitionRangeMap) { + this.numConsumedShuffleDescriptors = numConsumedShuffleDescriptors; + this.consumedShuffleDescriptorToSubpartitionRangeMap = + checkNotNull(consumedShuffleDescriptorToSubpartitionRangeMap); + } + + public int getNumConsumedShuffleDescriptors() { + return numConsumedShuffleDescriptors; + } + + public Collection<IndexRange> getConsumedShuffleDescriptorRanges() { + return Collections.unmodifiableCollection( + consumedShuffleDescriptorToSubpartitionRangeMap.keySet()); + } + + public IndexRange getConsumedSubpartitionRange(Integer shuffleDescriptorIndex) { + for (Map.Entry<IndexRange, IndexRange> entry : + consumedShuffleDescriptorToSubpartitionRangeMap.entrySet()) { + IndexRange shuffleDescriptorRange = entry.getKey(); + if (shuffleDescriptorIndex >= shuffleDescriptorRange.getStartIndex() + && shuffleDescriptorIndex <= shuffleDescriptorRange.getEndIndex()) { + return entry.getValue(); + } + } + throw new IllegalArgumentException( + "Cannot find consumed subpartition range for shuffle descriptor index " + + shuffleDescriptorIndex); + } + + /** + * Builds a {@link ConsumedSubpartitionContext} based on the provided inputs. + * + * <p>Note: The construction is based on subscribing to consecutive subpartitions of the same + * partition. If this assumption is violated, the calculation of the number of consumed + * ShuffleDescriptors will be inaccurate. Review Comment: > will be inaccurate Can we throw exceptions if this assumption is violated? It's better to expose problems instead of producing wrong results silently. 
########## flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/EdgeManagerBuildUtil.java: ########## @@ -121,9 +105,12 @@ private static void connectPointwise( for (ExecutionVertexInputInfo executionVertexInputInfo : jobVertexInputInfo.getExecutionVertexInputInfos()) { int consumerIndex = executionVertexInputInfo.getSubtaskIndex(); - IndexRange range = executionVertexInputInfo.getPartitionIndexRange(); + List<IndexRange> range = Review Comment: -> ranges ########## flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputGateDeploymentDescriptor.java: ########## @@ -130,19 +144,31 @@ public ResultPartitionType getConsumedPartitionType() { return consumedPartitionType; } - @Nonnegative - public int getConsumedSubpartitionIndex() { - checkState( - consumedSubpartitionIndexRange.getStartIndex() - == consumedSubpartitionIndexRange.getEndIndex()); - return consumedSubpartitionIndexRange.getStartIndex(); + public int getNumConsumedShuffleDescriptors() { + return consumedSubpartitionContext.getNumConsumedShuffleDescriptors(); + } + + public Collection<IndexRange> getConsumedShuffleDescriptorRanges() { + return consumedSubpartitionContext.getConsumedShuffleDescriptorRanges(); } - /** Return the index range of the consumed subpartitions. */ - public IndexRange getConsumedSubpartitionIndexRange() { - return consumedSubpartitionIndexRange; + public IndexRange getConsumedSubpartitionRange(Integer shuffleDescriptorIndex) { Review Comment: Integer -> int -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
