This is an automated email from the ASF dual-hosted git repository. xtsong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 80a924309ce910715c4079c7e52e9d560318bd38 Author: Yuxin Tan <[email protected]> AuthorDate: Mon May 8 13:54:07 2023 +0800 [FLINK-31635][network] Introduce tiered result partition This closes #22330 --- .../shuffle/TieredInternalShuffleMaster.java | 97 +++++++++ .../tiered/shuffle/TieredResultPartition.java | 225 +++++++++++++++++++++ .../flink/runtime/shuffle/NettyShuffleMaster.java | 15 +- .../hybrid/tiered/TestingBufferAccumulator.java | 45 +++++ .../tiered/shuffle/TieredResultPartitionTest.java | 216 ++++++++++++++++++++ 5 files changed, 597 insertions(+), 1 deletion(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredInternalShuffleMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredInternalShuffleMaster.java new file mode 100644 index 00000000000..5f8baa3ac36 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredInternalShuffleMaster.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network.partition.hybrid.tiered.shuffle; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageConfiguration; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMasterClient; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageResourceRegistry; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierMasterAgent; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageIdMappingUtils.convertId; + +/** + * A wrapper internal shuffle master class for tiered storage. All the tiered storage operations + * with the shuffle master should be wrapped in this class. 
+ */ +public class TieredInternalShuffleMaster { + + private final TieredStorageMasterClient tieredStorageMasterClient; + + private final Map<JobID, List<ResultPartitionID>> jobPartitionIds; + + private final Map<ResultPartitionID, JobID> partitionJobIds; + + public TieredInternalShuffleMaster(Configuration conf) { + TieredStorageConfiguration tieredStorageConfiguration = + TieredStorageConfiguration.fromConfiguration(conf); + TieredStorageResourceRegistry resourceRegistry = new TieredStorageResourceRegistry(); + List<TierMasterAgent> tierFactories = + tieredStorageConfiguration.getTierFactories().stream() + .map(tierFactory -> tierFactory.createMasterAgent(resourceRegistry)) + .collect(Collectors.toList()); + this.tieredStorageMasterClient = new TieredStorageMasterClient(tierFactories); + this.jobPartitionIds = new HashMap<>(); + this.partitionJobIds = new HashMap<>(); + } + + public void addPartition(JobID jobID, ResultPartitionID resultPartitionID) { + jobPartitionIds.computeIfAbsent(jobID, ignore -> new ArrayList<>()).add(resultPartitionID); + partitionJobIds.put(resultPartitionID, jobID); + tieredStorageMasterClient.addPartition(convertId(resultPartitionID)); + } + + public void releasePartition(ResultPartitionID resultPartitionID) { + tieredStorageMasterClient.releasePartition(convertId(resultPartitionID)); + JobID jobID = partitionJobIds.remove(resultPartitionID); + if (jobID == null) { + return; + } + + List<ResultPartitionID> resultPartitionIDs = jobPartitionIds.get(jobID); + if (resultPartitionIDs == null) { + return; + } + + resultPartitionIDs.remove(resultPartitionID); + // If the result partition id list has been empty, remove the jobID from the map eagerly + if (resultPartitionIDs.isEmpty()) { + jobPartitionIds.remove(jobID); + } + } + + public void unregisterJob(JobID jobID) { + List<ResultPartitionID> resultPartitionIDs = jobPartitionIds.remove(jobID); + if (resultPartitionIDs != null) { + resultPartitionIDs.forEach( + resultPartitionID -> { + 
tieredStorageMasterClient.releasePartition(convertId(resultPartitionID)); + partitionJobIds.remove(resultPartitionID); + }); + } + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredResultPartition.java new file mode 100644 index 00000000000..cd24ac89577 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredResultPartition.java @@ -0,0 +1,225 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network.partition.hybrid.tiered.shuffle; + +import org.apache.flink.runtime.checkpoint.CheckpointException; +import org.apache.flink.runtime.event.AbstractEvent; +import org.apache.flink.runtime.io.network.api.EndOfData; +import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; +import org.apache.flink.runtime.io.network.api.StopMode; +import org.apache.flink.runtime.io.network.api.serialization.EventSerializer; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.BufferCompressor; +import org.apache.flink.runtime.io.network.buffer.BufferPool; +import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener; +import org.apache.flink.runtime.io.network.partition.ResultPartition; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageIdMappingUtils; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageProducerClient; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageResourceRegistry; +import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup; +import org.apache.flink.util.concurrent.FutureUtils; +import org.apache.flink.util.function.SupplierWithException; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.concurrent.CompletableFuture; + +import static org.apache.flink.util.Preconditions.checkState; + +/** + * {@link TieredResultPartition} appends records and events to 
the tiered storage, which allows + the upstream to dynamically switch storage tiers when writing shuffle data, while the downstream will + read data from the relevant tier.
resultPartitionBytes.incAll(record.remaining()); + broadcast(record, Buffer.DataType.DATA_BUFFER); + } + + @Override + public void broadcastEvent(AbstractEvent event, boolean isPriorityEvent) throws IOException { + Buffer buffer = EventSerializer.toBuffer(event, isPriorityEvent); + try { + ByteBuffer serializedEvent = buffer.getNioBufferReadable(); + broadcast(serializedEvent, buffer.getDataType()); + } finally { + buffer.recycleBuffer(); + } + } + + private void broadcast(ByteBuffer record, Buffer.DataType dataType) throws IOException { + checkInProduceState(); + emit(record, 0, dataType, true); + } + + private void emit( + ByteBuffer record, int consumerId, Buffer.DataType dataType, boolean isBroadcast) + throws IOException { + tieredStorageProducerClient.write( + record, TieredStorageIdMappingUtils.convertId(consumerId), dataType, isBroadcast); + } + + @Override + public ResultSubpartitionView createSubpartitionView( + int subpartitionId, BufferAvailabilityListener availabilityListener) + throws IOException { + checkState(!isReleased(), "ResultPartition already released."); + // TODO, create subpartition views + return null; + } + + @Override + public void finish() throws IOException { + broadcastEvent(EndOfPartitionEvent.INSTANCE, false); + checkState(!isReleased(), "Result partition is already released."); + super.finish(); + } + + @Override + public void close() { + super.close(); + tieredStorageProducerClient.close(); + } + + @Override + protected void releaseInternal() { + tieredStorageResourceRegistry.clearResourceFor(partitionId); + } + + @Override + public void notifyEndOfData(StopMode mode) throws IOException { + if (!hasNotifiedEndOfUserRecords) { + broadcastEvent(new EndOfData(mode), false); + hasNotifiedEndOfUserRecords = true; + } + } + + @Override + public void alignedBarrierTimeout(long checkpointId) throws IOException { + // Nothing to do. 
+ } + + @Override + public void abortCheckpoint(long checkpointId, CheckpointException cause) { + // Nothing to do. + } + + @Override + public void flushAll() { + // Nothing to do. + } + + @Override + public void flush(int subpartitionIndex) { + // Nothing to do. + } + + @Override + public CompletableFuture<Void> getAllDataProcessedFuture() { + // Nothing to do. + return FutureUtils.completedVoidFuture(); + } + + @Override + public void onSubpartitionAllDataProcessed(int subpartition) { + // Nothing to do. + } + + @Override + public int getNumberOfQueuedBuffers() { + // Nothing to do. + return 0; + } + + @Override + public long getSizeOfQueuedBuffersUnsafe() { + // Nothing to do. + return 0; + } + + @Override + public int getNumberOfQueuedBuffers(int targetSubpartition) { + // Nothing to do. + return 0; + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleMaster.java index 7beb385d395..75441b7331f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleMaster.java @@ -23,6 +23,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.MemorySize; import org.apache.flink.configuration.NettyShuffleEnvironmentOptions; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.shuffle.TieredInternalShuffleMaster; import org.apache.flink.runtime.shuffle.NettyShuffleDescriptor.LocalExecutionPartitionConnectionInfo; import org.apache.flink.runtime.shuffle.NettyShuffleDescriptor.NetworkPartitionConnectionInfo; import org.apache.flink.runtime.shuffle.NettyShuffleDescriptor.PartitionConnectionInfo; @@ -49,6 +50,8 @@ public class NettyShuffleMaster implements ShuffleMaster<NettyShuffleDescriptor> private final int 
networkBufferSize; + private final TieredInternalShuffleMaster tieredInternalShuffleMaster; + public NettyShuffleMaster(Configuration conf) { checkNotNull(conf); buffersPerInputChannel = @@ -64,6 +67,7 @@ public class NettyShuffleMaster implements ShuffleMaster<NettyShuffleDescriptor> sortShuffleMinBuffers = conf.getInteger(NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_BUFFERS); networkBufferSize = ConfigurationParserUtils.getPageSize(conf); + tieredInternalShuffleMaster = new TieredInternalShuffleMaster(conf); checkArgument( !maxRequiredBuffersPerGate.isPresent() || maxRequiredBuffersPerGate.get() >= 1, @@ -96,11 +100,20 @@ public class NettyShuffleMaster implements ShuffleMaster<NettyShuffleDescriptor> producerDescriptor, partitionDescriptor.getConnectionIndex()), resultPartitionID); + tieredInternalShuffleMaster.addPartition(jobID, resultPartitionID); + return CompletableFuture.completedFuture(shuffleDeploymentDescriptor); } @Override - public void releasePartitionExternally(ShuffleDescriptor shuffleDescriptor) {} + public void releasePartitionExternally(ShuffleDescriptor shuffleDescriptor) { + tieredInternalShuffleMaster.releasePartition(shuffleDescriptor.getResultPartitionID()); + } + + @Override + public void unregisterJob(JobID jobID) { + tieredInternalShuffleMaster.unregisterJob(jobID); + } private static PartitionConnectionInfo createConnectionInfo( ProducerDescriptor producerDescriptor, int connectionIndex) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/TestingBufferAccumulator.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/TestingBufferAccumulator.java new file mode 100644 index 00000000000..9430f5f3b2e --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/TestingBufferAccumulator.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid.tiered; + +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.BufferAccumulator; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.function.BiConsumer; + +/** Test implementation for {@link BufferAccumulator}. 
*/ +public class TestingBufferAccumulator implements BufferAccumulator { + + @Override + public void setup( + int numSubpartitions, + BiConsumer<TieredStorageSubpartitionId, List<Buffer>> bufferFlusher) {} + + @Override + public void receive( + ByteBuffer record, TieredStorageSubpartitionId subpartitionId, Buffer.DataType dataType) + throws IOException {} + + @Override + public void close() {} +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredResultPartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredResultPartitionTest.java new file mode 100644 index 00000000000..7e69f015366 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredResultPartitionTest.java @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network.partition.hybrid.tiered.shuffle; + +import org.apache.flink.runtime.executiongraph.IOMetrics; +import org.apache.flink.runtime.executiongraph.ResultPartitionBytes; +import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool; +import org.apache.flink.runtime.io.disk.FileChannelManager; +import org.apache.flink.runtime.io.disk.FileChannelManagerImpl; +import org.apache.flink.runtime.io.network.buffer.BufferCompressor; +import org.apache.flink.runtime.io.network.buffer.BufferPool; +import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; +import org.apache.flink.runtime.io.network.partition.NoOpBufferAvailablityListener; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.TestingBufferAccumulator; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageProducerClient; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageResourceRegistry; +import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; +import org.apache.flink.util.concurrent.ExecutorThreadFactory; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.api.io.TempDir; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static 
org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Tests for {@link TieredResultPartition}. */ +class TieredResultPartitionTest { + + private static final int NUM_THREADS = 4; + + private static final int NETWORK_BUFFER_SIZE = 1024; + + private static final int NUM_TOTAL_BUFFERS = 1000; + + private static final int NUM_TOTAL_BYTES_IN_READ_POOL = 32 * 1024 * 1024; + + private FileChannelManager fileChannelManager; + + private NetworkBufferPool globalPool; + + private BatchShuffleReadBufferPool readBufferPool; + + private ScheduledExecutorService readIOExecutor; + + private TaskIOMetricGroup taskIOMetricGroup; + + @TempDir public java.nio.file.Path tempDataPath; + + @BeforeEach + void before() { + fileChannelManager = + new FileChannelManagerImpl(new String[] {tempDataPath.toString()}, "testing"); + globalPool = new NetworkBufferPool(NUM_TOTAL_BUFFERS, NETWORK_BUFFER_SIZE); + readBufferPool = + new BatchShuffleReadBufferPool(NUM_TOTAL_BYTES_IN_READ_POOL, NETWORK_BUFFER_SIZE); + readIOExecutor = + new ScheduledThreadPoolExecutor( + NUM_THREADS, + new ExecutorThreadFactory("test-io-scheduler-thread"), + (ignored, executor) -> { + if (executor.isShutdown()) { + // ignore rejected as shutdown. 
+ } else { + throw new RejectedExecutionException(); + } + }); + } + + @AfterEach + void after() throws Exception { + fileChannelManager.close(); + globalPool.destroy(); + readBufferPool.destroy(); + readIOExecutor.shutdown(); + } + + @Test + void testClose() throws Exception { + final int numBuffers = 1; + + BufferPool bufferPool = globalPool.createBufferPool(numBuffers, numBuffers); + TieredResultPartition partition = createTieredStoreResultPartition(1, bufferPool, false); + + partition.close(); + assertThat(bufferPool.isDestroyed()).isTrue(); + } + + @Test + @Timeout(30) + void testRelease() throws Exception { + final int numSubpartitions = 2; + final int numBuffers = 10; + + BufferPool bufferPool = globalPool.createBufferPool(numBuffers, numBuffers); + TieredResultPartition partition = + createTieredStoreResultPartition(numSubpartitions, bufferPool, false); + + partition.emitRecord(ByteBuffer.allocate(NETWORK_BUFFER_SIZE * 5), 1); + partition.close(); + assertThat(bufferPool.isDestroyed()).isTrue(); + + partition.release(); + + while (checkNotNull(fileChannelManager.getPaths()[0].listFiles()).length != 0) { + Thread.sleep(10); + } + + assertThat(NUM_TOTAL_BUFFERS).isEqualTo(globalPool.getNumberOfAvailableMemorySegments()); + } + + @Test + void testCreateSubpartitionViewAfterRelease() throws Exception { + final int numBuffers = 10; + BufferPool bufferPool = globalPool.createBufferPool(numBuffers, numBuffers); + TieredResultPartition resultPartition = + createTieredStoreResultPartition(2, bufferPool, false); + resultPartition.release(); + assertThatThrownBy( + () -> + resultPartition.createSubpartitionView( + 0, new NoOpBufferAvailablityListener())) + .isInstanceOf(IllegalStateException.class); + } + + @Test + void testEmitRecords() throws Exception { + BufferPool bufferPool = globalPool.createBufferPool(3, 3); + int bufferSize = NETWORK_BUFFER_SIZE; + try (TieredResultPartition partition = + createTieredStoreResultPartition(2, bufferPool, false)) { + 
partition.emitRecord(ByteBuffer.allocate(bufferSize), 0); + partition.broadcastRecord(ByteBuffer.allocate(bufferSize)); + IOMetrics ioMetrics = taskIOMetricGroup.createSnapshot(); + assertThat(ioMetrics.getResultPartitionBytes()).hasSize(1); + ResultPartitionBytes partitionBytes = + ioMetrics.getResultPartitionBytes().values().iterator().next(); + assertThat(partitionBytes.getSubpartitionBytes()) + .containsExactly((long) 2 * bufferSize, bufferSize); + } + } + + @Test + void testMetricsUpdateForBroadcastOnlyResultPartition() throws Exception { + BufferPool bufferPool = globalPool.createBufferPool(3, 3); + int bufferSize = NETWORK_BUFFER_SIZE; + try (TieredResultPartition partition = + createTieredStoreResultPartition(2, bufferPool, true)) { + partition.broadcastRecord(ByteBuffer.allocate(bufferSize)); + IOMetrics ioMetrics = taskIOMetricGroup.createSnapshot(); + assertThat(ioMetrics.getResultPartitionBytes()).hasSize(1); + ResultPartitionBytes partitionBytes = + ioMetrics.getResultPartitionBytes().values().iterator().next(); + assertThat(partitionBytes.getSubpartitionBytes()) + .containsExactly(bufferSize, bufferSize); + } + } + + private TieredResultPartition createTieredStoreResultPartition( + int numSubpartitions, BufferPool bufferPool, boolean isBroadcastOnly) + throws IOException { + TieredResultPartition tieredResultPartition = + new TieredResultPartition( + "TieredStoreResultPartitionTest", + 0, + new ResultPartitionID(), + ResultPartitionType.HYBRID_SELECTIVE, + numSubpartitions, + numSubpartitions, + new ResultPartitionManager(), + new BufferCompressor(NETWORK_BUFFER_SIZE, "LZ4"), + () -> bufferPool, + new TieredStorageProducerClient( + numSubpartitions, + isBroadcastOnly, + new TestingBufferAccumulator(), + null, + new ArrayList<>()), + new TieredStorageResourceRegistry()); + taskIOMetricGroup = + UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup(); + tieredResultPartition.setup(); + 
tieredResultPartition.setMetricGroup(taskIOMetricGroup); + return tieredResultPartition; + } +}
