This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 80a924309ce910715c4079c7e52e9d560318bd38
Author: Yuxin Tan <[email protected]>
AuthorDate: Mon May 8 13:54:07 2023 +0800

    [FLINK-31635][network] Introduce tiered result partition
    
    This closes #22330
---
 .../shuffle/TieredInternalShuffleMaster.java       |  97 +++++++++
 .../tiered/shuffle/TieredResultPartition.java      | 225 +++++++++++++++++++++
 .../flink/runtime/shuffle/NettyShuffleMaster.java  |  15 +-
 .../hybrid/tiered/TestingBufferAccumulator.java    |  45 +++++
 .../tiered/shuffle/TieredResultPartitionTest.java  | 216 ++++++++++++++++++++
 5 files changed, 597 insertions(+), 1 deletion(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredInternalShuffleMaster.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredInternalShuffleMaster.java
new file mode 100644
index 00000000000..5f8baa3ac36
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredInternalShuffleMaster.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.shuffle;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageConfiguration;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMasterClient;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageResourceRegistry;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierMasterAgent;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageIdMappingUtils.convertId;
+
+/**
+ * A wrapper internal shuffle master class for tiered storage. All the tiered 
storage operations
+ * with the shuffle master should be wrapped in this class.
+ */
+public class TieredInternalShuffleMaster {
+
+    private final TieredStorageMasterClient tieredStorageMasterClient;
+
+    private final Map<JobID, List<ResultPartitionID>> jobPartitionIds;
+
+    private final Map<ResultPartitionID, JobID> partitionJobIds;
+
+    public TieredInternalShuffleMaster(Configuration conf) {
+        TieredStorageConfiguration tieredStorageConfiguration =
+                TieredStorageConfiguration.fromConfiguration(conf);
+        TieredStorageResourceRegistry resourceRegistry = new 
TieredStorageResourceRegistry();
+        List<TierMasterAgent> tierFactories =
+                tieredStorageConfiguration.getTierFactories().stream()
+                        .map(tierFactory -> 
tierFactory.createMasterAgent(resourceRegistry))
+                        .collect(Collectors.toList());
+        this.tieredStorageMasterClient = new 
TieredStorageMasterClient(tierFactories);
+        this.jobPartitionIds = new HashMap<>();
+        this.partitionJobIds = new HashMap<>();
+    }
+
+    public void addPartition(JobID jobID, ResultPartitionID resultPartitionID) 
{
+        jobPartitionIds.computeIfAbsent(jobID, ignore -> new 
ArrayList<>()).add(resultPartitionID);
+        partitionJobIds.put(resultPartitionID, jobID);
+        tieredStorageMasterClient.addPartition(convertId(resultPartitionID));
+    }
+
+    public void releasePartition(ResultPartitionID resultPartitionID) {
+        
tieredStorageMasterClient.releasePartition(convertId(resultPartitionID));
+        JobID jobID = partitionJobIds.remove(resultPartitionID);
+        if (jobID == null) {
+            return;
+        }
+
+        List<ResultPartitionID> resultPartitionIDs = 
jobPartitionIds.get(jobID);
+        if (resultPartitionIDs == null) {
+            return;
+        }
+
+        resultPartitionIDs.remove(resultPartitionID);
+        // If the result partition id list has been empty, remove the jobID 
from the map eagerly
+        if (resultPartitionIDs.isEmpty()) {
+            jobPartitionIds.remove(jobID);
+        }
+    }
+
+    public void unregisterJob(JobID jobID) {
+        List<ResultPartitionID> resultPartitionIDs = 
jobPartitionIds.remove(jobID);
+        if (resultPartitionIDs != null) {
+            resultPartitionIDs.forEach(
+                    resultPartitionID -> {
+                        
tieredStorageMasterClient.releasePartition(convertId(resultPartitionID));
+                        partitionJobIds.remove(resultPartitionID);
+                    });
+        }
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredResultPartition.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredResultPartition.java
new file mode 100644
index 00000000000..cd24ac89577
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredResultPartition.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.shuffle;
+
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.event.AbstractEvent;
+import org.apache.flink.runtime.io.network.api.EndOfData;
+import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
+import org.apache.flink.runtime.io.network.api.StopMode;
+import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import 
org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
+import org.apache.flink.runtime.io.network.partition.ResultPartition;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageIdMappingUtils;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageProducerClient;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageResourceRegistry;
+import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
+import org.apache.flink.util.concurrent.FutureUtils;
+import org.apache.flink.util.function.SupplierWithException;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
/**
 * {@link TieredResultPartition} appends records and events to the tiered storage. It supports the
 * upstream dynamically switching the storage tier it writes shuffle data to, while the downstream
 * reads the data from the relevant tier.
 */
public class TieredResultPartition extends ResultPartition {

    // Tiered-storage-scoped id derived from the runtime ResultPartitionID of this partition.
    private final TieredStoragePartitionId partitionId;

    // Client that accepts the produced records/events and routes them to a storage tier.
    private final TieredStorageProducerClient tieredStorageProducerClient;

    // Registry used to free all tier resources of this partition on release.
    private final TieredStorageResourceRegistry tieredStorageResourceRegistry;

    // Guards against broadcasting EndOfData more than once.
    private boolean hasNotifiedEndOfUserRecords;

    public TieredResultPartition(
            String owningTaskName,
            int partitionIndex,
            ResultPartitionID partitionId,
            ResultPartitionType partitionType,
            int numSubpartitions,
            int numTargetKeyGroups,
            ResultPartitionManager partitionManager,
            @Nullable BufferCompressor bufferCompressor,
            SupplierWithException<BufferPool, IOException> bufferPoolFactory,
            TieredStorageProducerClient tieredStorageProducerClient,
            TieredStorageResourceRegistry tieredStorageResourceRegistry) {
        super(
                owningTaskName,
                partitionIndex,
                partitionId,
                partitionType,
                numSubpartitions,
                numTargetKeyGroups,
                partitionManager,
                bufferCompressor,
                bufferPoolFactory);

        this.partitionId = TieredStorageIdMappingUtils.convertId(partitionId);
        this.tieredStorageProducerClient = tieredStorageProducerClient;
        this.tieredStorageResourceRegistry = tieredStorageResourceRegistry;
    }

    @Override
    protected void setupInternal() throws IOException {
        // No extra resources to set up here; only reject setup on an already-released partition.
        if (isReleased()) {
            throw new IOException("Result partition has been released.");
        }
    }

    @Override
    public void setMetricGroup(TaskIOMetricGroup metrics) {
        // Currently a pure pass-through; kept as an explicit hook for future tiered-storage
        // specific metrics.
        super.setMetricGroup(metrics);
    }

    @Override
    public void emitRecord(ByteBuffer record, int consumerId) throws IOException {
        // NOTE(review): consumerId appears to be the target subpartition index (it is used to
        // index resultPartitionBytes) — confirm against the ResultPartition contract.
        resultPartitionBytes.inc(consumerId, record.remaining());
        emit(record, consumerId, Buffer.DataType.DATA_BUFFER, false);
    }

    @Override
    public void broadcastRecord(ByteBuffer record) throws IOException {
        // A broadcast counts the record's bytes against every subpartition.
        resultPartitionBytes.incAll(record.remaining());
        broadcast(record, Buffer.DataType.DATA_BUFFER);
    }

    @Override
    public void broadcastEvent(AbstractEvent event, boolean isPriorityEvent) throws IOException {
        // Serialize the event into a buffer, broadcast its payload, and always recycle the
        // buffer — even if the broadcast throws.
        Buffer buffer = EventSerializer.toBuffer(event, isPriorityEvent);
        try {
            ByteBuffer serializedEvent = buffer.getNioBufferReadable();
            broadcast(serializedEvent, buffer.getDataType());
        } finally {
            buffer.recycleBuffer();
        }
    }

    // Broadcasts the given payload to all subpartitions; only valid while still producing.
    private void broadcast(ByteBuffer record, Buffer.DataType dataType) throws IOException {
        checkInProduceState();
        emit(record, 0, dataType, true);
    }

    // Hands the payload to the tiered storage producer client, which picks the storage tier.
    private void emit(
            ByteBuffer record, int consumerId, Buffer.DataType dataType, boolean isBroadcast)
            throws IOException {
        tieredStorageProducerClient.write(
                record, TieredStorageIdMappingUtils.convertId(consumerId), dataType, isBroadcast);
    }

    @Override
    public ResultSubpartitionView createSubpartitionView(
            int subpartitionId, BufferAvailabilityListener availabilityListener)
            throws IOException {
        checkState(!isReleased(), "ResultPartition already released.");
        // TODO, create subpartition views
        return null;
    }

    @Override
    public void finish() throws IOException {
        // Signal end-of-partition to all subpartitions before marking the partition finished.
        broadcastEvent(EndOfPartitionEvent.INSTANCE, false);
        checkState(!isReleased(), "Result partition is already released.");
        super.finish();
    }

    @Override
    public void close() {
        // Close the base partition (returns the buffer pool) before closing the producer client.
        super.close();
        tieredStorageProducerClient.close();
    }

    @Override
    protected void releaseInternal() {
        // Free every tier resource registered for this partition (e.g. shuffle files on disk).
        tieredStorageResourceRegistry.clearResourceFor(partitionId);
    }

    @Override
    public void notifyEndOfData(StopMode mode) throws IOException {
        // Broadcast EndOfData at most once per partition.
        if (!hasNotifiedEndOfUserRecords) {
            broadcastEvent(new EndOfData(mode), false);
            hasNotifiedEndOfUserRecords = true;
        }
    }

    @Override
    public void alignedBarrierTimeout(long checkpointId) throws IOException {
        // Nothing to do.
    }

    @Override
    public void abortCheckpoint(long checkpointId, CheckpointException cause) {
        // Nothing to do.
    }

    @Override
    public void flushAll() {
        // Nothing to do.
    }

    @Override
    public void flush(int subpartitionIndex) {
        // Nothing to do.
    }

    @Override
    public CompletableFuture<Void> getAllDataProcessedFuture() {
        // Nothing to do.
        return FutureUtils.completedVoidFuture();
    }

    @Override
    public void onSubpartitionAllDataProcessed(int subpartition) {
        // Nothing to do.
    }

    @Override
    public int getNumberOfQueuedBuffers() {
        // Nothing to do.
        return 0;
    }

    @Override
    public long getSizeOfQueuedBuffersUnsafe() {
        // Nothing to do.
        return 0;
    }

    @Override
    public int getNumberOfQueuedBuffers(int targetSubpartition) {
        // Nothing to do.
        return 0;
    }
}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleMaster.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleMaster.java
index 7beb385d395..75441b7331f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleMaster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleMaster.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.shuffle.TieredInternalShuffleMaster;
 import 
org.apache.flink.runtime.shuffle.NettyShuffleDescriptor.LocalExecutionPartitionConnectionInfo;
 import 
org.apache.flink.runtime.shuffle.NettyShuffleDescriptor.NetworkPartitionConnectionInfo;
 import 
org.apache.flink.runtime.shuffle.NettyShuffleDescriptor.PartitionConnectionInfo;
@@ -49,6 +50,8 @@ public class NettyShuffleMaster implements 
ShuffleMaster<NettyShuffleDescriptor>
 
     private final int networkBufferSize;
 
+    private final TieredInternalShuffleMaster tieredInternalShuffleMaster;
+
     public NettyShuffleMaster(Configuration conf) {
         checkNotNull(conf);
         buffersPerInputChannel =
@@ -64,6 +67,7 @@ public class NettyShuffleMaster implements 
ShuffleMaster<NettyShuffleDescriptor>
         sortShuffleMinBuffers =
                 
conf.getInteger(NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_BUFFERS);
         networkBufferSize = ConfigurationParserUtils.getPageSize(conf);
+        tieredInternalShuffleMaster = new TieredInternalShuffleMaster(conf);
 
         checkArgument(
                 !maxRequiredBuffersPerGate.isPresent() || 
maxRequiredBuffersPerGate.get() >= 1,
@@ -96,11 +100,20 @@ public class NettyShuffleMaster implements 
ShuffleMaster<NettyShuffleDescriptor>
                                 producerDescriptor, 
partitionDescriptor.getConnectionIndex()),
                         resultPartitionID);
 
+        tieredInternalShuffleMaster.addPartition(jobID, resultPartitionID);
+
         return CompletableFuture.completedFuture(shuffleDeploymentDescriptor);
     }
 
     @Override
-    public void releasePartitionExternally(ShuffleDescriptor 
shuffleDescriptor) {}
    // Delegates external partition release to the tiered internal shuffle master, which
    // releases the partition in every storage tier and cleans up its job bookkeeping.
    public void releasePartitionExternally(ShuffleDescriptor shuffleDescriptor) {
        tieredInternalShuffleMaster.releasePartition(shuffleDescriptor.getResultPartitionID());
    }
+
    @Override
    public void unregisterJob(JobID jobID) {
        // Release all tiered-storage partitions still registered for this job.
        tieredInternalShuffleMaster.unregisterJob(jobID);
    }
 
     private static PartitionConnectionInfo createConnectionInfo(
             ProducerDescriptor producerDescriptor, int connectionIndex) {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/TestingBufferAccumulator.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/TestingBufferAccumulator.java
new file mode 100644
index 00000000000..9430f5f3b2e
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/TestingBufferAccumulator.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.BufferAccumulator;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.function.BiConsumer;
+
/**
 * No-op test implementation for {@link BufferAccumulator}. All methods intentionally do
 * nothing, so tests can exercise a producer client without accumulating real buffers.
 */
public class TestingBufferAccumulator implements BufferAccumulator {

    @Override
    public void setup(
            int numSubpartitions,
            BiConsumer<TieredStorageSubpartitionId, List<Buffer>> bufferFlusher) {}

    @Override
    public void receive(
            ByteBuffer record, TieredStorageSubpartitionId subpartitionId, Buffer.DataType dataType)
            throws IOException {}

    @Override
    public void close() {}
}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredResultPartitionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredResultPartitionTest.java
new file mode 100644
index 00000000000..7e69f015366
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredResultPartitionTest.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.shuffle;
+
+import org.apache.flink.runtime.executiongraph.IOMetrics;
+import org.apache.flink.runtime.executiongraph.ResultPartitionBytes;
+import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool;
+import org.apache.flink.runtime.io.disk.FileChannelManager;
+import org.apache.flink.runtime.io.disk.FileChannelManagerImpl;
+import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import 
org.apache.flink.runtime.io.network.partition.NoOpBufferAvailablityListener;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.TestingBufferAccumulator;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageProducerClient;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageResourceRegistry;
+import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
/** Tests for {@link TieredResultPartition}. */
class TieredResultPartitionTest {

    // Size of the read I/O scheduler thread pool.
    private static final int NUM_THREADS = 4;

    // Size in bytes of every network buffer used by the tests.
    private static final int NETWORK_BUFFER_SIZE = 1024;

    // Total number of buffers in the global network buffer pool.
    private static final int NUM_TOTAL_BUFFERS = 1000;

    // Total capacity of the batch-shuffle read buffer pool.
    private static final int NUM_TOTAL_BYTES_IN_READ_POOL = 32 * 1024 * 1024;

    private FileChannelManager fileChannelManager;

    private NetworkBufferPool globalPool;

    private BatchShuffleReadBufferPool readBufferPool;

    private ScheduledExecutorService readIOExecutor;

    // Captured by createTieredStoreResultPartition so tests can snapshot IO metrics.
    private TaskIOMetricGroup taskIOMetricGroup;

    @TempDir public java.nio.file.Path tempDataPath;

    @BeforeEach
    void before() {
        fileChannelManager =
                new FileChannelManagerImpl(new String[] {tempDataPath.toString()}, "testing");
        globalPool = new NetworkBufferPool(NUM_TOTAL_BUFFERS, NETWORK_BUFFER_SIZE);
        readBufferPool =
                new BatchShuffleReadBufferPool(NUM_TOTAL_BYTES_IN_READ_POOL, NETWORK_BUFFER_SIZE);
        readIOExecutor =
                new ScheduledThreadPoolExecutor(
                        NUM_THREADS,
                        new ExecutorThreadFactory("test-io-scheduler-thread"),
                        (ignored, executor) -> {
                            if (executor.isShutdown()) {
                                // ignore rejected as shutdown.
                            } else {
                                throw new RejectedExecutionException();
                            }
                        });
    }

    @AfterEach
    void after() throws Exception {
        fileChannelManager.close();
        globalPool.destroy();
        readBufferPool.destroy();
        readIOExecutor.shutdown();
    }

    // Closing the partition must destroy its local buffer pool.
    @Test
    void testClose() throws Exception {
        final int numBuffers = 1;

        BufferPool bufferPool = globalPool.createBufferPool(numBuffers, numBuffers);
        TieredResultPartition partition = createTieredStoreResultPartition(1, bufferPool, false);

        partition.close();
        assertThat(bufferPool.isDestroyed()).isTrue();
    }

    // Releasing the partition must remove its shuffle files and return every memory segment
    // to the global pool.
    @Test
    @Timeout(30)
    void testRelease() throws Exception {
        final int numSubpartitions = 2;
        final int numBuffers = 10;

        BufferPool bufferPool = globalPool.createBufferPool(numBuffers, numBuffers);
        TieredResultPartition partition =
                createTieredStoreResultPartition(numSubpartitions, bufferPool, false);

        partition.emitRecord(ByteBuffer.allocate(NETWORK_BUFFER_SIZE * 5), 1);
        partition.close();
        assertThat(bufferPool.isDestroyed()).isTrue();

        partition.release();

        // File deletion may be asynchronous; poll until the data directory is empty
        // (the @Timeout above bounds the wait).
        while (checkNotNull(fileChannelManager.getPaths()[0].listFiles()).length != 0) {
            Thread.sleep(10);
        }

        assertThat(NUM_TOTAL_BUFFERS).isEqualTo(globalPool.getNumberOfAvailableMemorySegments());
    }

    // Creating a subpartition view after release must fail fast.
    @Test
    void testCreateSubpartitionViewAfterRelease() throws Exception {
        final int numBuffers = 10;
        BufferPool bufferPool = globalPool.createBufferPool(numBuffers, numBuffers);
        TieredResultPartition resultPartition =
                createTieredStoreResultPartition(2, bufferPool, false);
        resultPartition.release();
        assertThatThrownBy(
                        () ->
                                resultPartition.createSubpartitionView(
                                        0, new NoOpBufferAvailablityListener()))
                .isInstanceOf(IllegalStateException.class);
    }

    // One targeted record plus one broadcast: subpartition 0 sees both (2 * bufferSize),
    // subpartition 1 only the broadcast (bufferSize).
    @Test
    void testEmitRecords() throws Exception {
        BufferPool bufferPool = globalPool.createBufferPool(3, 3);
        int bufferSize = NETWORK_BUFFER_SIZE;
        try (TieredResultPartition partition =
                createTieredStoreResultPartition(2, bufferPool, false)) {
            partition.emitRecord(ByteBuffer.allocate(bufferSize), 0);
            partition.broadcastRecord(ByteBuffer.allocate(bufferSize));
            IOMetrics ioMetrics = taskIOMetricGroup.createSnapshot();
            assertThat(ioMetrics.getResultPartitionBytes()).hasSize(1);
            ResultPartitionBytes partitionBytes =
                    ioMetrics.getResultPartitionBytes().values().iterator().next();
            assertThat(partitionBytes.getSubpartitionBytes())
                    .containsExactly((long) 2 * bufferSize, bufferSize);
        }
    }

    // For a broadcast-only partition, one broadcast record is counted once per subpartition.
    @Test
    void testMetricsUpdateForBroadcastOnlyResultPartition() throws Exception {
        BufferPool bufferPool = globalPool.createBufferPool(3, 3);
        int bufferSize = NETWORK_BUFFER_SIZE;
        try (TieredResultPartition partition =
                createTieredStoreResultPartition(2, bufferPool, true)) {
            partition.broadcastRecord(ByteBuffer.allocate(bufferSize));
            IOMetrics ioMetrics = taskIOMetricGroup.createSnapshot();
            assertThat(ioMetrics.getResultPartitionBytes()).hasSize(1);
            ResultPartitionBytes partitionBytes =
                    ioMetrics.getResultPartitionBytes().values().iterator().next();
            assertThat(partitionBytes.getSubpartitionBytes())
                    .containsExactly(bufferSize, bufferSize);
        }
    }

    // Builds a fully set-up TieredResultPartition backed by a no-op buffer accumulator and an
    // unregistered metric group; also stores the metric group in taskIOMetricGroup for asserts.
    private TieredResultPartition createTieredStoreResultPartition(
            int numSubpartitions, BufferPool bufferPool, boolean isBroadcastOnly)
            throws IOException {
        TieredResultPartition tieredResultPartition =
                new TieredResultPartition(
                        "TieredStoreResultPartitionTest",
                        0,
                        new ResultPartitionID(),
                        ResultPartitionType.HYBRID_SELECTIVE,
                        numSubpartitions,
                        numSubpartitions,
                        new ResultPartitionManager(),
                        new BufferCompressor(NETWORK_BUFFER_SIZE, "LZ4"),
                        () -> bufferPool,
                        new TieredStorageProducerClient(
                                numSubpartitions,
                                isBroadcastOnly,
                                new TestingBufferAccumulator(),
                                null,
                                new ArrayList<>()),
                        new TieredStorageResourceRegistry());
        taskIOMetricGroup =
                UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup();
        tieredResultPartition.setup();
        tieredResultPartition.setMetricGroup(taskIOMetricGroup);
        return tieredResultPartition;
    }
}

Reply via email to