xintongsong commented on code in PR #21122: URL: https://github.com/apache/flink/pull/21122#discussion_r1003186537
########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsConsumerId.java: ########## @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid; + +import java.util.Objects; + +/** This class represents the identifier of hybrid shuffle's consumer. */ +public class HsConsumerId { + /** + * This consumer id is used in the scenarios that information related to specific consumer needs + * to be ignored. + */ + public static final HsConsumerId ANY_CONSUMER_ID = new HsConsumerId(-1); + + /** + * This consumer id is used in the scenarios that only one consumer is allowed for a single + * subpartition. + */ + public static final HsConsumerId SINGLE_CONSUMER_ID = new HsConsumerId(0); Review Comment: I'd suggest naming this `DEFAULT`. It is guaranteed that if there's only one consumer, this id will be used. However, the converse is not guaranteed: this id may also be used when there are multiple consumers. 
########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingInfoProvider.java: ########## @@ -79,4 +79,24 @@ enum ConsumeStatus { /** The buffer is either consumed or not consumed. */ ALL } + + /** This class represents a pair of {@link ConsumeStatus} and consumer id. */ + class ConsumeStatusWithId { + public static final ConsumeStatusWithId ALL_CONSUME_STATUS = Review Comment: I'd suggest the name `ALL_ALL` or `ANY_ANY`, following the pattern `ConsumeStatus_ConsumerID`. If more special values are needed in future, we can follow the same pattern. ########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsConsumerId.java: ########## @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid; + +import java.util.Objects; + +/** This class represents the identifier of hybrid shuffle's consumer. */ +public class HsConsumerId { + /** + * This consumer id is used in the scenarios that information related to specific consumer needs + * to be ignored. 
+ */ + public static final HsConsumerId ANY_CONSUMER_ID = new HsConsumerId(-1); + + /** + * This consumer id is used in the scenarios that only one consumer is allowed for a single + * subpartition. + */ + public static final HsConsumerId SINGLE_CONSUMER_ID = new HsConsumerId(0); + + /** This is a unique field for each consumer of a single subpartition. */ + private final int id; + + public HsConsumerId(int id) { + this.id = id; + } + + public int getId() { + return id; + } Review Comment: Should not expose the internal `id`. This is only for creating a new id from an existing one. ########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionMemoryDataConsumer.java: ########## @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network.partition.hybrid; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.partition.ResultSubpartition; +import org.apache.flink.util.function.SupplierWithException; +import org.apache.flink.util.function.ThrowingRunnable; + +import javax.annotation.concurrent.GuardedBy; + +import java.util.Deque; +import java.util.LinkedList; +import java.util.Optional; +import java.util.concurrent.locks.Lock; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * This class represents a unique consumer which consume a specific subpartition's data in memory. + */ +public class HsSubpartitionMemoryDataConsumer implements HsDataView { + + @GuardedBy("consumerLock") + private final Deque<HsBufferContext> unConsumedBuffers = new LinkedList<>(); + + private final Lock consumerLock; + + private final Lock resultPartitionLock; + + private final HsConsumerId consumerId; + + private final int subpartitionId; + + private final HsMemoryDataManagerOperation memoryDataManagerOperation; + + public HsSubpartitionMemoryDataConsumer( + Lock resultPartitionLock, + Lock consumerLock, + int subpartitionId, + HsConsumerId consumerId, + HsMemoryDataManagerOperation memoryDataManagerOperation) { + this.resultPartitionLock = resultPartitionLock; + this.consumerLock = consumerLock; + this.subpartitionId = subpartitionId; + this.consumerId = consumerId; + this.memoryDataManagerOperation = memoryDataManagerOperation; + } + + @GuardedBy("consumerLock") + // this method must be called with consumerLock.writeLock. Review Comment: This comment doesn't seem correct. How does a `Lock` have `writeLock`? 
########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionMemoryDataConsumer.java: ########## @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.partition.ResultSubpartition; +import org.apache.flink.util.function.SupplierWithException; +import org.apache.flink.util.function.ThrowingRunnable; + +import javax.annotation.concurrent.GuardedBy; + +import java.util.Deque; +import java.util.LinkedList; +import java.util.Optional; +import java.util.concurrent.locks.Lock; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * This class represents a unique consumer which consume a specific subpartition's data in memory. 
+ */ +public class HsSubpartitionMemoryDataConsumer implements HsDataView { + + @GuardedBy("consumerLock") + private final Deque<HsBufferContext> unConsumedBuffers = new LinkedList<>(); + + private final Lock consumerLock; + + private final Lock resultPartitionLock; + + private final HsConsumerId consumerId; + + private final int subpartitionId; + + private final HsMemoryDataManagerOperation memoryDataManagerOperation; + + public HsSubpartitionMemoryDataConsumer( + Lock resultPartitionLock, + Lock consumerLock, + int subpartitionId, + HsConsumerId consumerId, + HsMemoryDataManagerOperation memoryDataManagerOperation) { + this.resultPartitionLock = resultPartitionLock; + this.consumerLock = consumerLock; + this.subpartitionId = subpartitionId; + this.consumerId = consumerId; + this.memoryDataManagerOperation = memoryDataManagerOperation; + } + + @GuardedBy("consumerLock") + // this method must be called with consumerLock.writeLock. + public void addInitialBuffers(Deque<HsBufferContext> buffers) { + unConsumedBuffers.addAll(buffers); + } + + // this method only called from subpartitionMemoryDataManager with write lock. + @SuppressWarnings("FieldAccessNotGuarded") + public boolean addBuffer(HsBufferContext bufferContext) { + unConsumedBuffers.add(bufferContext); + trimHeadingReleasedBuffers(); + return unConsumedBuffers.size() <= 1; + } + + /** + * Check whether the head of {@link #unConsumedBuffers} is the buffer to be consumed. If so, + * return the buffer and backlog. + * + * @param toConsumeIndex index of buffer to be consumed. + * @return If the head of {@link #unConsumedBuffers} is target, return optional of the buffer + * and backlog. Otherwise, return {@link Optional#empty()}. + */ + @SuppressWarnings("FieldAccessNotGuarded") + // Note that: callWithLock ensure that code block guarded by resultPartitionReadLock and + // subpartitionLock. 
+ @Override + public Optional<ResultSubpartition.BufferAndBacklog> consumeBuffer(int toConsumeIndex) { + Optional<Tuple2<HsBufferContext, Buffer.DataType>> bufferAndNextDataType = + callWithLock( + () -> { + if (!checkFirstUnConsumedBufferIndex(toConsumeIndex)) { + return Optional.empty(); + } + + HsBufferContext bufferContext = + checkNotNull(unConsumedBuffers.pollFirst()); + bufferContext.consumed(consumerId); + Buffer.DataType nextDataType = + peekNextToConsumeDataTypeInternal(toConsumeIndex + 1); + return Optional.of(Tuple2.of(bufferContext, nextDataType)); + }); + + bufferAndNextDataType.ifPresent( + tuple -> + memoryDataManagerOperation.onBufferConsumed( + tuple.f0.getBufferIndexAndChannel())); + return bufferAndNextDataType.map( + tuple -> + new ResultSubpartition.BufferAndBacklog( + tuple.f0.getBuffer().readOnlySlice(), + getBacklog(), + tuple.f1, + toConsumeIndex)); + } + + /** + * Check whether the head of {@link #unConsumedBuffers} is the buffer to be consumed next time. + * If so, return the next buffer's data type. + * + * @param nextToConsumeIndex index of the buffer to be consumed next time. + * @return If the head of {@link #unConsumedBuffers} is target, return the buffer's data type. + * Otherwise, return {@link Buffer.DataType#NONE}. + */ + @SuppressWarnings("FieldAccessNotGuarded") + // Note that: callWithLock ensure that code block guarded by resultPartitionReadLock and + // consumerLock. + @Override + public Buffer.DataType peekNextToConsumeDataType(int nextToConsumeIndex) { + return callWithLock(() -> peekNextToConsumeDataTypeInternal(nextToConsumeIndex)); + } + + @GuardedBy("consumerLock") + private Buffer.DataType peekNextToConsumeDataTypeInternal(int nextToConsumeIndex) { + return checkFirstUnConsumedBufferIndex(nextToConsumeIndex) + ? 
checkNotNull(unConsumedBuffers.peekFirst()).getBuffer().getDataType() + : Buffer.DataType.NONE; + } + + @GuardedBy("consumerLock") + private boolean checkFirstUnConsumedBufferIndex(int expectedBufferIndex) { + trimHeadingReleasedBuffers(); + return !unConsumedBuffers.isEmpty() + && unConsumedBuffers.peekFirst().getBufferIndexAndChannel().getBufferIndex() + == expectedBufferIndex; + } + + @SuppressWarnings("FieldAccessNotGuarded") + // Un-synchronized get unConsumedBuffers size to provide memory data backlog,this will make the + // result greater than or equal to the actual backlog, but obtaining an accurate backlog will + // bring too much extra overhead. + @Override + public int getBacklog() { + return unConsumedBuffers.size(); + } + + @Override + public void releaseDataView() { + memoryDataManagerOperation.onConsumerRelease(subpartitionId, consumerId); + } + + @GuardedBy("consumerLock") + private void trimHeadingReleasedBuffers() { + while (!unConsumedBuffers.isEmpty() && unConsumedBuffers.peekFirst().isReleased()) { + unConsumedBuffers.removeFirst(); + } + } + + private <E extends Exception> void runWithLock(ThrowingRunnable<E> runnable) throws E { Review Comment: Unused ########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManager.java: ########## @@ -106,14 +111,19 @@ public HsMemoryDataManager( ReadWriteLock readWriteLock = new ReentrantReadWriteLock(true); this.lock = readWriteLock.writeLock(); + this.subpartitionViewOperationsMap = new ArrayList<>(numSubpartitions); + // currently, only HsFullSpillingStrategy supports multiple consumer. + final boolean allowMultipleConsumer = spillStrategy instanceof HsFullSpillingStrategy; Review Comment: It should be `HsResultPartition`, rather than `HsMemoryDataManager` or `HsSubpartitionMemoryDataManager`, to decide whether multiple consumers should be allowed. This is a property of the entire result partition, not the memory component. 
########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartition.java: ########## @@ -76,6 +78,8 @@ public class HsResultPartition extends ResultPartition { @Nullable private HsMemoryDataManager memoryDataManager; + private final boolean enableBroadcastOptimize; Review Comment: The name is confusing. I'd suggest `isBroadcastOnly`. ########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartition.java: ########## @@ -186,17 +193,18 @@ public ResultSubpartitionView createSubpartitionView( throw new PartitionNotFoundException(getPartitionId()); } + // assign a unique id for each consumer, now it is guaranteed by the value that is one + // higher than the last consumerId's id field. + HsConsumerId consumerId = nextConsumerIds[subpartitionId]; + nextConsumerIds[subpartitionId] = new HsConsumerId(consumerId.getId() + 1); Review Comment: See my other comments for consumer id generation and multiple consumer checking. ########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java: ########## @@ -240,6 +242,8 @@ public ResultPartition create( .SpillingStrategyType.SELECTIVE) .build(), bufferCompressor, + // Only hybrid full result partition support broadcast optimization. + isBroadcast && type == ResultPartitionType.HYBRID_FULL, Review Comment: Instead of disabling this optimization for selective spilling, I'd suggest to always use full spilling when the result partition is broadcast. ########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsConsumerId.java: ########## @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid; + +import java.util.Objects; + +/** This class represents the identifier of hybrid shuffle's consumer. */ +public class HsConsumerId { + /** + * This consumer id is used in the scenarios that information related to specific consumer needs + * to be ignored. + */ + public static final HsConsumerId ANY_CONSUMER_ID = new HsConsumerId(-1); + + /** + * This consumer id is used in the scenarios that only one consumer is allowed for a single + * subpartition. + */ + public static final HsConsumerId SINGLE_CONSUMER_ID = new HsConsumerId(0); + + /** This is a unique field for each consumer of a single subpartition. */ + private final int id; + + public HsConsumerId(int id) { + this.id = id; + } Review Comment: We can make this `private` to forbid creating an id from an arbitrary integer (e.g., a negative one), and provide a factory like: ``` public static HsConsumerId newId(@Nullable HsConsumerId lastId) { return lastId == null ? SINGLE_CONSUMER_ID : new HsConsumerId(lastId.id + 1); } ``` ########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsConsumerId.java: ########## @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid; + +import java.util.Objects; + +/** This class represents the identifier of hybrid shuffle's consumer. */ +public class HsConsumerId { + /** + * This consumer id is used in the scenarios that information related to specific consumer needs + * to be ignored. + */ + public static final HsConsumerId ANY_CONSUMER_ID = new HsConsumerId(-1); Review Comment: I'd suggest to name this `ANY`. ########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionMemoryDataManager.java: ########## @@ -393,36 +352,24 @@ private boolean canBeCompressed(Buffer buffer) { // subpartitionLock. 
private void addFinishedBuffer(HsBufferContext bufferContext) { finishedBufferIndex++; - boolean needNotify = - callWithLock( - () -> { - allBuffers.add(bufferContext); - unConsumedBuffers.add(bufferContext); - bufferIndexToContexts.put( - bufferContext.getBufferIndexAndChannel().getBufferIndex(), - bufferContext); - trimHeadingReleasedBuffers(unConsumedBuffers); - updateStatistics(bufferContext.getBuffer()); - return unConsumedBuffers.size() <= 1; - }); - if (needNotify) { - memoryDataManagerOperation.onDataAvailable(targetChannel); - } - } - - @GuardedBy("subpartitionLock") - private DataType peekNextToConsumeDataTypeInternal(int nextToConsumeIndex) { - return checkFirstUnConsumedBufferIndex(nextToConsumeIndex) - ? checkNotNull(unConsumedBuffers.peekFirst()).getBuffer().getDataType() - : DataType.NONE; - } - - @GuardedBy("subpartitionLock") - private boolean checkFirstUnConsumedBufferIndex(int expectedBufferIndex) { - trimHeadingReleasedBuffers(unConsumedBuffers); - return !unConsumedBuffers.isEmpty() - && unConsumedBuffers.peekFirst().getBufferIndexAndChannel().getBufferIndex() - == expectedBufferIndex; + Set<HsConsumerId> needNotify = new HashSet<>(consumerMap.size()); + runWithLock( + () -> { + allBuffers.add(bufferContext); + bufferIndexToContexts.put( + bufferContext.getBufferIndexAndChannel().getBufferIndex(), + bufferContext); + for (Map.Entry<HsConsumerId, HsSubpartitionMemoryDataConsumer> consumerEntry : + consumerMap.entrySet()) { + if (consumerEntry.getValue().addBuffer(bufferContext)) { + needNotify.add(consumerEntry.getKey()); + } + } + updateStatistics(bufferContext.getBuffer()); + }); + needNotify.forEach( + (consumerId) -> + memoryDataManagerOperation.onDataAvailable(targetChannel, consumerId)); Review Comment: We might change `HsMemoryDataManagerOperation#onDataAvailable` to take a collection of consumer ids. 
########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManager.java: ########## @@ -153,14 +153,18 @@ public synchronized void run() { /** This method only called by result partition to create subpartitionFileReader. */ public HsDataView registerNewSubpartition( - int subpartitionId, HsSubpartitionViewInternalOperations operation) throws IOException { + int subpartitionId, Review Comment: We should change this method name to `registerNewConsumer`. Actually, I think we should change it for all `HsResultPartition`, `HsMemoryDataManager`, `HsSubpartitionMemoryDataManager`, and even rename `HsSubpartitionView` and `HsSubpartitionViewInternalOperations`. We can leave the interface `ResultSubpartitionView` as is, which is shared by different shuffle implementations. ########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionMemoryDataManager.java: ########## @@ -393,36 +352,24 @@ private boolean canBeCompressed(Buffer buffer) { // subpartitionLock. private void addFinishedBuffer(HsBufferContext bufferContext) { finishedBufferIndex++; - boolean needNotify = - callWithLock( - () -> { - allBuffers.add(bufferContext); - unConsumedBuffers.add(bufferContext); - bufferIndexToContexts.put( - bufferContext.getBufferIndexAndChannel().getBufferIndex(), - bufferContext); - trimHeadingReleasedBuffers(unConsumedBuffers); - updateStatistics(bufferContext.getBuffer()); - return unConsumedBuffers.size() <= 1; - }); - if (needNotify) { - memoryDataManagerOperation.onDataAvailable(targetChannel); - } - } - - @GuardedBy("subpartitionLock") - private DataType peekNextToConsumeDataTypeInternal(int nextToConsumeIndex) { - return checkFirstUnConsumedBufferIndex(nextToConsumeIndex) - ? 
checkNotNull(unConsumedBuffers.peekFirst()).getBuffer().getDataType() - : DataType.NONE; - } - - @GuardedBy("subpartitionLock") - private boolean checkFirstUnConsumedBufferIndex(int expectedBufferIndex) { - trimHeadingReleasedBuffers(unConsumedBuffers); - return !unConsumedBuffers.isEmpty() - && unConsumedBuffers.peekFirst().getBufferIndexAndChannel().getBufferIndex() - == expectedBufferIndex; + Set<HsConsumerId> needNotify = new HashSet<>(consumerMap.size()); Review Comment: Might be `List`. ########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManagerOperation.java: ########## @@ -53,7 +53,16 @@ public interface HsMemoryDataManagerOperation { /** * This method is called when subpartition data become available. * - * @param subpartitionId the subpartition need notify data available. + * @param subpartitionId the subpartition's identifier that this consumer belongs to. + * @param consumerId the consumer's identifier which need notify data available. */ - void onDataAvailable(int subpartitionId); + void onDataAvailable(int subpartitionId, HsConsumerId consumerId); + + /** + * This method is called when consumer is decided to released. + * + * @param subpartitionId the subpartition's identifier that this consumer belongs to. + * @param consumerId the consumer's identifier which decided to be released. + */ + void onConsumerRelease(int subpartitionId, HsConsumerId consumerId); Review Comment: `onConsumerReleased` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
