This is an automated email from the ASF dual-hosted git repository. xtsong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit a2b402d66de9ecde75881796a6d7ee09fc4d6963 Author: Weijie Guo <[email protected]> AuthorDate: Mon Oct 24 18:16:01 2022 +0800 [FLINK-28889] Introduce HsSubpartitionConsumerMemoryDataManager and let HsMemoryDataManager supports multiple consumer. --- .../io/network/partition/hybrid/HsDataView.java | 4 +- .../partition/hybrid/HsFileDataManager.java | 3 +- .../partition/hybrid/HsMemoryDataManager.java | 60 ++++--- .../hybrid/HsMemoryDataManagerOperation.java | 15 +- .../partition/hybrid/HsResultPartition.java | 6 +- .../hybrid/HsSelectiveSpillingStrategy.java | 3 +- .../partition/hybrid/HsSpillingInfoProvider.java | 10 +- ...titionView.java => HsSubpartitionConsumer.java} | 20 ++- ... HsSubpartitionConsumerInternalOperations.java} | 6 +- .../HsSubpartitionConsumerMemoryDataManager.java | 185 +++++++++++++++++++ .../partition/hybrid/HsSubpartitionFileReader.java | 2 +- .../hybrid/HsSubpartitionFileReaderImpl.java | 6 +- .../hybrid/HsSubpartitionMemoryDataManager.java | 173 ++++++------------ .../partition/hybrid/HsFileDataManagerTest.java | 10 +- .../partition/hybrid/HsMemoryDataManagerTest.java | 20 +++ ...sSubpartitionConsumerMemoryDataManagerTest.java | 197 +++++++++++++++++++++ .../hybrid/HsSubpartitionFileReaderImplTest.java | 44 ++--- .../HsSubpartitionMemoryDataManagerTest.java | 132 ++++---------- .../partition/hybrid/HsSubpartitionViewTest.java | 60 ++++--- .../hybrid/TestingMemoryDataManagerOperation.java | 26 ++- .../hybrid/TestingSpillingInfoProvider.java | 2 +- ...tingSubpartitionConsumerInternalOperation.java} | 6 +- 22 files changed, 669 insertions(+), 321 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsDataView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsDataView.java index c3d4cb9ed83..b8888cfcb38 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsDataView.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsDataView.java @@ -24,8 +24,8 @@ import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAn import java.util.Optional; /** - * A view for {@link HsSubpartitionView} to find out what data exists in memory or disk and polling - * the data. + * A view for {@link HsSubpartitionConsumer} to find out what data exists in memory or disk and + * polling the data. */ public interface HsDataView { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManager.java index 75d322f6452..82db961be9c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManager.java @@ -153,7 +153,8 @@ public class HsFileDataManager implements Runnable, BufferRecycler { /** This method only called by result partition to create subpartitionFileReader. */ public HsDataView registerNewSubpartition( - int subpartitionId, HsSubpartitionViewInternalOperations operation) throws IOException { + int subpartitionId, HsSubpartitionConsumerInternalOperations operation) + throws IOException { synchronized (lock) { checkState(!isReleased, "HsFileDataManager is already released."); lazyInitialize(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManager.java index a8d7d89e7e4..da0c3962ad2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManager.java @@ -24,6 +24,7 @@ import org.apache.flink.runtime.io.network.buffer.BufferBuilder; import org.apache.flink.runtime.io.network.buffer.BufferCompressor; import org.apache.flink.runtime.io.network.buffer.BufferPool; import org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingStrategy.Decision; +import org.apache.flink.util.Preconditions; import org.apache.flink.util.concurrent.ExecutorThreadFactory; import org.apache.flink.util.concurrent.FutureUtils; import org.apache.flink.util.function.SupplierWithException; @@ -35,6 +36,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.nio.file.Path; import java.util.ArrayList; +import java.util.Collection; import java.util.Deque; import java.util.List; import java.util.Map; @@ -72,8 +74,12 @@ public class HsMemoryDataManager implements HsSpillingInfoProvider, HsMemoryData private final AtomicInteger numUnSpillBuffers = new AtomicInteger(0); - private final Map<Integer, HsSubpartitionViewInternalOperations> subpartitionViewOperationsMap = - new ConcurrentHashMap<>(); + /** + * Each element of the list is all views of the subpartition corresponding to its index, which + * are stored in the form of a map that maps consumer id to its subpartition view. + */ + private final List<Map<HsConsumerId, HsSubpartitionConsumerInternalOperations>> + subpartitionViewOperationsMap; /** * Currently, it is only used to regularly check the actual size of local buffer pool (the size @@ -106,6 +112,7 @@ public class HsMemoryDataManager implements HsSpillingInfoProvider, HsMemoryData ReadWriteLock readWriteLock = new ReentrantReadWriteLock(true); this.lock = readWriteLock.writeLock(); + this.subpartitionViewOperationsMap = new ArrayList<>(numSubpartitions); for (int subpartitionId = 0; subpartitionId < numSubpartitions; ++subpartitionId) { subpartitionMemoryDataManagers[subpartitionId] = new HsSubpartitionMemoryDataManager( @@ -114,6 +121,7 @@ public class HsMemoryDataManager implements HsSpillingInfoProvider, HsMemoryData readWriteLock.readLock(), bufferCompressor, this); + subpartitionViewOperationsMap.add(new ConcurrentHashMap<>()); } poolSize = new AtomicInteger(this.bufferPool.getNumBuffers()); @@ -156,20 +164,19 @@ public class HsMemoryDataManager implements HsSpillingInfoProvider, HsMemoryData } /** - * Register {@link HsSubpartitionViewInternalOperations} to {@link + * Register {@link HsSubpartitionConsumerInternalOperations} to {@link * #subpartitionViewOperationsMap}. It is used to obtain the consumption progress of the * subpartition. */ - public HsDataView registerSubpartitionView( - int subpartitionId, HsSubpartitionViewInternalOperations viewOperations) { - HsSubpartitionViewInternalOperations oldView = - subpartitionViewOperationsMap.put(subpartitionId, viewOperations); - if (oldView != null) { - LOG.debug( - "subpartition : {} register subpartition view will replace old view. ", - subpartitionId); - } - return getSubpartitionMemoryDataManager(subpartitionId); + public HsDataView registerNewConsumer( + int subpartitionId, + HsConsumerId consumerId, + HsSubpartitionConsumerInternalOperations viewOperations) { + HsSubpartitionConsumerInternalOperations oldView = + subpartitionViewOperationsMap.get(subpartitionId).put(consumerId, viewOperations); + Preconditions.checkState( + oldView == null, "Each subpartition view should have unique consumerId."); + return getSubpartitionMemoryDataManager(subpartitionId).registerNewConsumer(consumerId); } /** Close this {@link HsMemoryDataManager}, it means no data can append to memory. */ @@ -232,11 +239,11 @@ public class HsMemoryDataManager implements HsSpillingInfoProvider, HsMemoryData // Write lock should be acquired before invoke this method. @Override - public List<Integer> getNextBufferIndexToConsume() { + public List<Integer> getNextBufferIndexToConsume(HsConsumerId consumerId) { ArrayList<Integer> consumeIndexes = new ArrayList<>(numSubpartitions); for (int channel = 0; channel < numSubpartitions; channel++) { - HsSubpartitionViewInternalOperations viewOperation = - subpartitionViewOperationsMap.get(channel); + HsSubpartitionConsumerInternalOperations viewOperation = + subpartitionViewOperationsMap.get(channel).get(consumerId); // Access consuming offset without lock to prevent deadlock. // A consuming thread may being blocked on the memory data manager lock, while holding // the viewOperation lock. @@ -280,12 +287,23 @@ public class HsMemoryDataManager implements HsSpillingInfoProvider, HsMemoryData } @Override - public void onDataAvailable(int subpartitionId) { - HsSubpartitionViewInternalOperations subpartitionViewInternalOperations = + public void onDataAvailable(int subpartitionId, Collection<HsConsumerId> consumerIds) { + Map<HsConsumerId, HsSubpartitionConsumerInternalOperations> consumerViewMap = subpartitionViewOperationsMap.get(subpartitionId); - if (subpartitionViewInternalOperations != null) { - subpartitionViewInternalOperations.notifyDataAvailable(); - } + consumerIds.forEach( + consumerId -> { + HsSubpartitionConsumerInternalOperations consumerView = + consumerViewMap.get(consumerId); + if (consumerView != null) { + consumerView.notifyDataAvailable(); + } + }); + } + + @Override + public void onConsumerReleased(int subpartitionId, HsConsumerId consumerId) { + subpartitionViewOperationsMap.get(subpartitionId).remove(consumerId); + getSubpartitionMemoryDataManager(subpartitionId).releaseConsumer(consumerId); } // ------------------------------------ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManagerOperation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManagerOperation.java index 37df7709d63..794e95b98ea 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManagerOperation.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManagerOperation.java @@ -20,6 +20,8 @@ package org.apache.flink.runtime.io.network.partition.hybrid; import org.apache.flink.runtime.io.network.buffer.BufferBuilder; +import java.util.Collection; + /** * This interface is used by {@link HsSubpartitionMemoryDataManager} to operate {@link * HsMemoryDataManager}. Spilling decision may be made and handled inside these operations. @@ -53,7 +55,16 @@ public interface HsMemoryDataManagerOperation { /** * This method is called when subpartition data become available. * - * @param subpartitionId the subpartition need notify data available. + * @param subpartitionId the subpartition's identifier that this consumer belongs to. + * @param consumerIds the consumer's identifier which need notify data available. + */ + void onDataAvailable(int subpartitionId, Collection<HsConsumerId> consumerIds); + + /** + * This method is called when consumer is decided to released. + * + * @param subpartitionId the subpartition's identifier that this consumer belongs to. + * @param consumerId the consumer's identifier which decided to be released. */ - void onDataAvailable(int subpartitionId); + void onConsumerReleased(int subpartitionId, HsConsumerId consumerId); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartition.java index 9b75f1e3724..5e10b773397 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartition.java @@ -186,13 +186,15 @@ public class HsResultPartition extends ResultPartition { throw new PartitionNotFoundException(getPartitionId()); } - HsSubpartitionView subpartitionView = new HsSubpartitionView(availabilityListener); + HsSubpartitionConsumer subpartitionView = new HsSubpartitionConsumer(availabilityListener); HsDataView diskDataView = fileDataManager.registerNewSubpartition(subpartitionId, subpartitionView); HsDataView memoryDataView = checkNotNull(memoryDataManager) - .registerSubpartitionView(subpartitionId, subpartitionView); + // TODO pass real consumerId in the next commit. + .registerNewConsumer( + subpartitionId, HsConsumerId.DEFAULT, subpartitionView); subpartitionView.setDiskDataView(diskDataView); subpartitionView.setMemoryDataView(memoryDataView); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSelectiveSpillingStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSelectiveSpillingStrategy.java index 8c7217a4b93..56cf0a39f4a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSelectiveSpillingStrategy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSelectiveSpillingStrategy.java @@ -95,7 +95,8 @@ public class HsSelectiveSpillingStrategy implements HsSpillingStrategy { TreeMap<Integer, List<BufferIndexAndChannel>> subpartitionToHighPriorityBuffers = getBuffersByConsumptionPriorityInOrder( - spillingInfoProvider.getNextBufferIndexToConsume(), + // selective spilling strategy does not support multiple consumer. + spillingInfoProvider.getNextBufferIndexToConsume(HsConsumerId.DEFAULT), subpartitionToBuffers, spillNum); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingInfoProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingInfoProvider.java index deb32e7058e..7c6b0ccd70d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingInfoProvider.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingInfoProvider.java @@ -31,12 +31,14 @@ public interface HsSpillingInfoProvider { int getNumSubpartitions(); /** - * Get all downstream next buffer index to consume. + * Get all subpartition's next buffer index to consume of specific consumer. * - * @return A list containing all downstream next buffer index to consume, if the downstream - * subpartition view has not been registered, the corresponding return value is -1. + * @param consumerId of the target downstream consumer. + * @return A list containing all subpartition's next buffer index to consume of specific + * consumer, if the downstream subpartition view has not been registered, the corresponding + * return value is -1. */ - List<Integer> getNextBufferIndexToConsume(); + List<Integer> getNextBufferIndexToConsume(HsConsumerId consumerId); /** * Get all buffers with the expected status from the subpartition. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionConsumer.java similarity index 94% rename from flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionView.java rename to flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionConsumer.java index bc9ec5a1744..7878d68bc33 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionView.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionConsumer.java @@ -33,8 +33,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.Preconditions.checkState; /** The read view of HsResultPartition, data can be read from memory or disk. */ -public class HsSubpartitionView - implements ResultSubpartitionView, HsSubpartitionViewInternalOperations { +public class HsSubpartitionConsumer + implements ResultSubpartitionView, HsSubpartitionConsumerInternalOperations { private final BufferAvailabilityListener availabilityListener; private final Object lock = new Object(); @@ -66,7 +66,7 @@ public class HsSubpartitionView // memoryDataView can be null only before initialization. private HsDataView memoryDataView; - public HsSubpartitionView(BufferAvailabilityListener availabilityListener) { + public HsSubpartitionConsumer(BufferAvailabilityListener availabilityListener) { this.availabilityListener = availabilityListener; } @@ -270,21 +270,25 @@ public class HsSubpartitionView } private void releaseInternal(@Nullable Throwable throwable) { - boolean releaseSubpartitionReader = false; + boolean releaseDiskView; + boolean releaseMemoryView; synchronized (lock) { if (isReleased) { return; } isReleased = true; failureCause = throwable; - if (diskDataView != null) { - releaseSubpartitionReader = true; - } + releaseDiskView = diskDataView != null; + releaseMemoryView = memoryDataView != null; } // release subpartition reader outside of lock to avoid deadlock. - if (releaseSubpartitionReader) { + if (releaseDiskView) { //noinspection FieldAccessNotGuarded diskDataView.releaseDataView(); } + if (releaseMemoryView) { + //noinspection FieldAccessNotGuarded + memoryDataView.releaseDataView(); + } } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionViewInternalOperations.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionConsumerInternalOperations.java similarity index 87% rename from flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionViewInternalOperations.java rename to flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionConsumerInternalOperations.java index ab967e3d450..3f2e58507f3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionViewInternalOperations.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionConsumerInternalOperations.java @@ -19,10 +19,10 @@ package org.apache.flink.runtime.io.network.partition.hybrid; /** - * Operations provided by HsSubpartitionView that are used by other internal components of hybrid - * result partition. + * Operations provided by {@link HsSubpartitionConsumer} that are used by other internal components + * of hybrid result partition. */ -public interface HsSubpartitionViewInternalOperations { +public interface HsSubpartitionConsumerInternalOperations { /** Callback for new data become available. */ void notifyDataAvailable(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionConsumerMemoryDataManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionConsumerMemoryDataManager.java new file mode 100644 index 00000000000..606ffa32bbf --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionConsumerMemoryDataManager.java @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.partition.ResultSubpartition; +import org.apache.flink.util.function.SupplierWithException; + +import javax.annotation.concurrent.GuardedBy; + +import java.util.Deque; +import java.util.LinkedList; +import java.util.Optional; +import java.util.concurrent.locks.Lock; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * This class is responsible for managing the data of a single consumer. {@link + * HsSubpartitionMemoryDataManager} will create a new {@link + * HsSubpartitionConsumerMemoryDataManager} when a consumer is registered. + */ +public class HsSubpartitionConsumerMemoryDataManager implements HsDataView { + + @GuardedBy("consumerLock") + private final Deque<HsBufferContext> unConsumedBuffers = new LinkedList<>(); + + private final Lock consumerLock; + + private final Lock resultPartitionLock; + + private final HsConsumerId consumerId; + + private final int subpartitionId; + + private final HsMemoryDataManagerOperation memoryDataManagerOperation; + + public HsSubpartitionConsumerMemoryDataManager( + Lock resultPartitionLock, + Lock consumerLock, + int subpartitionId, + HsConsumerId consumerId, + HsMemoryDataManagerOperation memoryDataManagerOperation) { + this.resultPartitionLock = resultPartitionLock; + this.consumerLock = consumerLock; + this.subpartitionId = subpartitionId; + this.consumerId = consumerId; + this.memoryDataManagerOperation = memoryDataManagerOperation; + } + + @GuardedBy("consumerLock") + // this method only called from subpartitionMemoryDataManager with write lock. + public void addInitialBuffers(Deque<HsBufferContext> buffers) { + unConsumedBuffers.addAll(buffers); + } + + @GuardedBy("consumerLock") + // this method only called from subpartitionMemoryDataManager with write lock. + public boolean addBuffer(HsBufferContext bufferContext) { + unConsumedBuffers.add(bufferContext); + trimHeadingReleasedBuffers(); + return unConsumedBuffers.size() <= 1; + } + + /** + * Check whether the head of {@link #unConsumedBuffers} is the buffer to be consumed. If so, + * return the buffer and backlog. + * + * @param toConsumeIndex index of buffer to be consumed. + * @return If the head of {@link #unConsumedBuffers} is target, return optional of the buffer + * and backlog. Otherwise, return {@link Optional#empty()}. + */ + @SuppressWarnings("FieldAccessNotGuarded") + // Note that: callWithLock ensure that code block guarded by resultPartitionReadLock and + // subpartitionLock. + @Override + public Optional<ResultSubpartition.BufferAndBacklog> consumeBuffer(int toConsumeIndex) { + Optional<Tuple2<HsBufferContext, Buffer.DataType>> bufferAndNextDataType = + callWithLock( + () -> { + if (!checkFirstUnConsumedBufferIndex(toConsumeIndex)) { + return Optional.empty(); + } + + HsBufferContext bufferContext = + checkNotNull(unConsumedBuffers.pollFirst()); + bufferContext.consumed(consumerId); + Buffer.DataType nextDataType = + peekNextToConsumeDataTypeInternal(toConsumeIndex + 1); + return Optional.of(Tuple2.of(bufferContext, nextDataType)); + }); + + bufferAndNextDataType.ifPresent( + tuple -> + memoryDataManagerOperation.onBufferConsumed( + tuple.f0.getBufferIndexAndChannel())); + return bufferAndNextDataType.map( + tuple -> + new ResultSubpartition.BufferAndBacklog( + tuple.f0.getBuffer().readOnlySlice(), + getBacklog(), + tuple.f1, + toConsumeIndex)); + } + + /** + * Check whether the head of {@link #unConsumedBuffers} is the buffer to be consumed next time. + * If so, return the next buffer's data type. + * + * @param nextToConsumeIndex index of the buffer to be consumed next time. + * @return If the head of {@link #unConsumedBuffers} is target, return the buffer's data type. + * Otherwise, return {@link Buffer.DataType#NONE}. + */ + @SuppressWarnings("FieldAccessNotGuarded") + // Note that: callWithLock ensure that code block guarded by resultPartitionReadLock and + // consumerLock. + @Override + public Buffer.DataType peekNextToConsumeDataType(int nextToConsumeIndex) { + return callWithLock(() -> peekNextToConsumeDataTypeInternal(nextToConsumeIndex)); + } + + @GuardedBy("consumerLock") + private Buffer.DataType peekNextToConsumeDataTypeInternal(int nextToConsumeIndex) { + return checkFirstUnConsumedBufferIndex(nextToConsumeIndex) + ? checkNotNull(unConsumedBuffers.peekFirst()).getBuffer().getDataType() + : Buffer.DataType.NONE; + } + + @GuardedBy("consumerLock") + private boolean checkFirstUnConsumedBufferIndex(int expectedBufferIndex) { + trimHeadingReleasedBuffers(); + return !unConsumedBuffers.isEmpty() + && unConsumedBuffers.peekFirst().getBufferIndexAndChannel().getBufferIndex() + == expectedBufferIndex; + } + + @SuppressWarnings("FieldAccessNotGuarded") + // Un-synchronized get unConsumedBuffers size to provide memory data backlog,this will make the + // result greater than or equal to the actual backlog, but obtaining an accurate backlog will + // bring too much extra overhead. + @Override + public int getBacklog() { + return unConsumedBuffers.size(); + } + + @Override + public void releaseDataView() { + memoryDataManagerOperation.onConsumerReleased(subpartitionId, consumerId); + } + + @GuardedBy("consumerLock") + private void trimHeadingReleasedBuffers() { + while (!unConsumedBuffers.isEmpty() && unConsumedBuffers.peekFirst().isReleased()) { + unConsumedBuffers.removeFirst(); + } + } + + private <R, E extends Exception> R callWithLock(SupplierWithException<R, E> callable) throws E { + try { + resultPartitionLock.lock(); + consumerLock.lock(); + return callable.get(); + } finally { + consumerLock.unlock(); + resultPartitionLock.unlock(); + } + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReader.java index 2ec2a632281..c87bb237660 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReader.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReader.java @@ -58,7 +58,7 @@ public interface HsSubpartitionFileReader extends Comparable<HsSubpartitionFileR HsSubpartitionFileReader createFileReader( int subpartitionId, FileChannel dataFileChannel, - HsSubpartitionViewInternalOperations operation, + HsSubpartitionConsumerInternalOperations operation, HsFileDataIndex dataIndex, int maxBuffersReadAhead, Consumer<HsSubpartitionFileReader> fileReaderReleaser, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImpl.java index e6dc7122c5e..de7bd599e14 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImpl.java @@ -57,7 +57,7 @@ public class HsSubpartitionFileReaderImpl implements HsSubpartitionFileReader { private final FileChannel dataFileChannel; - private final HsSubpartitionViewInternalOperations operations; + private final HsSubpartitionConsumerInternalOperations operations; private final CachedRegionManager cachedRegionManager; @@ -72,7 +72,7 @@ public class HsSubpartitionFileReaderImpl implements HsSubpartitionFileReader { public HsSubpartitionFileReaderImpl( int subpartitionId, FileChannel dataFileChannel, - HsSubpartitionViewInternalOperations operations, + HsSubpartitionConsumerInternalOperations operations, HsFileDataIndex dataIndex, int maxBufferReadAhead, Consumer<HsSubpartitionFileReader> fileReaderReleaser, @@ -487,7 +487,7 @@ public class HsSubpartitionFileReaderImpl implements HsSubpartitionFileReader { public HsSubpartitionFileReader createFileReader( int subpartitionId, FileChannel dataFileChannel, - HsSubpartitionViewInternalOperations operation, + HsSubpartitionConsumerInternalOperations operation, HsFileDataIndex dataIndex, int maxBuffersReadAhead, Consumer<HsSubpartitionFileReader> fileReaderReleaser, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionMemoryDataManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionMemoryDataManager.java index f6804a62b1b..d01cb0d2c78 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionMemoryDataManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionMemoryDataManager.java @@ -18,7 +18,6 @@ package org.apache.flink.runtime.io.network.partition.hybrid; -import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.core.memory.MemorySegmentFactory; import org.apache.flink.runtime.io.network.buffer.Buffer; @@ -28,7 +27,6 @@ import org.apache.flink.runtime.io.network.buffer.BufferCompressor; import org.apache.flink.runtime.io.network.buffer.BufferConsumer; import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler; import org.apache.flink.runtime.io.network.buffer.NetworkBuffer; -import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog; import org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingInfoProvider.ConsumeStatus; import org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingInfoProvider.ConsumeStatusWithId; import org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingInfoProvider.SpillStatus; @@ -40,6 +38,7 @@ import javax.annotation.concurrent.GuardedBy; import java.nio.ByteBuffer; import java.util.ArrayDeque; +import java.util.ArrayList; import java.util.Deque; import java.util.HashMap; import java.util.LinkedList; @@ -49,16 +48,18 @@ import java.util.Optional; import java.util.Queue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.stream.Collectors; import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; /** * This class is responsible for managing the data in a single subpartition. One {@link * HsMemoryDataManager} will hold multiple {@link HsSubpartitionMemoryDataManager}. */ -public class HsSubpartitionMemoryDataManager implements HsDataView { +public class HsSubpartitionMemoryDataManager { private final int targetChannel; private final int bufferSize; @@ -74,9 +75,6 @@ public class HsSubpartitionMemoryDataManager implements HsDataView { @GuardedBy("subpartitionLock") private final Deque<HsBufferContext> allBuffers = new LinkedList<>(); - @GuardedBy("subpartitionLock") - private final Deque<HsBufferContext> unConsumedBuffers = new LinkedList<>(); - @GuardedBy("subpartitionLock") private final Map<Integer, HsBufferContext> bufferIndexToContexts = new HashMap<>(); @@ -84,7 +82,10 @@ public class HsSubpartitionMemoryDataManager implements HsDataView { private final Lock resultPartitionLock; /** DO NOT USE DIRECTLY. Use {@link #runWithLock} or {@link #callWithLock} instead. */ - private final Object subpartitionLock = new Object(); + private final ReentrantReadWriteLock subpartitionLock = new ReentrantReadWriteLock(); + + @GuardedBy("subpartitionLock") + private final Map<HsConsumerId, HsSubpartitionConsumerMemoryDataManager> consumerMap; @Nullable private final BufferCompressor bufferCompressor; @@ -101,82 +102,7 @@ public class HsSubpartitionMemoryDataManager implements HsDataView { this.resultPartitionLock = resultPartitionLock; this.memoryDataManagerOperation = memoryDataManagerOperation; this.bufferCompressor = bufferCompressor; - } - - // ------------------------------------------------------------------------ - // Called by Consumer - // ------------------------------------------------------------------------ - - /** - * Check whether the head of {@link #unConsumedBuffers} is the buffer to be consumed next time. - * If so, return the next buffer's data type. - * - * @param nextToConsumeIndex index of the buffer to be consumed next time. - * @return If the head of {@link #unConsumedBuffers} is target, return the buffer's data type. - * Otherwise, return {@link DataType#NONE}. - */ - @SuppressWarnings("FieldAccessNotGuarded") - // Note that: callWithLock ensure that code block guarded by resultPartitionReadLock and - // subpartitionLock. - @Override - public DataType peekNextToConsumeDataType(int nextToConsumeIndex) { - return callWithLock(() -> peekNextToConsumeDataTypeInternal(nextToConsumeIndex)); - } - - /** - * Check whether the head of {@link #unConsumedBuffers} is the buffer to be consumed. If so, - * return the buffer and backlog. - * - * @param toConsumeIndex index of buffer to be consumed. - * @return If the head of {@link #unConsumedBuffers} is target, return optional of the buffer - * and backlog. Otherwise, return {@link Optional#empty()}. - */ - @SuppressWarnings("FieldAccessNotGuarded") - // Note that: callWithLock ensure that code block guarded by resultPartitionReadLock and - // subpartitionLock. - @Override - public Optional<BufferAndBacklog> consumeBuffer(int toConsumeIndex) { - Optional<Tuple2<HsBufferContext, DataType>> bufferAndNextDataType = - callWithLock( - () -> { - if (!checkFirstUnConsumedBufferIndex(toConsumeIndex)) { - return Optional.empty(); - } - - HsBufferContext bufferContext = - checkNotNull(unConsumedBuffers.pollFirst()); - // TODO move this logical to consumer and pass real consumerId. - bufferContext.consumed(HsConsumerId.DEFAULT); - DataType nextDataType = - peekNextToConsumeDataTypeInternal(toConsumeIndex + 1); - return Optional.of(Tuple2.of(bufferContext, nextDataType)); - }); - - bufferAndNextDataType.ifPresent( - tuple -> - memoryDataManagerOperation.onBufferConsumed( - tuple.f0.getBufferIndexAndChannel())); - return bufferAndNextDataType.map( - tuple -> - new BufferAndBacklog( - tuple.f0.getBuffer().readOnlySlice(), - getBacklog(), - tuple.f1, - toConsumeIndex)); - } - - @SuppressWarnings("FieldAccessNotGuarded") - // Un-synchronized get unConsumedBuffers size to provide memory data backlog,this will make the - // result greater than or equal to the actual backlog, but obtaining an accurate backlog will - // bring too much extra overhead. - @Override - public int getBacklog() { - return unConsumedBuffers.size(); - } - - @Override - public void releaseDataView() { - // nothing to do for memory data. + this.consumerMap = new HashMap<>(); } // ------------------------------------------------------------------------ @@ -285,6 +211,29 @@ public class HsSubpartitionMemoryDataManager implements HsDataView { this.outputMetrics = checkNotNull(outputMetrics); } + @SuppressWarnings("FieldAccessNotGuarded") + public HsSubpartitionConsumerMemoryDataManager registerNewConsumer(HsConsumerId consumerId) { + return callWithLock( + () -> { + checkState(!consumerMap.containsKey(consumerId)); + HsSubpartitionConsumerMemoryDataManager newConsumer = + new HsSubpartitionConsumerMemoryDataManager( + resultPartitionLock, + subpartitionLock.readLock(), + targetChannel, + consumerId, + memoryDataManagerOperation); + newConsumer.addInitialBuffers(allBuffers); + consumerMap.put(consumerId, newConsumer); + return newConsumer; + }); + } + + @SuppressWarnings("FieldAccessNotGuarded") + public void releaseConsumer(HsConsumerId consumerId) { + runWithLock(() -> checkNotNull(consumerMap.remove(consumerId))); + } + // ------------------------------------------------------------------------ // Internal Methods // ------------------------------------------------------------------------ @@ -393,36 +342,22 @@ public class HsSubpartitionMemoryDataManager implements HsDataView { // subpartitionLock. private void addFinishedBuffer(HsBufferContext bufferContext) { finishedBufferIndex++; - boolean needNotify = - callWithLock( - () -> { - allBuffers.add(bufferContext); - unConsumedBuffers.add(bufferContext); - bufferIndexToContexts.put( - bufferContext.getBufferIndexAndChannel().getBufferIndex(), - bufferContext); - trimHeadingReleasedBuffers(unConsumedBuffers); - updateStatistics(bufferContext.getBuffer()); - return unConsumedBuffers.size() <= 1; - }); - if (needNotify) { - memoryDataManagerOperation.onDataAvailable(targetChannel); - } - } - - @GuardedBy("subpartitionLock") - private DataType peekNextToConsumeDataTypeInternal(int nextToConsumeIndex) { - return checkFirstUnConsumedBufferIndex(nextToConsumeIndex) - ? checkNotNull(unConsumedBuffers.peekFirst()).getBuffer().getDataType() - : DataType.NONE; - } - - @GuardedBy("subpartitionLock") - private boolean checkFirstUnConsumedBufferIndex(int expectedBufferIndex) { - trimHeadingReleasedBuffers(unConsumedBuffers); - return !unConsumedBuffers.isEmpty() - && unConsumedBuffers.peekFirst().getBufferIndexAndChannel().getBufferIndex() - == expectedBufferIndex; + List<HsConsumerId> needNotify = new ArrayList<>(consumerMap.size()); + runWithLock( + () -> { + allBuffers.add(bufferContext); + bufferIndexToContexts.put( + bufferContext.getBufferIndexAndChannel().getBufferIndex(), + bufferContext); + for (Map.Entry<HsConsumerId, HsSubpartitionConsumerMemoryDataManager> + consumerEntry : consumerMap.entrySet()) { + if (consumerEntry.getValue().addBuffer(bufferContext)) { + needNotify.add(consumerEntry.getKey()); + } + } + updateStatistics(bufferContext.getBuffer()); + }); + memoryDataManagerOperation.onDataAvailable(targetChannel, needNotify); } /** @@ -517,10 +452,10 @@ public class HsSubpartitionMemoryDataManager implements HsDataView { private <E extends Exception> void runWithLock(ThrowingRunnable<E> runnable) throws E { try { resultPartitionLock.lock(); - synchronized (subpartitionLock) { - runnable.run(); - } + subpartitionLock.writeLock().lock(); + runnable.run(); } finally { + subpartitionLock.writeLock().unlock(); resultPartitionLock.unlock(); } } @@ -528,10 +463,10 @@ public class HsSubpartitionMemoryDataManager implements HsDataView { private <R, E extends Exception> R callWithLock(SupplierWithException<R, E> callable) throws E { try { resultPartitionLock.lock(); - synchronized (subpartitionLock) { - return callable.get(); - } + subpartitionLock.writeLock().lock(); + return callable.get(); } finally { + subpartitionLock.writeLock().unlock(); resultPartitionLock.unlock(); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManagerTest.java index 4c168c13fd6..0980688b7f2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManagerTest.java @@ -78,7 +78,7 @@ class HsFileDataManagerTest { private HsFileDataManager fileDataManager; - private TestingSubpartitionViewInternalOperation subpartitionViewOperation; + private TestingSubpartitionConsumerInternalOperation subpartitionViewOperation; private TestingHsSubpartitionFileReader.Factory factory; @@ -101,7 +101,7 @@ class HsFileDataManagerTest { HybridShuffleConfiguration.builder( NUM_SUBPARTITIONS, bufferPool.getNumBuffersPerRequest()) .build()); - subpartitionViewOperation = new TestingSubpartitionViewInternalOperation(); + subpartitionViewOperation = new TestingSubpartitionConsumerInternalOperation(); } @AfterEach @@ -352,8 +352,8 @@ class HsFileDataManagerTest { void testConsumeWhileReleaseNoDeadlock() throws Exception { CompletableFuture<Void> consumerStart = new CompletableFuture<>(); CompletableFuture<Void> readerFail = new CompletableFuture<>(); - HsSubpartitionView subpartitionView = - new HsSubpartitionView(new NoOpBufferAvailablityListener()); + HsSubpartitionConsumer subpartitionView = + new HsSubpartitionConsumer(new NoOpBufferAvailablityListener()); HsSubpartitionFileReaderImpl subpartitionFileReader = new HsSubpartitionFileReaderImpl( @@ -497,7 +497,7 @@ class HsFileDataManagerTest { public HsSubpartitionFileReader createFileReader( int subpartitionId, FileChannel dataFileChannel, - HsSubpartitionViewInternalOperations operation, + HsSubpartitionConsumerInternalOperations operation, HsFileDataIndex dataIndex, int maxBuffersReadAhead, Consumer<HsSubpartitionFileReader> fileReaderReleaser, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManagerTest.java index 773b847ecc1..e3c3663af06 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManagerTest.java @@ -41,6 +41,7 @@ import java.util.concurrent.atomic.AtomicInteger; import static org.apache.flink.runtime.io.network.partition.hybrid.HybridShuffleTestUtils.createTestingOutputMetrics; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Tests for {@link HsMemoryDataManager}. */ class HsMemoryDataManagerTest { @@ -192,6 +193,25 @@ class HsMemoryDataManagerTest { assertThat(resultPartitionReleaseFuture).isCompleted(); } + @Test + void testSubpartitionConsumerRelease() throws Exception { + HsSpillingStrategy spillingStrategy = TestingSpillingStrategy.builder().build(); + HsMemoryDataManager memoryDataManager = createMemoryDataManager(spillingStrategy); + memoryDataManager.registerNewConsumer( + 0, HsConsumerId.DEFAULT, new TestingSubpartitionConsumerInternalOperation()); + assertThatThrownBy( + () -> + memoryDataManager.registerNewConsumer( + 0, + HsConsumerId.DEFAULT, + new TestingSubpartitionConsumerInternalOperation())) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining("Each subpartition view should have unique consumerId."); + memoryDataManager.onConsumerReleased(0, HsConsumerId.DEFAULT); + memoryDataManager.registerNewConsumer( + 0, HsConsumerId.DEFAULT, new TestingSubpartitionConsumerInternalOperation()); + } + @Test void testPoolSizeCheck() throws Exception { final int requiredBuffers = 10; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionConsumerMemoryDataManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionConsumerMemoryDataManagerTest.java new file mode 100644 index 00000000000..9cb8d8b1751 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionConsumerMemoryDataManagerTest.java @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid; + +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.ReadOnlySlicedNetworkBuffer; +import org.apache.flink.runtime.io.network.partition.ResultSubpartition; + +import org.junit.jupiter.api.Test; + +import java.util.ArrayDeque; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.locks.ReentrantLock; + +import static org.apache.flink.runtime.io.network.partition.hybrid.HybridShuffleTestUtils.createBuffer; +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link HsSubpartitionConsumerMemoryDataManager}. */ +@SuppressWarnings("FieldAccessNotGuarded") +class HsSubpartitionConsumerMemoryDataManagerTest { + + private static final int BUFFER_SIZE = Long.BYTES; + + private static final int SUBPARTITION_ID = 0; + + @Test + void testPeekNextToConsumeDataTypeNotMeetBufferIndexToConsume() throws Exception { + TestingMemoryDataManagerOperation memoryDataManagerOperation = + TestingMemoryDataManagerOperation.builder().build(); + HsSubpartitionConsumerMemoryDataManager subpartitionConsumerMemoryDataManager = + createSubpartitionConsumerMemoryDataManager(memoryDataManagerOperation); + + subpartitionConsumerMemoryDataManager.addBuffer(createBufferContext(0, false)); + assertThat(subpartitionConsumerMemoryDataManager.peekNextToConsumeDataType(1)) + .isEqualTo(Buffer.DataType.NONE); + } + + @Test + void testPeekNextToConsumeDataTypeTrimHeadingReleasedBuffers() throws Exception { + TestingMemoryDataManagerOperation memoryDataManagerOperation = + TestingMemoryDataManagerOperation.builder().build(); + HsSubpartitionConsumerMemoryDataManager subpartitionConsumerMemoryDataManager = + createSubpartitionConsumerMemoryDataManager(memoryDataManagerOperation); + + HsBufferContext buffer1 = createBufferContext(0, false); + HsBufferContext buffer2 = createBufferContext(1, false); + subpartitionConsumerMemoryDataManager.addBuffer(buffer1); + subpartitionConsumerMemoryDataManager.addBuffer(buffer2); + subpartitionConsumerMemoryDataManager.addBuffer(createBufferContext(2, true)); + + buffer1.release(); + buffer2.release(); + + assertThat(subpartitionConsumerMemoryDataManager.peekNextToConsumeDataType(2)) + .isEqualTo(Buffer.DataType.EVENT_BUFFER); + } + + @Test + void testConsumeBufferFirstUnConsumedBufferIndexNotMeetNextToConsume() throws Exception { + TestingMemoryDataManagerOperation memoryDataManagerOperation = + TestingMemoryDataManagerOperation.builder().build(); + HsSubpartitionConsumerMemoryDataManager subpartitionConsumerMemoryDataManager = + createSubpartitionConsumerMemoryDataManager(memoryDataManagerOperation); + + subpartitionConsumerMemoryDataManager.addBuffer(createBufferContext(0, false)); + assertThat(subpartitionConsumerMemoryDataManager.consumeBuffer(1)).isNotPresent(); + } + + @Test + void testConsumeBufferTrimHeadingReleasedBuffers() throws Exception { + TestingMemoryDataManagerOperation memoryDataManagerOperation = + TestingMemoryDataManagerOperation.builder().build(); + HsSubpartitionConsumerMemoryDataManager subpartitionConsumerMemoryDataManager = + createSubpartitionConsumerMemoryDataManager(memoryDataManagerOperation); + + HsBufferContext buffer1 = createBufferContext(0, false); + HsBufferContext buffer2 = createBufferContext(1, false); + subpartitionConsumerMemoryDataManager.addBuffer(buffer1); + subpartitionConsumerMemoryDataManager.addBuffer(buffer2); + subpartitionConsumerMemoryDataManager.addBuffer(createBufferContext(2, true)); + + buffer1.release(); + buffer2.release(); + + assertThat(subpartitionConsumerMemoryDataManager.consumeBuffer(2)).isPresent(); + } + + @Test + void testConsumeBufferReturnSlice() { + TestingMemoryDataManagerOperation memoryDataManagerOperation = + TestingMemoryDataManagerOperation.builder().build(); + HsSubpartitionConsumerMemoryDataManager subpartitionConsumerMemoryDataManager = + createSubpartitionConsumerMemoryDataManager(memoryDataManagerOperation); + + subpartitionConsumerMemoryDataManager.addBuffer(createBufferContext(0, false)); + + Optional<ResultSubpartition.BufferAndBacklog> bufferOpt = + subpartitionConsumerMemoryDataManager.consumeBuffer(0); + assertThat(bufferOpt) + .hasValueSatisfying( + (bufferAndBacklog -> + assertThat(bufferAndBacklog.buffer()) + .isInstanceOf(ReadOnlySlicedNetworkBuffer.class))); + } + + @Test + void testAddBuffer() { + TestingMemoryDataManagerOperation memoryDataManagerOperation = + TestingMemoryDataManagerOperation.builder().build(); + HsSubpartitionConsumerMemoryDataManager subpartitionConsumerMemoryDataManager = + createSubpartitionConsumerMemoryDataManager(memoryDataManagerOperation); + ArrayDeque<HsBufferContext> initialBuffers = new ArrayDeque<>(); + initialBuffers.add(createBufferContext(0, false)); + initialBuffers.add(createBufferContext(1, false)); + subpartitionConsumerMemoryDataManager.addInitialBuffers(initialBuffers); + subpartitionConsumerMemoryDataManager.addBuffer(createBufferContext(2, true)); + + assertThat(subpartitionConsumerMemoryDataManager.consumeBuffer(0)) + .hasValueSatisfying( + bufferAndBacklog -> { + assertThat(bufferAndBacklog.getSequenceNumber()).isEqualTo(0); + assertThat(bufferAndBacklog.buffer().getDataType()) + .isEqualTo(Buffer.DataType.DATA_BUFFER); + }); + assertThat(subpartitionConsumerMemoryDataManager.consumeBuffer(1)) + .hasValueSatisfying( + bufferAndBacklog -> { + assertThat(bufferAndBacklog.getSequenceNumber()).isEqualTo(1); + assertThat(bufferAndBacklog.buffer().getDataType()) + .isEqualTo(Buffer.DataType.DATA_BUFFER); + }); + assertThat(subpartitionConsumerMemoryDataManager.consumeBuffer(2)) + .hasValueSatisfying( + bufferAndBacklog -> { + assertThat(bufferAndBacklog.getSequenceNumber()).isEqualTo(2); + assertThat(bufferAndBacklog.buffer().getDataType()) + .isEqualTo(Buffer.DataType.EVENT_BUFFER); + }); + } + + @Test + void testRelease() { + CompletableFuture<HsConsumerId> consumerReleasedFuture = new CompletableFuture<>(); + TestingMemoryDataManagerOperation memoryDataManagerOperation = + TestingMemoryDataManagerOperation.builder() + .setOnConsumerReleasedBiConsumer( + (subpartitionId, consumerId) -> { + consumerReleasedFuture.complete(consumerId); + }) + .build(); + HsConsumerId consumerId = HsConsumerId.newId(null); + HsSubpartitionConsumerMemoryDataManager subpartitionConsumerMemoryDataManager = + createSubpartitionConsumerMemoryDataManager(consumerId, memoryDataManagerOperation); + subpartitionConsumerMemoryDataManager.releaseDataView(); + assertThat(consumerReleasedFuture).isCompletedWithValue(consumerId); + } + + private static HsBufferContext createBufferContext(int bufferIndex, boolean isEvent) { + return new HsBufferContext( + createBuffer(BUFFER_SIZE, isEvent), bufferIndex, SUBPARTITION_ID); + } + + private HsSubpartitionConsumerMemoryDataManager createSubpartitionConsumerMemoryDataManager( + HsMemoryDataManagerOperation memoryDataManagerOperation) { + return createSubpartitionConsumerMemoryDataManager( + HsConsumerId.DEFAULT, memoryDataManagerOperation); + } + + private HsSubpartitionConsumerMemoryDataManager createSubpartitionConsumerMemoryDataManager( + HsConsumerId consumerId, HsMemoryDataManagerOperation memoryDataManagerOperation) { + return new HsSubpartitionConsumerMemoryDataManager( + new ReentrantLock(), + new ReentrantLock(), + // consumerMemoryDataManager is a member of subpartitionMemoryDataManager, using a + // fixed subpartition id is enough. + SUBPARTITION_ID, + consumerId, + memoryDataManagerOperation); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImplTest.java index 03a560589d6..e60eff5dc09 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImplTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImplTest.java @@ -75,7 +75,7 @@ class HsSubpartitionFileReaderImplTest { private HsFileDataIndex diskIndex; - private TestingSubpartitionViewInternalOperation subpartitionOperation; + private TestingSubpartitionConsumerInternalOperation subpartitionOperation; private FileChannel dataFileChannel; @@ -87,7 +87,7 @@ class HsSubpartitionFileReaderImplTest { Path dataFilePath = Files.createFile(tempPath.resolve(UUID.randomUUID().toString())); dataFileChannel = openFileChannel(dataFilePath); diskIndex = new HsFileDataIndexImpl(1); - subpartitionOperation = new TestingSubpartitionViewInternalOperation(); + subpartitionOperation = new TestingSubpartitionConsumerInternalOperation(); currentFileOffset = 0L; } @@ -99,10 +99,10 @@ class HsSubpartitionFileReaderImplTest { @Test void testReadBuffer() throws Exception { diskIndex = new HsFileDataIndexImpl(2); - TestingSubpartitionViewInternalOperation viewNotifier1 = - new TestingSubpartitionViewInternalOperation(); - TestingSubpartitionViewInternalOperation viewNotifier2 = - new TestingSubpartitionViewInternalOperation(); + TestingSubpartitionConsumerInternalOperation viewNotifier1 = + new TestingSubpartitionConsumerInternalOperation(); + TestingSubpartitionConsumerInternalOperation viewNotifier2 = + new TestingSubpartitionConsumerInternalOperation(); HsSubpartitionFileReaderImpl fileReader1 = createSubpartitionFileReader(0, viewNotifier1); HsSubpartitionFileReaderImpl fileReader2 = createSubpartitionFileReader(1, viewNotifier2); @@ -142,8 +142,8 @@ class HsSubpartitionFileReaderImplTest { new BufferDecompressor(bufferSize, compressionFactoryName); diskIndex = new HsFileDataIndexImpl(1); - TestingSubpartitionViewInternalOperation viewNotifier = - new TestingSubpartitionViewInternalOperation(); + TestingSubpartitionConsumerInternalOperation viewNotifier = + new TestingSubpartitionConsumerInternalOperation(); HsSubpartitionFileReaderImpl fileReader1 = createSubpartitionFileReader(0, viewNotifier); writeDataToFile(0, 0, 1, 3, bufferCompressor); @@ -362,10 +362,10 @@ class HsSubpartitionFileReaderImplTest { @Test void testCompareTo() throws Exception { diskIndex = new HsFileDataIndexImpl(2); - TestingSubpartitionViewInternalOperation viewNotifier1 = - new TestingSubpartitionViewInternalOperation(); - TestingSubpartitionViewInternalOperation viewNotifier2 = - new TestingSubpartitionViewInternalOperation(); + TestingSubpartitionConsumerInternalOperation viewNotifier1 = + new TestingSubpartitionConsumerInternalOperation(); + TestingSubpartitionConsumerInternalOperation viewNotifier2 = + new TestingSubpartitionConsumerInternalOperation(); HsSubpartitionFileReaderImpl fileReader1 = createSubpartitionFileReader(0, viewNotifier1); HsSubpartitionFileReaderImpl fileReader2 = createSubpartitionFileReader(1, viewNotifier2); assertThat(fileReader1).isEqualByComparingTo(fileReader2); @@ -390,8 +390,8 @@ class HsSubpartitionFileReaderImplTest { @Test void testConsumeBuffer() throws Throwable { - TestingSubpartitionViewInternalOperation viewNotifier = - new TestingSubpartitionViewInternalOperation(); + TestingSubpartitionConsumerInternalOperation viewNotifier = + new TestingSubpartitionConsumerInternalOperation(); HsSubpartitionFileReaderImpl subpartitionFileReader = createSubpartitionFileReader(0, viewNotifier); @@ -428,8 +428,8 @@ class HsSubpartitionFileReaderImplTest { @Test void testPeekNextToConsumeDataTypeOrConsumeBufferThrowException() { - TestingSubpartitionViewInternalOperation viewNotifier = - new TestingSubpartitionViewInternalOperation(); + TestingSubpartitionConsumerInternalOperation viewNotifier = + new TestingSubpartitionConsumerInternalOperation(); HsSubpartitionFileReaderImpl subpartitionFileReader = createSubpartitionFileReader(0, viewNotifier); @@ -446,8 +446,8 @@ class HsSubpartitionFileReaderImplTest { @Test void testPeekNextToConsumeDataType() throws Throwable { - TestingSubpartitionViewInternalOperation viewNotifier = - new TestingSubpartitionViewInternalOperation(); + TestingSubpartitionConsumerInternalOperation viewNotifier = + new TestingSubpartitionConsumerInternalOperation(); HsSubpartitionFileReaderImpl subpartitionFileReader = createSubpartitionFileReader(0, viewNotifier); @@ -477,8 +477,8 @@ class HsSubpartitionFileReaderImplTest { */ @Test void testSubpartitionReaderRegisterMultipleTimes() throws Exception { - TestingSubpartitionViewInternalOperation viewNotifier = - new TestingSubpartitionViewInternalOperation(); + TestingSubpartitionConsumerInternalOperation viewNotifier = + new TestingSubpartitionConsumerInternalOperation(); HsSubpartitionFileReaderImpl subpartitionFileReader = createSubpartitionFileReader(0, viewNotifier); // mock the scenario that buffer 0 is already read form memory. @@ -491,7 +491,7 @@ class HsSubpartitionFileReaderImplTest { checkData(subpartitionFileReader, 2, 3); // after failover, new view and subpartitionFileReader will be created. - viewNotifier = new TestingSubpartitionViewInternalOperation(); + viewNotifier = new TestingSubpartitionConsumerInternalOperation(); subpartitionFileReader = createSubpartitionFileReader(0, viewNotifier); subpartitionFileReader.prepareForScheduling(); memorySegments = createsMemorySegments(3); @@ -529,7 +529,7 @@ class HsSubpartitionFileReaderImplTest { } private HsSubpartitionFileReaderImpl createSubpartitionFileReader( - int targetChannel, HsSubpartitionViewInternalOperations operations) { + int targetChannel, HsSubpartitionConsumerInternalOperations operations) { return new HsSubpartitionFileReaderImpl( targetChannel, dataFileChannel, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionMemoryDataManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionMemoryDataManagerTest.java index 174c021990d..998c87397f1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionMemoryDataManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionMemoryDataManagerTest.java @@ -28,7 +28,6 @@ import org.apache.flink.runtime.io.network.buffer.Buffer.DataType; import org.apache.flink.runtime.io.network.buffer.BufferBuilder; import org.apache.flink.runtime.io.network.buffer.BufferCompressor; import org.apache.flink.runtime.io.network.buffer.BufferDecompressor; -import org.apache.flink.runtime.io.network.buffer.ReadOnlySlicedNetworkBuffer; import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog; import org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingInfoProvider.ConsumeStatus; import org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingInfoProvider.SpillStatus; @@ -53,11 +52,14 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.stream.Collectors; import java.util.stream.IntStream; +import static org.apache.flink.runtime.io.network.partition.hybrid.HsConsumerId.DEFAULT; import static org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingInfoProvider.ConsumeStatusWithId.ALL_ANY; import static org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingInfoProvider.ConsumeStatusWithId.fromStatusAndConsumerId; import static org.apache.flink.runtime.io.network.partition.hybrid.HybridShuffleTestUtils.createBufferBuilder; import static org.apache.flink.runtime.io.network.partition.hybrid.HybridShuffleTestUtils.createTestingOutputMetrics; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatNoException; +import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Tests for {@link HsSubpartitionMemoryDataManager}. */ class HsSubpartitionMemoryDataManagerTest { @@ -121,98 +123,9 @@ class HsSubpartitionMemoryDataManagerTest { assertThat(finishedBuffers).hasValue(2); } - @Test - void testPeekNextToConsumeDataTypeNotMeetBufferIndexToConsume() throws Exception { - TestingMemoryDataManagerOperation memoryDataManagerOperation = - TestingMemoryDataManagerOperation.builder() - .setRequestBufferFromPoolSupplier(() -> createBufferBuilder(RECORD_SIZE)) - .build(); - HsSubpartitionMemoryDataManager subpartitionMemoryDataManager = - createSubpartitionMemoryDataManager(memoryDataManagerOperation); - - subpartitionMemoryDataManager.append(createRecord(0), DataType.DATA_BUFFER); - - assertThat(subpartitionMemoryDataManager.peekNextToConsumeDataType(1)) - .isEqualTo(DataType.NONE); - } - - @Test - void testPeekNextToConsumeDataTypeTrimHeadingReleasedBuffers() throws Exception { - TestingMemoryDataManagerOperation memoryDataManagerOperation = - TestingMemoryDataManagerOperation.builder() - .setRequestBufferFromPoolSupplier(() -> createBufferBuilder(RECORD_SIZE)) - .build(); - HsSubpartitionMemoryDataManager subpartitionMemoryDataManager = - createSubpartitionMemoryDataManager(memoryDataManagerOperation); - - subpartitionMemoryDataManager.append(createRecord(0), DataType.DATA_BUFFER); - subpartitionMemoryDataManager.append(createRecord(1), DataType.DATA_BUFFER); - subpartitionMemoryDataManager.append(createRecord(2), DataType.EVENT_BUFFER); - - List<BufferIndexAndChannel> toRelease = - HybridShuffleTestUtils.createBufferIndexAndChannelsList(0, 0, 1); - subpartitionMemoryDataManager.releaseSubpartitionBuffers(toRelease); - - assertThat(subpartitionMemoryDataManager.peekNextToConsumeDataType(2)) - .isEqualTo(DataType.EVENT_BUFFER); - } - - @Test - void testConsumeBufferFirstUnConsumedBufferIndexNotMeetNextToConsume() throws Exception { - TestingMemoryDataManagerOperation memoryDataManagerOperation = - TestingMemoryDataManagerOperation.builder() - .setRequestBufferFromPoolSupplier(() -> createBufferBuilder(RECORD_SIZE)) - .build(); - HsSubpartitionMemoryDataManager subpartitionMemoryDataManager = - createSubpartitionMemoryDataManager(memoryDataManagerOperation); - - subpartitionMemoryDataManager.append(createRecord(0), DataType.DATA_BUFFER); - - assertThat(subpartitionMemoryDataManager.consumeBuffer(1)).isNotPresent(); - } - - @Test - void testConsumeBufferTrimHeadingReleasedBuffers() throws Exception { - TestingMemoryDataManagerOperation memoryDataManagerOperation = - TestingMemoryDataManagerOperation.builder() - .setRequestBufferFromPoolSupplier(() -> createBufferBuilder(RECORD_SIZE)) - .build(); - HsSubpartitionMemoryDataManager subpartitionMemoryDataManager = - createSubpartitionMemoryDataManager(memoryDataManagerOperation); - - subpartitionMemoryDataManager.append(createRecord(0), DataType.DATA_BUFFER); - subpartitionMemoryDataManager.append(createRecord(1), DataType.DATA_BUFFER); - subpartitionMemoryDataManager.append(createRecord(2), DataType.EVENT_BUFFER); - - List<BufferIndexAndChannel> toRelease = - HybridShuffleTestUtils.createBufferIndexAndChannelsList(0, 0, 1); - subpartitionMemoryDataManager.releaseSubpartitionBuffers(toRelease); - - assertThat(subpartitionMemoryDataManager.consumeBuffer(2)).isPresent(); - } - - @Test - void testConsumeBufferReturnSlice() throws Exception { - TestingMemoryDataManagerOperation memoryDataManagerOperation = - TestingMemoryDataManagerOperation.builder() - .setRequestBufferFromPoolSupplier(() -> createBufferBuilder(RECORD_SIZE)) - .build(); - HsSubpartitionMemoryDataManager subpartitionMemoryDataManager = - createSubpartitionMemoryDataManager(memoryDataManagerOperation); - - subpartitionMemoryDataManager.append(createRecord(0), DataType.DATA_BUFFER); - - Optional<BufferAndBacklog> bufferOpt = subpartitionMemoryDataManager.consumeBuffer(0); - assertThat(bufferOpt) - .hasValueSatisfying( - (bufferAndBacklog -> - assertThat(bufferAndBacklog.buffer()) - .isInstanceOf(ReadOnlySlicedNetworkBuffer.class))); - } - @ParameterizedTest @ValueSource(strings = {"LZ4", "LZO", "ZSTD", "NULL"}) - void testConsumeBuffer(String compressionFactoryName) throws Exception { + void testCompressBufferAndConsume(String compressionFactoryName) throws Exception { final int numDataBuffers = 10; final int numRecordsPerBuffer = 10; // write numRecordsPerBuffer long record to one buffer, as a single long is @@ -248,9 +161,11 @@ class HsSubpartitionMemoryDataManagerTest { subpartitionMemoryDataManager.append(createRecord(recordValue), DataType.EVENT_BUFFER); expectedRecords.add(Tuple2.of(recordValue, DataType.EVENT_BUFFER)); + HsSubpartitionConsumerMemoryDataManager consumer = + subpartitionMemoryDataManager.registerNewConsumer(DEFAULT); ArrayList<Optional<BufferAndBacklog>> bufferAndBacklogOpts = new ArrayList<>(); for (int i = 0; i < numDataBuffers + 1; i++) { - bufferAndBacklogOpts.add(subpartitionMemoryDataManager.consumeBuffer(i)); + bufferAndBacklogOpts.add(consumer.consumeBuffer(i)); } checkConsumedBufferAndNextDataType( numRecordsPerBuffer, bufferDecompressor, expectedRecords, bufferAndBacklogOpts); @@ -276,6 +191,8 @@ class HsSubpartitionMemoryDataManagerTest { .build(); HsSubpartitionMemoryDataManager subpartitionMemoryDataManager = createSubpartitionMemoryDataManager(memoryDataManagerOperation); + HsSubpartitionConsumerMemoryDataManager consumer = + subpartitionMemoryDataManager.registerNewConsumer(DEFAULT); final int numBuffers = 4; for (int i = 0; i < numBuffers; i++) { subpartitionMemoryDataManager.append(createRecord(i), DataType.DATA_BUFFER); @@ -288,8 +205,8 @@ class HsSubpartitionMemoryDataManagerTest { subpartitionMemoryDataManager.spillSubpartitionBuffers(toStartSpilling, spilledDoneFuture); // consume buffer 0, 1 - subpartitionMemoryDataManager.consumeBuffer(0); - subpartitionMemoryDataManager.consumeBuffer(1); + consumer.consumeBuffer(0); + consumer.consumeBuffer(1); checkBufferIndex( subpartitionMemoryDataManager.getBuffersSatisfyStatus(SpillStatus.ALL, ALL_ANY), @@ -432,6 +349,33 @@ class HsSubpartitionMemoryDataManagerTest { assertThat(metrics.getNumBytesOut().getCount()).isEqualTo(recordSize + eventSize); } + @Test + void testConsumerRegisterRepeatedly() { + TestingMemoryDataManagerOperation memoryDataManagerOperation = + TestingMemoryDataManagerOperation.builder().build(); + HsSubpartitionMemoryDataManager subpartitionMemoryDataManager = + createSubpartitionMemoryDataManager(memoryDataManagerOperation); + + HsConsumerId consumerId = HsConsumerId.newId(null); + subpartitionMemoryDataManager.registerNewConsumer(consumerId); + assertThatThrownBy(() -> subpartitionMemoryDataManager.registerNewConsumer(consumerId)) + .isInstanceOf(IllegalStateException.class); + } + + @Test + void testRegisterAndReleaseConsumer() { + TestingMemoryDataManagerOperation memoryDataManagerOperation = + TestingMemoryDataManagerOperation.builder().build(); + HsSubpartitionMemoryDataManager subpartitionMemoryDataManager = + createSubpartitionMemoryDataManager(memoryDataManagerOperation); + + HsConsumerId consumerId = HsConsumerId.newId(null); + subpartitionMemoryDataManager.registerNewConsumer(consumerId); + subpartitionMemoryDataManager.releaseConsumer(consumerId); + assertThatNoException() + .isThrownBy(() -> subpartitionMemoryDataManager.registerNewConsumer(consumerId)); + } + private static void checkBufferIndex( Deque<BufferIndexAndChannel> bufferWithIdentities, List<Integer> expectedIndexes) { List<Integer> bufferIndexes = diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionViewTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionViewTest.java index 451ab8980da..11ea5c3766b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionViewTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionViewTest.java @@ -42,11 +42,11 @@ import static org.apache.flink.runtime.io.network.partition.hybrid.HybridShuffle import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; -/** Tests for {@link HsSubpartitionView}. */ +/** Tests for {@link HsSubpartitionConsumer}. */ class HsSubpartitionViewTest { @Test void testGetNextBufferFromDisk() { - HsSubpartitionView subpartitionView = createSubpartitionView(); + HsSubpartitionConsumer subpartitionView = createSubpartitionView(); BufferAndBacklog bufferAndBacklog = createBufferAndBacklog(1, DataType.DATA_BUFFER, 0); CompletableFuture<Void> consumeBufferFromMemoryFuture = new CompletableFuture<>(); @@ -77,7 +77,7 @@ class HsSubpartitionViewTest { final int bufferSize = 16; NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, bufferSize); BufferPool bufferPool = networkBufferPool.createBufferPool(10, 10); - HsSubpartitionView subpartitionView = createSubpartitionView(); + HsSubpartitionConsumer subpartitionView = createSubpartitionView(); CompletableFuture<Void> acquireWriteLock = new CompletableFuture<>(); @@ -102,7 +102,8 @@ class HsSubpartitionViewTest { } catch (Exception e) { throw new RuntimeException(e); } - spillingInfoProvider.getNextBufferIndexToConsume(); + spillingInfoProvider.getNextBufferIndexToConsume( + HsConsumerId.DEFAULT); return HsSpillingStrategy.Decision.NO_ACTION; }) .build(); @@ -117,7 +118,8 @@ class HsSubpartitionViewTest { null, 0); memoryDataManager.setOutputMetrics(createTestingOutputMetrics()); - HsDataView hsDataView = memoryDataManager.registerSubpartitionView(0, subpartitionView); + HsDataView hsDataView = + memoryDataManager.registerNewConsumer(0, HsConsumerId.DEFAULT, subpartitionView); subpartitionView.setMemoryDataView(hsDataView); subpartitionView.setDiskDataView(TestingHsDataView.NO_OP); @@ -128,7 +130,7 @@ class HsSubpartitionViewTest { @Test void testGetNextBufferFromDiskNextDataTypeIsNone() { - HsSubpartitionView subpartitionView = createSubpartitionView(); + HsSubpartitionConsumer subpartitionView = createSubpartitionView(); BufferAndBacklog bufferAndBacklog = createBufferAndBacklog(0, DataType.NONE, 0); TestingHsDataView diskDataView = @@ -158,7 +160,7 @@ class HsSubpartitionViewTest { @Test void testGetNextBufferFromMemory() { - HsSubpartitionView subpartitionView = createSubpartitionView(); + HsSubpartitionConsumer subpartitionView = createSubpartitionView(); BufferAndBacklog bufferAndBacklog = createBufferAndBacklog(1, DataType.DATA_BUFFER, 0); TestingHsDataView memoryDataView = @@ -179,7 +181,7 @@ class HsSubpartitionViewTest { @Test void testGetNextBufferThrowException() { - HsSubpartitionView subpartitionView = createSubpartitionView(); + HsSubpartitionConsumer subpartitionView = createSubpartitionView(); TestingHsDataView diskDataView = TestingHsDataView.builder() @@ -200,7 +202,7 @@ class HsSubpartitionViewTest { @Test void testGetNextBufferZeroBacklog() { - HsSubpartitionView subpartitionView = createSubpartitionView(); + HsSubpartitionConsumer subpartitionView = createSubpartitionView(); final int diskBacklog = 0; final int memoryBacklog = 10; @@ -236,7 +238,7 @@ class HsSubpartitionViewTest { @Test void testNotifyDataAvailableNeedNotify() { CompletableFuture<Void> notifyAvailableFuture = new CompletableFuture<>(); - HsSubpartitionView subpartitionView = + HsSubpartitionConsumer subpartitionView = createSubpartitionView(() -> notifyAvailableFuture.complete(null)); TestingHsDataView memoryDataView = @@ -256,7 +258,7 @@ class HsSubpartitionViewTest { @Test void testNotifyDataAvailableNotNeedNotify() { CompletableFuture<Void> notifyAvailableFuture = new CompletableFuture<>(); - HsSubpartitionView subpartitionView = + HsSubpartitionConsumer subpartitionView = createSubpartitionView(() -> notifyAvailableFuture.complete(null)); TestingHsDataView memoryDataView = @@ -277,7 +279,7 @@ class HsSubpartitionViewTest { @Test void testGetZeroBacklogNeedNotify() { CompletableFuture<Void> notifyAvailableFuture = new CompletableFuture<>(); - HsSubpartitionView subpartitionView = + HsSubpartitionConsumer subpartitionView = createSubpartitionView(() -> notifyAvailableFuture.complete(null)); subpartitionView.setMemoryDataView(TestingHsDataView.NO_OP); subpartitionView.setDiskDataView( @@ -294,7 +296,7 @@ class HsSubpartitionViewTest { @Test void testGetAvailabilityAndBacklogPositiveCredit() { - HsSubpartitionView subpartitionView = createSubpartitionView(); + HsSubpartitionConsumer subpartitionView = createSubpartitionView(); subpartitionView.setMemoryDataView(TestingHsDataView.NO_OP); final int backlog = 2; @@ -311,7 +313,7 @@ class HsSubpartitionViewTest { void testGetAvailabilityAndBacklogNonPositiveCreditNextIsData() { final int backlog = 2; - HsSubpartitionView subpartitionView = createSubpartitionView(); + HsSubpartitionConsumer subpartitionView = createSubpartitionView(); subpartitionView.setMemoryDataView( TestingHsDataView.builder() .setConsumeBufferFunction( @@ -336,7 +338,7 @@ class HsSubpartitionViewTest { void testGetAvailabilityAndBacklogNonPositiveCreditNextIsEvent() { final int backlog = 2; - HsSubpartitionView subpartitionView = createSubpartitionView(); + HsSubpartitionConsumer subpartitionView = createSubpartitionView(); subpartitionView.setMemoryDataView( TestingHsDataView.builder() .setConsumeBufferFunction( @@ -359,23 +361,29 @@ class HsSubpartitionViewTest { @Test void testRelease() throws Exception { - HsSubpartitionView subpartitionView = createSubpartitionView(); - CompletableFuture<Void> releaseDataViewFuture = new CompletableFuture<>(); + HsSubpartitionConsumer subpartitionView = createSubpartitionView(); + CompletableFuture<Void> releaseDiskViewFuture = new CompletableFuture<>(); + CompletableFuture<Void> releaseMemoryViewFuture = new CompletableFuture<>(); TestingHsDataView diskDataView = TestingHsDataView.builder() - .setReleaseDataViewRunnable(() -> releaseDataViewFuture.complete(null)) + .setReleaseDataViewRunnable(() -> releaseDiskViewFuture.complete(null)) + .build(); + TestingHsDataView memoryDataView = + TestingHsDataView.builder() + .setReleaseDataViewRunnable(() -> releaseMemoryViewFuture.complete(null)) .build(); subpartitionView.setDiskDataView(diskDataView); - subpartitionView.setMemoryDataView(TestingHsDataView.NO_OP); + subpartitionView.setMemoryDataView(memoryDataView); subpartitionView.releaseAllResources(); assertThat(subpartitionView.isReleased()).isTrue(); - assertThat(releaseDataViewFuture).isCompleted(); + assertThat(releaseDiskViewFuture).isCompleted(); + assertThat(releaseMemoryViewFuture).isCompleted(); } @Test void testGetConsumingOffset() { AtomicInteger nextBufferIndex = new AtomicInteger(0); - HsSubpartitionView subpartitionView = createSubpartitionView(); + HsSubpartitionConsumer subpartitionView = createSubpartitionView(); TestingHsDataView diskDataView = TestingHsDataView.builder() .setConsumeBufferFunction( @@ -398,7 +406,7 @@ class HsSubpartitionViewTest { @Test void testSetDataViewRepeatedly() { - HsSubpartitionView subpartitionView = createSubpartitionView(); + HsSubpartitionConsumer subpartitionView = createSubpartitionView(); subpartitionView.setMemoryDataView(TestingHsDataView.NO_OP); assertThatThrownBy(() -> subpartitionView.setMemoryDataView(TestingHsDataView.NO_OP)) @@ -411,13 +419,13 @@ class HsSubpartitionViewTest { .hasMessageContaining("repeatedly set disk data view is not allowed."); } - private static HsSubpartitionView createSubpartitionView() { - return new HsSubpartitionView(new NoOpBufferAvailablityListener()); + private static HsSubpartitionConsumer createSubpartitionView() { + return new HsSubpartitionConsumer(new NoOpBufferAvailablityListener()); } - private static HsSubpartitionView createSubpartitionView( + private static HsSubpartitionConsumer createSubpartitionView( BufferAvailabilityListener bufferAvailabilityListener) { - return new HsSubpartitionView(bufferAvailabilityListener); + return new HsSubpartitionConsumer(bufferAvailabilityListener); } private static BufferAndBacklog createBufferAndBacklog( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingMemoryDataManagerOperation.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingMemoryDataManagerOperation.java index 2b16d5d4d4f..5ac37c2a1d9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingMemoryDataManagerOperation.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingMemoryDataManagerOperation.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.io.network.partition.hybrid; import org.apache.flink.runtime.io.network.buffer.BufferBuilder; import org.apache.flink.util.function.SupplierWithException; +import java.util.Collection; import java.util.function.BiConsumer; import java.util.function.Consumer; @@ -37,18 +38,22 @@ public class TestingMemoryDataManagerOperation implements HsMemoryDataManagerOpe private final Runnable onDataAvailableRunnable; + private final BiConsumer<Integer, HsConsumerId> onConsumerReleasedBiConsumer; + private TestingMemoryDataManagerOperation( SupplierWithException<BufferBuilder, InterruptedException> requestBufferFromPoolSupplier, BiConsumer<Integer, Integer> markBufferReadableConsumer, Consumer<BufferIndexAndChannel> onBufferConsumedConsumer, Runnable onBufferFinishedRunnable, - Runnable onDataAvailableRunnable) { + Runnable onDataAvailableRunnable, + BiConsumer<Integer, HsConsumerId> onConsumerReleasedBiConsumer) { this.requestBufferFromPoolSupplier = requestBufferFromPoolSupplier; this.markBufferReadableConsumer = markBufferReadableConsumer; this.onBufferConsumedConsumer = onBufferConsumedConsumer; this.onBufferFinishedRunnable = onBufferFinishedRunnable; this.onDataAvailableRunnable = onDataAvailableRunnable; + this.onConsumerReleasedBiConsumer = onConsumerReleasedBiConsumer; } @Override @@ -72,10 +77,15 @@ public class TestingMemoryDataManagerOperation implements HsMemoryDataManagerOpe } @Override - public void onDataAvailable(int subpartitionId) { + public void onDataAvailable(int subpartitionId, Collection<HsConsumerId> consumerIds) { onDataAvailableRunnable.run(); } + @Override + public void onConsumerReleased(int subpartitionId, HsConsumerId consumerId) { + onConsumerReleasedBiConsumer.accept(subpartitionId, consumerId); + } + public static Builder builder() { return new Builder(); } @@ -93,6 +103,9 @@ public class TestingMemoryDataManagerOperation implements HsMemoryDataManagerOpe private Runnable onDataAvailableRunnable = () -> {}; + private BiConsumer<Integer, HsConsumerId> onConsumerReleasedBiConsumer = + (ignore1, ignore2) -> {}; + public Builder setRequestBufferFromPoolSupplier( SupplierWithException<BufferBuilder, InterruptedException> requestBufferFromPoolSupplier) { @@ -122,6 +135,12 @@ public class TestingMemoryDataManagerOperation implements HsMemoryDataManagerOpe return this; } + public Builder setOnConsumerReleasedBiConsumer( + BiConsumer<Integer, HsConsumerId> onConsumerReleasedBiConsumer) { + this.onConsumerReleasedBiConsumer = onConsumerReleasedBiConsumer; + return this; + } + private Builder() {} public TestingMemoryDataManagerOperation build() { @@ -130,7 +149,8 @@ public class TestingMemoryDataManagerOperation implements HsMemoryDataManagerOpe markBufferReadableConsumer, onBufferConsumedConsumer, onBufferFinishedRunnable, - onDataAvailableRunnable); + onDataAvailableRunnable, + onConsumerReleasedBiConsumer); } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingSpillingInfoProvider.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingSpillingInfoProvider.java index 67b34d0e774..9980f4176fa 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingSpillingInfoProvider.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingSpillingInfoProvider.java @@ -73,7 +73,7 @@ public class TestingSpillingInfoProvider implements HsSpillingInfoProvider { } @Override - public List<Integer> getNextBufferIndexToConsume() { + public List<Integer> getNextBufferIndexToConsume(HsConsumerId consumerId) { return getNextBufferIndexToConsumeSupplier.get(); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingSubpartitionViewInternalOperation.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingSubpartitionConsumerInternalOperation.java similarity index 88% rename from flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingSubpartitionViewInternalOperation.java rename to flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingSubpartitionConsumerInternalOperation.java index ff2f8250b2a..1f32189da53 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingSubpartitionViewInternalOperation.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingSubpartitionConsumerInternalOperation.java @@ -18,9 +18,9 @@ package org.apache.flink.runtime.io.network.partition.hybrid; -/** Mock {@link HsSubpartitionViewInternalOperations} for test. */ -public class TestingSubpartitionViewInternalOperation - implements HsSubpartitionViewInternalOperations { +/** Mock {@link HsSubpartitionConsumerInternalOperations} for test. */ +public class TestingSubpartitionConsumerInternalOperation + implements HsSubpartitionConsumerInternalOperations { // -1 indicates downstream just start consuming offset. private int consumingOffset = -1;
