reswqa commented on code in PR #22851:
URL: https://github.com/apache/flink/pull/22851#discussion_r1253009389


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageConfiguration.java:
##########
@@ -30,18 +32,38 @@ public class TieredStorageConfiguration {
     // TODO, after implementing the tier factory, add appreciate 
implementations to the array.
     private static final TierFactory[] DEFAULT_MEMORY_DISK_TIER_FACTORIES = 
new TierFactory[0];
 
+    /** If the remote storage tier is not used, this field may be null. */
+    @Nullable private final String remoteStorageBasePath;
+
+    public TieredStorageConfiguration(String remoteStorageBasePath) {

Review Comment:
   ```suggestion
       public TieredStorageConfiguration(@Nullable String 
remoteStorageBasePath) {
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageUtils.java:
##########
@@ -25,11 +25,9 @@
 import java.nio.ByteBuffer;
 import java.util.List;
 
-/** Utils for reading or writing from tiered storage. */
+/** Utils for reading or writing to tiered storage. */

Review Comment:
   ```suggestion
   /** Utils for reading from or writing to tiered storage. */
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/SegmentPartitionFileWriter.java:
##########
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.file;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FatalExitExceptionHandler;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import 
org.apache.flink.shaded.guava31.com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils.generateBufferWithHeaders;
+import static 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.SegmentPartitionFile.getSegmentPath;
+import static 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.SegmentPartitionFile.writeSegmentFinishFile;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * The implementation of {@link PartitionFileWriter} with segment file mode. 
In this mode, each
+ * segment of one subpartition is written to an independent file.
+ *
+ * <p>After finishing writing a segment, a segment-finish file is written to 
ensure the downstream
+ * reads only when the entire segment file is written, avoiding partial data 
reads. The downstream
+ * can determine if the current segment is complete by checking for the 
existence of the
+ * segment-finish file.
+ *
+ * <p>To minimize the number of files, each subpartition uses only a single 
segment-finish file. For
+ * instance, if segment-finish file 5 exists, it indicates that segments 1 to 
5 have all been
+ * finished.
+ */
+public class SegmentPartitionFileWriter implements PartitionFileWriter {
+
+    private final ExecutorService ioExecutor =
+            Executors.newSingleThreadExecutor(
+                    new ThreadFactoryBuilder()
+                            .setNameFormat("Hash partition file flush thread")

Review Comment:
   ```suggestion
                               .setNameFormat("Segment partition file flush 
thread")
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/SegmentPartitionFileWriter.java:
##########
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.file;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FatalExitExceptionHandler;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import 
org.apache.flink.shaded.guava31.com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils.generateBufferWithHeaders;
+import static 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.SegmentPartitionFile.getSegmentPath;
+import static 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.SegmentPartitionFile.writeSegmentFinishFile;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * The implementation of {@link PartitionFileWriter} with segment file mode. 
In this mode, each
+ * segment of one subpartition is written to an independent file.
+ *
+ * <p>After finishing writing a segment, a segment-finish file is written to 
ensure the downstream
+ * reads only when the entire segment file is written, avoiding partial data 
reads. The downstream
+ * can determine if the current segment is complete by checking for the 
existence of the
+ * segment-finish file.
+ *
+ * <p>To minimize the number of files, each subpartition uses only a single 
segment-finish file. For
+ * instance, if segment-finish file 5 exists, it indicates that segments 1 to 
5 have all been
+ * finished.
+ */
+public class SegmentPartitionFileWriter implements PartitionFileWriter {
+
+    private final ExecutorService ioExecutor =
+            Executors.newSingleThreadExecutor(
+                    new ThreadFactoryBuilder()
+                            .setNameFormat("Hash partition file flush thread")
+                            
.setUncaughtExceptionHandler(FatalExitExceptionHandler.INSTANCE)
+                            .build());
+    private final String basePath;
+
+    private final WritableByteChannel[] subpartitionChannels;
+
+    private volatile boolean isReleased;
+
+    SegmentPartitionFileWriter(String basePath, int numSubpartitions) {
+        this.basePath = basePath;
+        this.subpartitionChannels = new WritableByteChannel[numSubpartitions];
+        Arrays.fill(subpartitionChannels, null);
+    }
+
+    @Override
+    public CompletableFuture<Void> write(
+            TieredStoragePartitionId partitionId, 
List<SubpartitionBufferContext> buffersToWrite) {
+        List<CompletableFuture<Void>> completableFutures = new ArrayList<>();
+        buffersToWrite.forEach(
+                subpartitionBuffers -> {
+                    int subpartitionId = 
subpartitionBuffers.getSubpartitionId();
+                    List<SegmentBufferContext> multiSegmentBuffers =
+                            subpartitionBuffers.getSegmentBufferContexts();
+                    multiSegmentBuffers.forEach(
+                            segmentBuffers -> {
+                                CompletableFuture<Void> flushSuccessNotifier =
+                                        new CompletableFuture<>();
+                                ioExecutor.execute(
+                                        () ->
+                                                flushOrFinishSegment(
+                                                        partitionId,
+                                                        subpartitionId,
+                                                        segmentBuffers,
+                                                        flushSuccessNotifier));
+                                completableFutures.add(flushSuccessNotifier);
+                            });
+                });
+        return FutureUtils.waitForAll(completableFutures);
+    }
+
+    @Override
+    public void release() {
+        if (isReleased) {
+            return;
+        }
+        isReleased = true;
+        try {
+            ioExecutor.shutdown();
+            if (!ioExecutor.awaitTermination(5L, TimeUnit.MINUTES)) {
+                throw new TimeoutException("Timeout to shutdown the flush 
thread.");
+            }
+            for (WritableByteChannel writeChannel : subpartitionChannels) {
+                if (writeChannel != null) {
+                    writeChannel.close();
+                }
+            }
+        } catch (Exception e) {
+            ExceptionUtils.rethrow(e);
+        }
+    }
+
+    // ------------------------------------------------------------------------
+    //  Internal Methods
+    // ------------------------------------------------------------------------
+
+    private void flushOrFinishSegment(
+            TieredStoragePartitionId partitionId,
+            int subpartitionId,
+            SegmentBufferContext segmentBuffers,

Review Comment:
   ```suggestion
               SegmentBufferContext segmentBufferContext,
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/SubpartitionRemoteCacheManager.java:
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.remote;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.PartitionFileWriter;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMemoryManager;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import net.jcip.annotations.GuardedBy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * This {@link SubpartitionRemoteCacheManager} is responsible for managing the 
buffers in a single
+ * subpartition.
+ */
+class SubpartitionRemoteCacheManager {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(SubpartitionRemoteCacheManager.class);
+
+    private final TieredStoragePartitionId partitionId;
+
+    private final int subpartitionId;
+
+    private final PartitionFileWriter partitionFileWriter;
+
+    /**
+     * All the buffers. The first field of the tuple is the buffer, while the 
second field of the
+     * buffer is the buffer index.
+     *
+     * <p>Note that this field can be accessed by the task thread or the write 
IO thread, so the
+     * thread safety should be ensured.
+     */
+    private final Deque<Tuple2<Buffer, Integer>> allBuffers = new 
LinkedList<>();
+
+    private CompletableFuture<Void> flushCompletableFuture = 
FutureUtils.completedVoidFuture();
+
+    /**
+     * Record the segment id that is writing to.
+     *
+     * <p>Note that when flushing buffers, this can be touched by task thread 
or the flushing
+     * thread, so the thread safety should be ensured.
+     */
+    @GuardedBy("allBuffers")
+    private int segmentId = -1;
+
+    /**
+     * Record the buffer index in the {@link SubpartitionRemoteCacheManager}. 
Each time a new buffer
+     * is added to the {@code allBuffers}, this field is increased by one.
+     *
+     * <p>Note that the field can only be touched by the task thread, so this 
field need not be
+     * guarded by any lock or synchronizations.
+     */
+    private int bufferIndex;
+
+    public SubpartitionRemoteCacheManager(
+            TieredStoragePartitionId partitionId,
+            int subpartitionId,
+            TieredStorageMemoryManager storageMemoryManager,
+            PartitionFileWriter partitionFileWriter) {
+        this.partitionId = partitionId;
+        this.subpartitionId = subpartitionId;
+        this.partitionFileWriter = partitionFileWriter;
+        storageMemoryManager.listenBufferReclaimRequest(this::flushBuffers);
+    }
+
+    // ------------------------------------------------------------------------
+    //  Called by RemoteCacheManager
+    // ------------------------------------------------------------------------
+
+    void startSegment(int segmentId) {
+        synchronized (allBuffers) {
+            checkState(allBuffers.isEmpty(), "There are un-flushed buffers.");
+            this.segmentId = segmentId;
+        }
+    }
+
+    void addBuffer(Buffer buffer) {
+        Tuple2<Buffer, Integer> toAddBuffer = new Tuple2<>(buffer, 
bufferIndex++);
+        synchronized (allBuffers) {
+            allBuffers.add(toAddBuffer);
+        }
+    }
+
+    void finishSegment(int segmentId) {
+        synchronized (allBuffers) {
+            checkState(this.segmentId == segmentId, "Wrong segment id.");
+        }
+        // Flush the buffers belonging to the current segment
+        flushBuffers();
+
+        PartitionFileWriter.SubpartitionBufferContext bufferContext =
+                new PartitionFileWriter.SubpartitionBufferContext(
+                        subpartitionId,
+                        Collections.singletonList(
+                                new PartitionFileWriter.SegmentBufferContext(
+                                        segmentId, Collections.emptyList(), 
true)));
+        // Notify the partition file writer that the segment is finished 
through writing the
+        // buffer context
+        flushCompletableFuture =
+                partitionFileWriter.write(partitionId, 
Collections.singletonList(bufferContext));
+        checkState(allBuffers.isEmpty(), "Leaking buffers.");
+    }
+
+    void close() {
+        // Wait the flushing buffers to be completed before closed
+        try {
+            flushCompletableFuture.get();
+        } catch (Exception e) {
+            LOG.error("Failed to flush the buffers.", e);
+            ExceptionUtils.rethrow(e);
+        }
+        flushBuffers();
+    }
+
+    /** Release all buffers. */
+    void release() {
+        recycleBuffers();
+        checkState(allBuffers.isEmpty(), "Leaking buffers.");
+        partitionFileWriter.release();
+    }
+
+    // ------------------------------------------------------------------------
+    //  Internal Methods
+    // ------------------------------------------------------------------------
+
+    private void flushBuffers() {
+        synchronized (allBuffers) {
+            List<Tuple2<Buffer, Integer>> allBuffersToFlush = new 
ArrayList<>(allBuffers);
+            allBuffers.clear();
+            if (allBuffersToFlush.isEmpty()) {
+                return;
+            }
+
+            PartitionFileWriter.SubpartitionBufferContext 
subpartitionBufferContext =
+                    new PartitionFileWriter.SubpartitionBufferContext(
+                            subpartitionId,
+                            Collections.singletonList(
+                                    new 
PartitionFileWriter.SegmentBufferContext(
+                                            segmentId, allBuffersToFlush, 
false)));
+            flushCompletableFuture =
+                    partitionFileWriter.write(
+                            partitionId, 
Collections.singletonList(subpartitionBufferContext));
+        }
+    }
+
+    private void recycleBuffers() {
+        synchronized (allBuffers) {
+            for (Tuple2<Buffer, Integer> bufferAndIndex : allBuffers) {
+                Buffer buffer = bufferAndIndex.f0;
+                if (!buffer.isRecycled()) {

Review Comment:
   Under what circumstances has this buffer already been recycled?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/SubpartitionRemoteCacheManager.java:
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.remote;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.PartitionFileWriter;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMemoryManager;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import net.jcip.annotations.GuardedBy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * This {@link SubpartitionRemoteCacheManager} is responsible for managing the 
buffers in a single
+ * subpartition.
+ */
+class SubpartitionRemoteCacheManager {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(SubpartitionRemoteCacheManager.class);
+
+    private final TieredStoragePartitionId partitionId;
+
+    private final int subpartitionId;
+
+    private final PartitionFileWriter partitionFileWriter;
+
+    /**
+     * All the buffers. The first field of the tuple is the buffer, while the 
second field of the
+     * buffer is the buffer index.
+     *
+     * <p>Note that this field can be accessed by the task thread or the write 
IO thread, so the

Review Comment:
   Which thread is `write IO thread`? 🤔 



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/SegmentPartitionFileTest.java:
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.file;
+
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageIdMappingUtils;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Random;
+
+import static 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.SegmentPartitionFile.SEGMENT_FILE_PREFIX;
+import static 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.SegmentPartitionFile.SEGMENT_FINISH_DIR_NAME;
+import static 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.SegmentPartitionFile.TIERED_STORAGE_DIR;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link SegmentPartitionFile}. */
+class SegmentPartitionFileTest {
+
+    @TempDir Path tempFolder;

Review Comment:
   ```suggestion
       @TempDir File tempFolder;
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/SubpartitionRemoteCacheManager.java:
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.remote;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.PartitionFileWriter;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMemoryManager;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import net.jcip.annotations.GuardedBy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * This {@link SubpartitionRemoteCacheManager} is responsible for managing the 
buffers in a single
+ * subpartition.
+ */
+class SubpartitionRemoteCacheManager {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(SubpartitionRemoteCacheManager.class);
+
+    private final TieredStoragePartitionId partitionId;
+
+    private final int subpartitionId;
+
+    private final PartitionFileWriter partitionFileWriter;
+
+    /**
+     * All the buffers. The first field of the tuple is the buffer, while the 
second field of the
+     * buffer is the buffer index.
+     *
+     * <p>Note that this field can be accessed by the task thread or the write 
IO thread, so the
+     * thread safety should be ensured.
+     */
+    private final Deque<Tuple2<Buffer, Integer>> allBuffers = new 
LinkedList<>();
+
+    private CompletableFuture<Void> flushCompletableFuture = 
FutureUtils.completedVoidFuture();
+
+    /**
+     * Record the segment id that is writing to.
+     *
+     * <p>Note that when flushing buffers, this can be touched by task thread 
or the flushing
+     * thread, so the thread safety should be ensured.
+     */
+    @GuardedBy("allBuffers")
+    private int segmentId = -1;
+
+    /**
+     * Record the buffer index in the {@link SubpartitionRemoteCacheManager}. 
Each time a new buffer
+     * is added to the {@code allBuffers}, this field is increased by one.
+     *
+     * <p>Note that the field can only be touched by the task thread, so this 
field need not be
+     * guarded by any lock or synchronizations.
+     */
+    private int bufferIndex;
+
+    public SubpartitionRemoteCacheManager(
+            TieredStoragePartitionId partitionId,
+            int subpartitionId,
+            TieredStorageMemoryManager storageMemoryManager,
+            PartitionFileWriter partitionFileWriter) {
+        this.partitionId = partitionId;
+        this.subpartitionId = subpartitionId;
+        this.partitionFileWriter = partitionFileWriter;
+        storageMemoryManager.listenBufferReclaimRequest(this::flushBuffers);
+    }
+
+    // ------------------------------------------------------------------------
+    //  Called by RemoteCacheManager
+    // ------------------------------------------------------------------------
+
+    void startSegment(int segmentId) {
+        synchronized (allBuffers) {
+            checkState(allBuffers.isEmpty(), "There are un-flushed buffers.");
+            this.segmentId = segmentId;
+        }
+    }
+
+    void addBuffer(Buffer buffer) {
+        Tuple2<Buffer, Integer> toAddBuffer = new Tuple2<>(buffer, 
bufferIndex++);
+        synchronized (allBuffers) {
+            allBuffers.add(toAddBuffer);
+        }
+    }
+
+    void finishSegment(int segmentId) {
+        synchronized (allBuffers) {
+            checkState(this.segmentId == segmentId, "Wrong segment id.");
+        }
+        // Flush the buffers belonging to the current segment
+        flushBuffers();
+
+        PartitionFileWriter.SubpartitionBufferContext bufferContext =
+                new PartitionFileWriter.SubpartitionBufferContext(
+                        subpartitionId,
+                        Collections.singletonList(
+                                new PartitionFileWriter.SegmentBufferContext(
+                                        segmentId, Collections.emptyList(), 
true)));
+        // Notify the partition file writer that the segment is finished 
through writing the
+        // buffer context
+        flushCompletableFuture =
+                partitionFileWriter.write(partitionId, 
Collections.singletonList(bufferContext));
+        checkState(allBuffers.isEmpty(), "Leaking buffers.");
+    }
+
+    void close() {
+        // Wait the flushing buffers to be completed before closed
+        try {
+            flushCompletableFuture.get();
+        } catch (Exception e) {
+            LOG.error("Failed to flush the buffers.", e);
+            ExceptionUtils.rethrow(e);
+        }
+        flushBuffers();
+    }
+
+    /** Release all buffers. */
+    void release() {
+        recycleBuffers();
+        checkState(allBuffers.isEmpty(), "Leaking buffers.");
+        partitionFileWriter.release();
+    }
+
+    // ------------------------------------------------------------------------
+    //  Internal Methods
+    // ------------------------------------------------------------------------
+
+    private void flushBuffers() {
+        synchronized (allBuffers) {
+            List<Tuple2<Buffer, Integer>> allBuffersToFlush = new 
ArrayList<>(allBuffers);
+            allBuffers.clear();
+            if (allBuffersToFlush.isEmpty()) {
+                return;
+            }
+
+            PartitionFileWriter.SubpartitionBufferContext 
subpartitionBufferContext =
+                    new PartitionFileWriter.SubpartitionBufferContext(
+                            subpartitionId,
+                            Collections.singletonList(
+                                    new 
PartitionFileWriter.SegmentBufferContext(
+                                            segmentId, allBuffersToFlush, 
false)));
+            flushCompletableFuture =
+                    partitionFileWriter.write(
+                            partitionId, 
Collections.singletonList(subpartitionBufferContext));
+        }
+    }
+
+    private void recycleBuffers() {
+        synchronized (allBuffers) {
+            for (Tuple2<Buffer, Integer> bufferAndIndex : allBuffers) {
+                Buffer buffer = bufferAndIndex.f0;
+                if (!buffer.isRecycled()) {

Review Comment:
   If this check is not necessary, we can refactor this loop as follows:
   ```java
   while (!allBuffers.isEmpty()) {
       allBuffers.poll().f0.recycleBuffer();
   }
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/SegmentPartitionFileReader.java:
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.file;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
+
+import java.io.IOException;
+
+/** THe implementation of {@link PartitionFileReader} with segment file mode. 
*/
+public class SegmentPartitionFileReader implements PartitionFileReader {
+
+    public SegmentPartitionFileReader(String dataFilePath) {
+        // TODO, implement the HashPartitionFileReader
+    }
+
+    @Override
+    public Buffer readBuffer(
+            TieredStoragePartitionId partitionId,
+            TieredStorageSubpartitionId subpartitionId,
+            int segmentId,
+            int bufferIndex,
+            MemorySegment memorySegment,
+            BufferRecycler recycler)
+            throws IOException {
+        // TODO, implement the HashPartitionFileReader

Review Comment:
   ```suggestion
           // TODO, implement the SegmentPartitionFileReader
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/SegmentPartitionFileReader.java:
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.file;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
+
+import java.io.IOException;
+
+/** THe implementation of {@link PartitionFileReader} with segment file mode. 
*/
+public class SegmentPartitionFileReader implements PartitionFileReader {
+
+    public SegmentPartitionFileReader(String dataFilePath) {
+        // TODO, implement the HashPartitionFileReader

Review Comment:
   ```suggestion
           // TODO, implement the SegmentPartitionFileReader
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/SegmentPartitionFileWriter.java:
##########
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.file;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FatalExitExceptionHandler;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import 
org.apache.flink.shaded.guava31.com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils.generateBufferWithHeaders;
+import static 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.SegmentPartitionFile.getSegmentPath;
+import static 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.SegmentPartitionFile.writeSegmentFinishFile;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * The implementation of {@link PartitionFileWriter} with segment file mode. 
In this mode, each
+ * segment of one subpartition is written to an independent file.
+ *
+ * <p>After finishing writing a segment, a segment-finish file is written to 
ensure the downstream
+ * reads only when the entire segment file is written, avoiding partial data 
reads. The downstream
+ * can determine if the current segment is complete by checking for the 
existence of the
+ * segment-finish file.
+ *
+ * <p>To minimize the number of files, each subpartition uses only a single 
segment-finish file. For
+ * instance, if segment-finish file 5 exists, it indicates that segments 1 to 
5 have all been
+ * finished.
+ */
+public class SegmentPartitionFileWriter implements PartitionFileWriter {
+
+    private final ExecutorService ioExecutor =
+            Executors.newSingleThreadExecutor(
+                    new ThreadFactoryBuilder()
+                            .setNameFormat("Hash partition file flush thread")
+                            
.setUncaughtExceptionHandler(FatalExitExceptionHandler.INSTANCE)
+                            .build());
+    private final String basePath;
+
+    private final WritableByteChannel[] subpartitionChannels;
+
+    private volatile boolean isReleased;
+
+    SegmentPartitionFileWriter(String basePath, int numSubpartitions) {
+        this.basePath = basePath;
+        this.subpartitionChannels = new WritableByteChannel[numSubpartitions];
+        Arrays.fill(subpartitionChannels, null);
+    }
+
+    @Override
+    public CompletableFuture<Void> write(
+            TieredStoragePartitionId partitionId, 
List<SubpartitionBufferContext> buffersToWrite) {
+        List<CompletableFuture<Void>> completableFutures = new ArrayList<>();
+        buffersToWrite.forEach(
+                subpartitionBuffers -> {
+                    int subpartitionId = 
subpartitionBuffers.getSubpartitionId();
+                    List<SegmentBufferContext> multiSegmentBuffers =
+                            subpartitionBuffers.getSegmentBufferContexts();
+                    multiSegmentBuffers.forEach(
+                            segmentBuffers -> {
+                                CompletableFuture<Void> flushSuccessNotifier =
+                                        new CompletableFuture<>();
+                                ioExecutor.execute(
+                                        () ->
+                                                flushOrFinishSegment(
+                                                        partitionId,
+                                                        subpartitionId,
+                                                        segmentBuffers,
+                                                        flushSuccessNotifier));
+                                completableFutures.add(flushSuccessNotifier);
+                            });
+                });
+        return FutureUtils.waitForAll(completableFutures);
+    }
+
+    @Override
+    public void release() {
+        if (isReleased) {
+            return;
+        }
+        isReleased = true;
+        try {
+            ioExecutor.shutdown();
+            if (!ioExecutor.awaitTermination(5L, TimeUnit.MINUTES)) {
+                throw new TimeoutException("Timeout to shutdown the flush 
thread.");
+            }
+            for (WritableByteChannel writeChannel : subpartitionChannels) {
+                if (writeChannel != null) {
+                    writeChannel.close();
+                }
+            }
+        } catch (Exception e) {
+            ExceptionUtils.rethrow(e);
+        }
+    }
+
+    // ------------------------------------------------------------------------
+    //  Internal Methods
+    // ------------------------------------------------------------------------
+
+    private void flushOrFinishSegment(
+            TieredStoragePartitionId partitionId,
+            int subpartitionId,
+            SegmentBufferContext segmentBuffers,
+            CompletableFuture<Void> flushSuccessNotifier) {
+        int segmentId = segmentBuffers.getSegmentId();
+        List<Tuple2<Buffer, Integer>> buffersToFlush = 
segmentBuffers.getBufferAndIndexes();
+        boolean isSegmentFinished = segmentBuffers.isSegmentFinished();
+        checkState(!buffersToFlush.isEmpty() || isSegmentFinished);
+
+        if (buffersToFlush.size() > 0) {
+            flush(partitionId, subpartitionId, segmentId, buffersToFlush, 
flushSuccessNotifier);
+        }
+        if (isSegmentFinished) {
+            writeFinishSegmentFile(partitionId, subpartitionId, segmentId, 
flushSuccessNotifier);
+        }
+    }
+
+    /** This method is only called by the flushing thread. */
+    private void flush(
+            TieredStoragePartitionId partitionId,
+            int subpartitionId,
+            int segmentId,
+            List<Tuple2<Buffer, Integer>> buffersToFlush,
+            CompletableFuture<Void> flushSuccessNotifier) {
+        try {
+            writeBuffers(
+                    partitionId,
+                    subpartitionId,
+                    segmentId,
+                    buffersToFlush,
+                    getTotalBytes(buffersToFlush));
+            buffersToFlush.forEach(bufferToFlush -> 
bufferToFlush.f0.recycleBuffer());
+            flushSuccessNotifier.complete(null);
+        } catch (IOException exception) {
+            ExceptionUtils.rethrow(exception);
+        }
+    }
+
+    /**
+     * Writing a segment-finish file when the current segment is complete. The 
downstream can
+     * determine if the current segment is complete by checking for the 
existence of the
+     * segment-finish file.
+     *
+     * <p>Note that the method is only called by the flushing thread.
+     */
+    private void writeFinishSegmentFile(

Review Comment:
   ```suggestion
       private void writeSegmentFinishFile(
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/SegmentPartitionFileReader.java:
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.file;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
+
+import java.io.IOException;
+
+/** THe implementation of {@link PartitionFileReader} with segment file mode. 
*/

Review Comment:
   ```suggestion
   /** The implementation of {@link PartitionFileReader} with segment file 
mode. */
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/SegmentPartitionFileWriter.java:
##########
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.file;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FatalExitExceptionHandler;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import 
org.apache.flink.shaded.guava31.com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils.generateBufferWithHeaders;
+import static 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.SegmentPartitionFile.getSegmentPath;
+import static 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.SegmentPartitionFile.writeSegmentFinishFile;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * The implementation of {@link PartitionFileWriter} with segment file mode. 
In this mode, each
+ * segment of one subpartition is written to an independent file.
+ *
+ * <p>After finishing writing a segment, a segment-finish file is written to 
ensure the downstream
+ * reads only when the entire segment file is written, avoiding partial data 
reads. The downstream
+ * can determine if the current segment is complete by checking for the 
existence of the
+ * segment-finish file.
+ *
+ * <p>To minimize the number of files, each subpartition uses only a single 
segment-finish file. For
+ * instance, if segment-finish file 5 exists, it indicates that segments 1 to 
5 have all been
+ * finished.
+ */
+public class SegmentPartitionFileWriter implements PartitionFileWriter {
+
+    private final ExecutorService ioExecutor =
+            Executors.newSingleThreadExecutor(
+                    new ThreadFactoryBuilder()
+                            .setNameFormat("Hash partition file flush thread")
+                            
.setUncaughtExceptionHandler(FatalExitExceptionHandler.INSTANCE)
+                            .build());
+    private final String basePath;
+
+    private final WritableByteChannel[] subpartitionChannels;
+
+    private volatile boolean isReleased;
+
+    SegmentPartitionFileWriter(String basePath, int numSubpartitions) {
+        this.basePath = basePath;
+        this.subpartitionChannels = new WritableByteChannel[numSubpartitions];
+        Arrays.fill(subpartitionChannels, null);
+    }
+
+    @Override
+    public CompletableFuture<Void> write(
+            TieredStoragePartitionId partitionId, 
List<SubpartitionBufferContext> buffersToWrite) {
+        List<CompletableFuture<Void>> completableFutures = new ArrayList<>();
+        buffersToWrite.forEach(
+                subpartitionBuffers -> {
+                    int subpartitionId = 
subpartitionBuffers.getSubpartitionId();
+                    List<SegmentBufferContext> multiSegmentBuffers =
+                            subpartitionBuffers.getSegmentBufferContexts();
+                    multiSegmentBuffers.forEach(
+                            segmentBuffers -> {
+                                CompletableFuture<Void> flushSuccessNotifier =
+                                        new CompletableFuture<>();
+                                ioExecutor.execute(
+                                        () ->
+                                                flushOrFinishSegment(
+                                                        partitionId,
+                                                        subpartitionId,
+                                                        segmentBuffers,
+                                                        flushSuccessNotifier));
+                                completableFutures.add(flushSuccessNotifier);
+                            });
+                });
+        return FutureUtils.waitForAll(completableFutures);
+    }
+
+    @Override
+    public void release() {
+        if (isReleased) {
+            return;
+        }
+        isReleased = true;
+        try {
+            ioExecutor.shutdown();
+            if (!ioExecutor.awaitTermination(5L, TimeUnit.MINUTES)) {
+                throw new TimeoutException("Timeout to shutdown the flush 
thread.");
+            }
+            for (WritableByteChannel writeChannel : subpartitionChannels) {
+                if (writeChannel != null) {
+                    writeChannel.close();
+                }
+            }
+        } catch (Exception e) {
+            ExceptionUtils.rethrow(e);
+        }
+    }
+
+    // ------------------------------------------------------------------------
+    //  Internal Methods
+    // ------------------------------------------------------------------------
+
+    private void flushOrFinishSegment(
+            TieredStoragePartitionId partitionId,
+            int subpartitionId,
+            SegmentBufferContext segmentBuffers,
+            CompletableFuture<Void> flushSuccessNotifier) {
+        int segmentId = segmentBuffers.getSegmentId();
+        List<Tuple2<Buffer, Integer>> buffersToFlush = 
segmentBuffers.getBufferAndIndexes();
+        boolean isSegmentFinished = segmentBuffers.isSegmentFinished();
+        checkState(!buffersToFlush.isEmpty() || isSegmentFinished);
+
+        if (buffersToFlush.size() > 0) {
+            flush(partitionId, subpartitionId, segmentId, buffersToFlush, 
flushSuccessNotifier);
+        }
+        if (isSegmentFinished) {
+            writeFinishSegmentFile(partitionId, subpartitionId, segmentId, 
flushSuccessNotifier);
+        }
+    }
+
+    /** This method is only called by the flushing thread. */
+    private void flush(
+            TieredStoragePartitionId partitionId,
+            int subpartitionId,
+            int segmentId,
+            List<Tuple2<Buffer, Integer>> buffersToFlush,
+            CompletableFuture<Void> flushSuccessNotifier) {
+        try {
+            writeBuffers(
+                    partitionId,
+                    subpartitionId,
+                    segmentId,
+                    buffersToFlush,
+                    getTotalBytes(buffersToFlush));
+            buffersToFlush.forEach(bufferToFlush -> 
bufferToFlush.f0.recycleBuffer());
+            flushSuccessNotifier.complete(null);
+        } catch (IOException exception) {
+            ExceptionUtils.rethrow(exception);
+        }
+    }
+
+    /**
+     * Writing a segment-finish file when the current segment is complete. The 
downstream can
+     * determine if the current segment is complete by checking for the 
existence of the
+     * segment-finish file.
+     *
+     * <p>Note that the method is only called by the flushing thread.
+     */
+    private void writeFinishSegmentFile(
+            TieredStoragePartitionId partitionId,
+            int subpartitionId,
+            int segmentId,
+            CompletableFuture<Void> flushSuccessNotifier) {
+        try {
+            writeSegmentFinishFile(basePath, partitionId, subpartitionId, 
segmentId);
+            WritableByteChannel channel = subpartitionChannels[subpartitionId];
+            if (channel != null) {
+                channel.close();
+                subpartitionChannels[subpartitionId] = null;
+            }
+        } catch (IOException exception) {
+            ExceptionUtils.rethrow(exception);
+        }
+        flushSuccessNotifier.complete(null);

Review Comment:
   The `flushSuccessNotifier` can be completed in `flushOrFinishSegment` instead.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/SegmentPartitionFileWriter.java:
##########
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.file;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FatalExitExceptionHandler;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import 
org.apache.flink.shaded.guava31.com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils.generateBufferWithHeaders;
+import static 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.SegmentPartitionFile.getSegmentPath;
+import static 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.SegmentPartitionFile.writeSegmentFinishFile;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * The implementation of {@link PartitionFileWriter} with segment file mode. 
In this mode, each
+ * segment of one subpartition is written to an independent file.
+ *
+ * <p>After finishing writing a segment, a segment-finish file is written to 
ensure the downstream
+ * reads only when the entire segment file is written, avoiding partial data 
reads. The downstream
+ * can determine if the current segment is complete by checking for the 
existence of the
+ * segment-finish file.
+ *
+ * <p>To minimize the number of files, each subpartition uses only a single 
segment-finish file. For
+ * instance, if segment-finish file 5 exists, it indicates that segments 1 to 
5 have all been
+ * finished.
+ */
+public class SegmentPartitionFileWriter implements PartitionFileWriter {
+
+    private final ExecutorService ioExecutor =
+            Executors.newSingleThreadExecutor(
+                    new ThreadFactoryBuilder()
+                            .setNameFormat("Hash partition file flush thread")
+                            
.setUncaughtExceptionHandler(FatalExitExceptionHandler.INSTANCE)
+                            .build());
+    private final String basePath;
+
+    private final WritableByteChannel[] subpartitionChannels;
+
+    private volatile boolean isReleased;
+
+    SegmentPartitionFileWriter(String basePath, int numSubpartitions) {
+        this.basePath = basePath;
+        this.subpartitionChannels = new WritableByteChannel[numSubpartitions];
+        Arrays.fill(subpartitionChannels, null);
+    }
+
+    @Override
+    public CompletableFuture<Void> write(
+            TieredStoragePartitionId partitionId, 
List<SubpartitionBufferContext> buffersToWrite) {
+        List<CompletableFuture<Void>> completableFutures = new ArrayList<>();
+        buffersToWrite.forEach(
+                subpartitionBuffers -> {
+                    int subpartitionId = 
subpartitionBuffers.getSubpartitionId();
+                    List<SegmentBufferContext> multiSegmentBuffers =
+                            subpartitionBuffers.getSegmentBufferContexts();
+                    multiSegmentBuffers.forEach(
+                            segmentBuffers -> {
+                                CompletableFuture<Void> flushSuccessNotifier =
+                                        new CompletableFuture<>();
+                                ioExecutor.execute(
+                                        () ->
+                                                flushOrFinishSegment(
+                                                        partitionId,
+                                                        subpartitionId,
+                                                        segmentBuffers,
+                                                        flushSuccessNotifier));
+                                completableFutures.add(flushSuccessNotifier);
+                            });
+                });
+        return FutureUtils.waitForAll(completableFutures);
+    }
+
+    @Override
+    public void release() {
+        if (isReleased) {
+            return;
+        }
+        isReleased = true;
+        try {
+            ioExecutor.shutdown();
+            if (!ioExecutor.awaitTermination(5L, TimeUnit.MINUTES)) {
+                throw new TimeoutException("Timeout to shutdown the flush 
thread.");
+            }
+            for (WritableByteChannel writeChannel : subpartitionChannels) {
+                if (writeChannel != null) {
+                    writeChannel.close();
+                }
+            }
+        } catch (Exception e) {
+            ExceptionUtils.rethrow(e);
+        }
+    }
+
+    // ------------------------------------------------------------------------
+    //  Internal Methods
+    // ------------------------------------------------------------------------
+
+    private void flushOrFinishSegment(
+            TieredStoragePartitionId partitionId,
+            int subpartitionId,
+            SegmentBufferContext segmentBuffers,
+            CompletableFuture<Void> flushSuccessNotifier) {
+        int segmentId = segmentBuffers.getSegmentId();
+        List<Tuple2<Buffer, Integer>> buffersToFlush = 
segmentBuffers.getBufferAndIndexes();
+        boolean isSegmentFinished = segmentBuffers.isSegmentFinished();
+        checkState(!buffersToFlush.isEmpty() || isSegmentFinished);

Review Comment:
   Do we require these two conditions to be mutually exclusive?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/SegmentPartitionFileWriter.java:
##########
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.file;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FatalExitExceptionHandler;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import 
org.apache.flink.shaded.guava31.com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils.generateBufferWithHeaders;
+import static 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.SegmentPartitionFile.getSegmentPath;
+import static 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.SegmentPartitionFile.writeSegmentFinishFile;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * The implementation of {@link PartitionFileWriter} with segment file mode. 
In this mode, each
+ * segment of one subpartition is written to an independent file.
+ *
+ * <p>After finishing writing a segment, a segment-finish file is written to 
ensure the downstream
+ * reads only when the entire segment file is written, avoiding partial data 
reads. The downstream
+ * can determine if the current segment is complete by checking for the 
existence of the
+ * segment-finish file.
+ *
+ * <p>To minimize the number of files, each subpartition uses only a single 
segment-finish file. For
+ * instance, if segment-finish file 5 exists, it indicates that segments 1 to 
5 have all been
+ * finished.
+ */
+public class SegmentPartitionFileWriter implements PartitionFileWriter {
+
+    private final ExecutorService ioExecutor =
+            Executors.newSingleThreadExecutor(
+                    new ThreadFactoryBuilder()
+                            .setNameFormat("Hash partition file flush thread")
+                            
.setUncaughtExceptionHandler(FatalExitExceptionHandler.INSTANCE)
+                            .build());
+    private final String basePath;
+
+    private final WritableByteChannel[] subpartitionChannels;
+
+    private volatile boolean isReleased;
+
+    SegmentPartitionFileWriter(String basePath, int numSubpartitions) {
+        this.basePath = basePath;
+        this.subpartitionChannels = new WritableByteChannel[numSubpartitions];
+        Arrays.fill(subpartitionChannels, null);
+    }
+
+    @Override
+    public CompletableFuture<Void> write(
+            TieredStoragePartitionId partitionId, 
List<SubpartitionBufferContext> buffersToWrite) {
+        List<CompletableFuture<Void>> completableFutures = new ArrayList<>();
+        buffersToWrite.forEach(
+                subpartitionBuffers -> {
+                    int subpartitionId = 
subpartitionBuffers.getSubpartitionId();
+                    List<SegmentBufferContext> multiSegmentBuffers =
+                            subpartitionBuffers.getSegmentBufferContexts();
+                    multiSegmentBuffers.forEach(
+                            segmentBuffers -> {
+                                CompletableFuture<Void> flushSuccessNotifier =
+                                        new CompletableFuture<>();
+                                ioExecutor.execute(
+                                        () ->
+                                                flushOrFinishSegment(
+                                                        partitionId,
+                                                        subpartitionId,
+                                                        segmentBuffers,
+                                                        flushSuccessNotifier));
+                                completableFutures.add(flushSuccessNotifier);
+                            });
+                });
+        return FutureUtils.waitForAll(completableFutures);
+    }
+
+    @Override
+    public void release() {
+        if (isReleased) {
+            return;
+        }
+        isReleased = true;
+        try {
+            ioExecutor.shutdown();
+            if (!ioExecutor.awaitTermination(5L, TimeUnit.MINUTES)) {
+                throw new TimeoutException("Timeout to shutdown the flush 
thread.");
+            }
+            for (WritableByteChannel writeChannel : subpartitionChannels) {
+                if (writeChannel != null) {
+                    writeChannel.close();
+                }
+            }
+        } catch (Exception e) {
+            ExceptionUtils.rethrow(e);
+        }
+    }
+
+    // ------------------------------------------------------------------------
+    //  Internal Methods
+    // ------------------------------------------------------------------------
+
+    private void flushOrFinishSegment(
+            TieredStoragePartitionId partitionId,
+            int subpartitionId,
+            SegmentBufferContext segmentBuffers,
+            CompletableFuture<Void> flushSuccessNotifier) {
+        int segmentId = segmentBuffers.getSegmentId();
+        List<Tuple2<Buffer, Integer>> buffersToFlush = 
segmentBuffers.getBufferAndIndexes();
+        boolean isSegmentFinished = segmentBuffers.isSegmentFinished();
+        checkState(!buffersToFlush.isEmpty() || isSegmentFinished);
+
+        if (buffersToFlush.size() > 0) {
+            flush(partitionId, subpartitionId, segmentId, buffersToFlush, 
flushSuccessNotifier);
+        }
+        if (isSegmentFinished) {
+            writeFinishSegmentFile(partitionId, subpartitionId, segmentId, 
flushSuccessNotifier);
+        }
+    }
+
+    /** This method is only called by the flushing thread. */
+    private void flush(
+            TieredStoragePartitionId partitionId,
+            int subpartitionId,
+            int segmentId,
+            List<Tuple2<Buffer, Integer>> buffersToFlush,
+            CompletableFuture<Void> flushSuccessNotifier) {
+        try {
+            writeBuffers(
+                    partitionId,
+                    subpartitionId,
+                    segmentId,
+                    buffersToFlush,
+                    getTotalBytes(buffersToFlush));
+            buffersToFlush.forEach(bufferToFlush -> 
bufferToFlush.f0.recycleBuffer());
+            flushSuccessNotifier.complete(null);
+        } catch (IOException exception) {
+            ExceptionUtils.rethrow(exception);
+        }
+    }
+
+    /**
+     * Writing a segment-finish file when the current segment is complete. The 
downstream can
+     * determine if the current segment is complete by checking for the 
existence of the
+     * segment-finish file.
+     *
+     * <p>Note that the method is only called by the flushing thread.
+     */
+    private void writeFinishSegmentFile(
+            TieredStoragePartitionId partitionId,
+            int subpartitionId,
+            int segmentId,
+            CompletableFuture<Void> flushSuccessNotifier) {
+        try {
+            writeSegmentFinishFile(basePath, partitionId, subpartitionId, 
segmentId);
+            WritableByteChannel channel = subpartitionChannels[subpartitionId];
+            if (channel != null) {
+                channel.close();
+                subpartitionChannels[subpartitionId] = null;
+            }
+        } catch (IOException exception) {
+            ExceptionUtils.rethrow(exception);
+        }
+        flushSuccessNotifier.complete(null);
+    }
+
+    private long getTotalBytes(List<Tuple2<Buffer, Integer>> buffersToFlush) {
+        long expectedBytes = 0;
+        for (Tuple2<Buffer, Integer> bufferToFlush : buffersToFlush) {
+            Buffer buffer = bufferToFlush.f0;
+            int numBytes = buffer.readableBytes() + 
BufferReaderWriterUtil.HEADER_LENGTH;
+            expectedBytes += numBytes;
+        }
+        return expectedBytes;
+    }
+
+    private void writeBuffers(
+            TieredStoragePartitionId partitionId,
+            int subpartitionId,
+            int segmentId,
+            List<Tuple2<Buffer, Integer>> buffersToFlush,
+            long expectedBytes)
+            throws IOException {
+        ByteBuffer[] bufferWithHeaders = 
generateBufferWithHeaders(buffersToFlush);
+        WritableByteChannel currentChannel = 
subpartitionChannels[subpartitionId];
+        if (currentChannel == null) {
+            Path writingSegmentPath =
+                    getSegmentPath(basePath, partitionId, subpartitionId, 
segmentId);
+            FileSystem fs = writingSegmentPath.getFileSystem();
+            currentChannel =
+                    Channels.newChannel(
+                            fs.create(writingSegmentPath, 
FileSystem.WriteMode.NO_OVERWRITE));
+            SegmentPartitionFile.writeBuffers(currentChannel, expectedBytes, 
bufferWithHeaders);
+            subpartitionChannels[subpartitionId] = currentChannel;

Review Comment:
   ```java
   private void writeBuffers(
               TieredStoragePartitionId partitionId,
               int subpartitionId,
               int segmentId,
               List<Tuple2<Buffer, Integer>> buffersToFlush,
               long expectedBytes)
               throws IOException {
           WritableByteChannel currentChannel =
                   getOrInitSubpartitionChannel(partitionId, subpartitionId, 
segmentId);
           SegmentPartitionFile.writeBuffers(
                   currentChannel, expectedBytes, 
generateBufferWithHeaders(buffersToFlush));
       }
   
       private WritableByteChannel getOrInitSubpartitionChannel(
               TieredStoragePartitionId partitionId, int subpartitionId, int 
segmentId)
               throws IOException {
           WritableByteChannel currentChannel = 
subpartitionChannels[subpartitionId];
           if (currentChannel == null) {
               Path writingSegmentPath =
                       getSegmentPath(basePath, partitionId, subpartitionId, 
segmentId);
               FileSystem fs = writingSegmentPath.getFileSystem();
               currentChannel =
                       Channels.newChannel(
                               fs.create(writingSegmentPath, 
FileSystem.WriteMode.NO_OVERWRITE));
               subpartitionChannels[subpartitionId] = currentChannel;
           }
           return currentChannel;
       }
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/SegmentPartitionFile.java:
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.file;
+
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageIdMappingUtils;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * The partition file with segment file mode. In this mode, each segment of 
one subpartition is
+ * written to an independent file.
+ */
+public class SegmentPartitionFile {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(SegmentPartitionFile.class);
+
+    static final String TIERED_STORAGE_DIR = "tiered-storage";
+
+    static final String SEGMENT_FILE_PREFIX = "seg-";
+
+    static final String SEGMENT_FINISH_DIR_NAME = "FINISH";
+
+    public static SegmentPartitionFileWriter createPartitionFileWriter(
+            String dataFilePath, int numSubpartitions) {
+        return new SegmentPartitionFileWriter(dataFilePath, numSubpartitions);
+    }
+
+    public static SegmentPartitionFileReader createPartitionFileReader(String 
dataFilePath) {
+        return new SegmentPartitionFileReader(dataFilePath);
+    }
+
+    // ------------------------------------------------------------------------
+    //  File-related utilities
+    // ------------------------------------------------------------------------
+
+    public static String getTieredStoragePath(String basePath) {
+        return String.format("%s/%s", basePath, TIERED_STORAGE_DIR);
+    }
+
+    public static String getPartitionPath(TieredStoragePartitionId 
partitionId, String basePath) {
+        if (basePath == null) {
+            return null;
+        }
+
+        while (basePath.endsWith("/") && basePath.length() > 1) {
+            basePath = basePath.substring(0, basePath.length() - 1);
+        }
+        return String.format("%s/%s", basePath, 
TieredStorageIdMappingUtils.convertId(partitionId));
+    }
+
+    public static String getSubpartitionPath(
+            String basePath, TieredStoragePartitionId partitionId, int 
subpartitionId) {
+        while (basePath.endsWith("/") && basePath.length() > 1) {
+            basePath = basePath.substring(0, basePath.length() - 1);
+        }
+        return String.format(
+                "%s/%s/%s",
+                basePath, TieredStorageIdMappingUtils.convertId(partitionId), 
subpartitionId);
+    }
+
+    public static Path getSegmentPath(
+            String basePath,
+            TieredStoragePartitionId partitionId,
+            int subpartitionId,
+            long segmentId) {
+        String subpartitionPath = getSubpartitionPath(basePath, partitionId, 
subpartitionId);
+        return new Path(subpartitionPath, SEGMENT_FILE_PREFIX + segmentId);
+    }
+
+    public static Path getSegmentFinishDirPath(
+            String basePath, TieredStoragePartitionId partitionId, int 
subpartitionId) {
+        String subpartitionPath = getSubpartitionPath(basePath, partitionId, 
subpartitionId);
+        return new Path(subpartitionPath, SEGMENT_FINISH_DIR_NAME);
+    }
+
+    public static void writeBuffers(
+            WritableByteChannel writeChannel, long expectedBytes, ByteBuffer[] 
bufferWithHeaders)
+            throws IOException {
+        int writeSize = 0;
+        for (ByteBuffer bufferWithHeader : bufferWithHeaders) {
+            writeSize += writeChannel.write(bufferWithHeader);
+        }
+        checkState(writeSize == expectedBytes, "Wong number of written 
bytes.");
+    }
+
+    public static void writeSegmentFinishFile(
+            String basePath,
+            TieredStoragePartitionId partitionId,
+            int subpartitionId,
+            int segmentId)
+            throws IOException {
+        Path segmentFinishDir = getSegmentFinishDirPath(basePath, partitionId, 
subpartitionId);
+        FileSystem fs = segmentFinishDir.getFileSystem();
+        Path segmentFinishFile = new Path(segmentFinishDir, 
String.valueOf(segmentId));
+        if (!fs.exists(segmentFinishDir)) {
+            fs.mkdirs(segmentFinishDir);
+            OutputStream outputStream =
+                    fs.create(segmentFinishFile, 
FileSystem.WriteMode.OVERWRITE);
+            outputStream.close();
+            return;
+        }
+
+        FileStatus[] files = fs.listStatus(segmentFinishDir);
+        if (files.length == 0) {
+            OutputStream outputStream =
+                    fs.create(segmentFinishFile, 
FileSystem.WriteMode.OVERWRITE);
+            outputStream.close();
+        } else {
+            // Note that this check requires the file system to ensure that 
only one file is in the
+            // directory can be accessed when renaming a file.
+            checkState(files.length == 1, "Wong number of segment-finish 
files.");
+            fs.rename(files[0].getPath(), segmentFinishFile);
+        }
+    }
+
+    public static void deletePathQuietly(String toDelete) {
+        if (toDelete == null) {

Review Comment:
   When will this be null?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/SegmentPartitionFile.java:
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.file;
+
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageIdMappingUtils;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * The partition file with segment file mode. In this mode, each segment of 
one subpartition is
+ * written to an independent file.
+ */
+public class SegmentPartitionFile {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(SegmentPartitionFile.class);
+
+    static final String TIERED_STORAGE_DIR = "tiered-storage";
+
+    static final String SEGMENT_FILE_PREFIX = "seg-";
+
+    static final String SEGMENT_FINISH_DIR_NAME = "FINISH";
+
+    public static SegmentPartitionFileWriter createPartitionFileWriter(
+            String dataFilePath, int numSubpartitions) {
+        return new SegmentPartitionFileWriter(dataFilePath, numSubpartitions);
+    }
+
+    public static SegmentPartitionFileReader createPartitionFileReader(String 
dataFilePath) {
+        return new SegmentPartitionFileReader(dataFilePath);
+    }
+
+    // ------------------------------------------------------------------------
+    //  File-related utilities
+    // ------------------------------------------------------------------------
+
+    public static String getTieredStoragePath(String basePath) {
+        return String.format("%s/%s", basePath, TIERED_STORAGE_DIR);
+    }
+
+    public static String getPartitionPath(TieredStoragePartitionId 
partitionId, String basePath) {
+        if (basePath == null) {
+            return null;
+        }
+
+        while (basePath.endsWith("/") && basePath.length() > 1) {
+            basePath = basePath.substring(0, basePath.length() - 1);
+        }
+        return String.format("%s/%s", basePath, 
TieredStorageIdMappingUtils.convertId(partitionId));
+    }
+
+    public static String getSubpartitionPath(
+            String basePath, TieredStoragePartitionId partitionId, int 
subpartitionId) {
+        while (basePath.endsWith("/") && basePath.length() > 1) {
+            basePath = basePath.substring(0, basePath.length() - 1);
+        }
+        return String.format(
+                "%s/%s/%s",
+                basePath, TieredStorageIdMappingUtils.convertId(partitionId), 
subpartitionId);
+    }
+
+    public static Path getSegmentPath(
+            String basePath,
+            TieredStoragePartitionId partitionId,
+            int subpartitionId,
+            long segmentId) {
+        String subpartitionPath = getSubpartitionPath(basePath, partitionId, 
subpartitionId);
+        return new Path(subpartitionPath, SEGMENT_FILE_PREFIX + segmentId);
+    }
+
+    public static Path getSegmentFinishDirPath(
+            String basePath, TieredStoragePartitionId partitionId, int 
subpartitionId) {
+        String subpartitionPath = getSubpartitionPath(basePath, partitionId, 
subpartitionId);
+        return new Path(subpartitionPath, SEGMENT_FINISH_DIR_NAME);
+    }
+
+    public static void writeBuffers(
+            WritableByteChannel writeChannel, long expectedBytes, ByteBuffer[] 
bufferWithHeaders)
+            throws IOException {
+        int writeSize = 0;
+        for (ByteBuffer bufferWithHeader : bufferWithHeaders) {
+            writeSize += writeChannel.write(bufferWithHeader);
+        }
+        checkState(writeSize == expectedBytes, "Wong number of written 
bytes.");
+    }
+
+    public static void writeSegmentFinishFile(
+            String basePath,
+            TieredStoragePartitionId partitionId,
+            int subpartitionId,
+            int segmentId)
+            throws IOException {
+        Path segmentFinishDir = getSegmentFinishDirPath(basePath, partitionId, 
subpartitionId);
+        FileSystem fs = segmentFinishDir.getFileSystem();
+        Path segmentFinishFile = new Path(segmentFinishDir, 
String.valueOf(segmentId));
+        if (!fs.exists(segmentFinishDir)) {
+            fs.mkdirs(segmentFinishDir);
+            OutputStream outputStream =
+                    fs.create(segmentFinishFile, 
FileSystem.WriteMode.OVERWRITE);
+            outputStream.close();
+            return;
+        }
+
+        FileStatus[] files = fs.listStatus(segmentFinishDir);
+        if (files.length == 0) {
+            OutputStream outputStream =
+                    fs.create(segmentFinishFile, 
FileSystem.WriteMode.OVERWRITE);
+            outputStream.close();
+        } else {
+            // Note that this check requires the file system to ensure that 
only one file is in the
+            // directory can be accessed when renaming a file.
+            checkState(files.length == 1, "Wong number of segment-finish 
files.");
+            fs.rename(files[0].getPath(), segmentFinishFile);

Review Comment:
   IIUC, each subpartition only has a single segment-finish file to minimize the 
number of files. But we absolutely need to add some explanation 
here; otherwise, the renaming will be confusing.
   



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/SubpartitionRemoteCacheManager.java:
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.remote;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.PartitionFileWriter;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMemoryManager;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import net.jcip.annotations.GuardedBy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * This {@link SubpartitionRemoteCacheManager} is responsible for managing the 
buffers in a single
+ * subpartition.
+ */
+class SubpartitionRemoteCacheManager {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(SubpartitionRemoteCacheManager.class);
+
+    private final TieredStoragePartitionId partitionId;
+
+    private final int subpartitionId;
+
+    private final PartitionFileWriter partitionFileWriter;
+
+    /**
+     * All the buffers. The first field of the tuple is the buffer, while the 
second field of the
+     * buffer is the buffer index.

Review Comment:
   ```suggestion
        * tuple is the buffer index.
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/SubpartitionRemoteCacheManager.java:
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.remote;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.PartitionFileWriter;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMemoryManager;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import net.jcip.annotations.GuardedBy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * This {@link SubpartitionRemoteCacheManager} is responsible for managing the 
buffers in a single
+ * subpartition.
+ */
+class SubpartitionRemoteCacheManager {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(SubpartitionRemoteCacheManager.class);
+
+    private final TieredStoragePartitionId partitionId;
+
+    private final int subpartitionId;
+
+    private final PartitionFileWriter partitionFileWriter;
+
+    /**
+     * All the buffers. The first field of the tuple is the buffer, while the 
second field of the
+     * buffer is the buffer index.
+     *
+     * <p>Note that this field can be accessed by the task thread or the write 
IO thread, so the
+     * thread safety should be ensured.
+     */
+    private final Deque<Tuple2<Buffer, Integer>> allBuffers = new 
LinkedList<>();
+
+    private CompletableFuture<Void> flushCompletableFuture = 
FutureUtils.completedVoidFuture();
+
+    /**
+     * Record the segment id that is writing to.
+     *
+     * <p>Note that when flushing buffers, this can be touched by task thread 
or the flushing
+     * thread, so the thread safety should be ensured.
+     */
+    @GuardedBy("allBuffers")
+    private int segmentId = -1;
+
+    /**
+     * Record the buffer index in the {@link SubpartitionRemoteCacheManager}. 
Each time a new buffer
+     * is added to the {@code allBuffers}, this field is increased by one.
+     *
+     * <p>Note that the field can only be touched by the task thread, so this 
field need not be
+     * guarded by any lock or synchronizations.
+     */
+    private int bufferIndex;
+
+    public SubpartitionRemoteCacheManager(
+            TieredStoragePartitionId partitionId,
+            int subpartitionId,
+            TieredStorageMemoryManager storageMemoryManager,
+            PartitionFileWriter partitionFileWriter) {
+        this.partitionId = partitionId;
+        this.subpartitionId = subpartitionId;
+        this.partitionFileWriter = partitionFileWriter;
+        storageMemoryManager.listenBufferReclaimRequest(this::flushBuffers);
+    }
+
+    // ------------------------------------------------------------------------
+    //  Called by RemoteCacheManager
+    // ------------------------------------------------------------------------
+
+    void startSegment(int segmentId) {
+        synchronized (allBuffers) {
+            checkState(allBuffers.isEmpty(), "There are un-flushed buffers.");
+            this.segmentId = segmentId;
+        }
+    }
+
+    void addBuffer(Buffer buffer) {
+        Tuple2<Buffer, Integer> toAddBuffer = new Tuple2<>(buffer, 
bufferIndex++);
+        synchronized (allBuffers) {
+            allBuffers.add(toAddBuffer);
+        }
+    }
+
+    void finishSegment(int segmentId) {
+        synchronized (allBuffers) {
+            checkState(this.segmentId == segmentId, "Wrong segment id.");

Review Comment:
   Is this just a sanity check? This lock will result in unnecessary overhead.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/SubpartitionRemoteCacheManager.java:
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.remote;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.PartitionFileWriter;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMemoryManager;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import net.jcip.annotations.GuardedBy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * This {@link SubpartitionRemoteCacheManager} is responsible for managing the 
buffers in a single
+ * subpartition.
+ */
+class SubpartitionRemoteCacheManager {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(SubpartitionRemoteCacheManager.class);
+
+    private final TieredStoragePartitionId partitionId;
+
+    private final int subpartitionId;
+
+    private final PartitionFileWriter partitionFileWriter;
+
+    /**
+     * All the buffers. The first field of the tuple is the buffer, while the 
second field of the
+     * buffer is the buffer index.
+     *
+     * <p>Note that this field can be accessed by the task thread or the write 
IO thread, so the
+     * thread safety should be ensured.
+     */
+    private final Deque<Tuple2<Buffer, Integer>> allBuffers = new 
LinkedList<>();
+
+    private CompletableFuture<Void> flushCompletableFuture = 
FutureUtils.completedVoidFuture();
+
+    /**
+     * Record the segment id that is writing to.
+     *
+     * <p>Note that when flushing buffers, this can be touched by task thread 
or the flushing
+     * thread, so the thread safety should be ensured.
+     */
+    @GuardedBy("allBuffers")
+    private int segmentId = -1;
+
+    /**
+     * Record the buffer index in the {@link SubpartitionRemoteCacheManager}. 
Each time a new buffer
+     * is added to the {@code allBuffers}, this field is increased by one.
+     *
+     * <p>Note that the field can only be touched by the task thread, so this 
field need not be
+     * guarded by any lock or synchronizations.
+     */
+    private int bufferIndex;
+
+    public SubpartitionRemoteCacheManager(
+            TieredStoragePartitionId partitionId,
+            int subpartitionId,
+            TieredStorageMemoryManager storageMemoryManager,
+            PartitionFileWriter partitionFileWriter) {
+        this.partitionId = partitionId;
+        this.subpartitionId = subpartitionId;
+        this.partitionFileWriter = partitionFileWriter;
+        storageMemoryManager.listenBufferReclaimRequest(this::flushBuffers);
+    }
+
+    // ------------------------------------------------------------------------
+    //  Called by RemoteCacheManager
+    // ------------------------------------------------------------------------
+
+    void startSegment(int segmentId) {
+        synchronized (allBuffers) {
+            checkState(allBuffers.isEmpty(), "There are un-flushed buffers.");
+            this.segmentId = segmentId;
+        }
+    }
+
+    void addBuffer(Buffer buffer) {
+        Tuple2<Buffer, Integer> toAddBuffer = new Tuple2<>(buffer, 
bufferIndex++);
+        synchronized (allBuffers) {
+            allBuffers.add(toAddBuffer);
+        }
+    }
+
+    void finishSegment(int segmentId) {
+        synchronized (allBuffers) {
+            checkState(this.segmentId == segmentId, "Wrong segment id.");
+        }
+        // Flush the buffers belonging to the current segment
+        flushBuffers();
+
+        PartitionFileWriter.SubpartitionBufferContext bufferContext =
+                new PartitionFileWriter.SubpartitionBufferContext(
+                        subpartitionId,
+                        Collections.singletonList(
+                                new PartitionFileWriter.SegmentBufferContext(
+                                        segmentId, Collections.emptyList(), 
true)));
+        // Notify the partition file writer that the segment is finished 
through writing the
+        // buffer context
+        flushCompletableFuture =
+                partitionFileWriter.write(partitionId, 
Collections.singletonList(bufferContext));
+        checkState(allBuffers.isEmpty(), "Leaking buffers.");
+    }
+
+    void close() {
+        // Wait the flushing buffers to be completed before closed
+        try {
+            flushCompletableFuture.get();
+        } catch (Exception e) {
+            LOG.error("Failed to flush the buffers.", e);
+            ExceptionUtils.rethrow(e);
+        }
+        flushBuffers();
+    }
+
+    /** Release all buffers. */
+    void release() {
+        recycleBuffers();
+        checkState(allBuffers.isEmpty(), "Leaking buffers.");

Review Comment:
   Is this access to `allBuffers` thread-safe?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/SubpartitionRemoteCacheManager.java:
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.remote;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.PartitionFileWriter;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMemoryManager;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import net.jcip.annotations.GuardedBy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * This {@link SubpartitionRemoteCacheManager} is responsible for managing the 
buffers in a single
+ * subpartition.
+ */
+class SubpartitionRemoteCacheManager {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(SubpartitionRemoteCacheManager.class);
+
+    private final TieredStoragePartitionId partitionId;
+
+    private final int subpartitionId;
+
+    private final PartitionFileWriter partitionFileWriter;
+
+    /**
+     * All the buffers. The first field of the tuple is the buffer, while the 
second field of the
+     * buffer is the buffer index.
+     *
+     * <p>Note that this field can be accessed by the task thread or the write 
IO thread, so the
+     * thread safety should be ensured.
+     */
+    private final Deque<Tuple2<Buffer, Integer>> allBuffers = new 
LinkedList<>();

Review Comment:
   ```suggestion
       @GuardedBy("allBuffers")
       private final Deque<Tuple2<Buffer, Integer>> allBuffers = new 
LinkedList<>();
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/SegmentPartitionFileTest.java:
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.file;
+
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageIdMappingUtils;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Random;
+
+import static 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.SegmentPartitionFile.SEGMENT_FILE_PREFIX;
+import static 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.SegmentPartitionFile.SEGMENT_FINISH_DIR_NAME;
+import static 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.SegmentPartitionFile.TIERED_STORAGE_DIR;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link SegmentPartitionFile}. */
+class SegmentPartitionFileTest {
+
+    @TempDir Path tempFolder;
+
+    @Test
+    void testGetTieredStoragePath() {
+        String tieredStoragePath =
+                
SegmentPartitionFile.getTieredStoragePath(tempFolder.toFile().getPath());
+        assertThat(tieredStoragePath)
+                .isEqualTo(new File(tempFolder.toFile().getPath(), 
TIERED_STORAGE_DIR).getPath());
+    }
+
+    @Test
+    void testGetPartitionPath() {
+        TieredStoragePartitionId partitionId =
+                TieredStorageIdMappingUtils.convertId(new ResultPartitionID());
+        String partitionPath =
+                SegmentPartitionFile.getPartitionPath(partitionId, 
tempFolder.toFile().getPath());
+
+        File partitionFile =
+                new File(
+                        tempFolder.toFile().getPath(),
+                        
TieredStorageIdMappingUtils.convertId(partitionId).toString());
+        assertThat(partitionPath).isEqualTo(partitionFile.getPath());
+    }
+
+    @Test
+    void testGetSubpartitionPath() {
+        TieredStoragePartitionId partitionId =
+                TieredStorageIdMappingUtils.convertId(new ResultPartitionID());
+        int subpartitionId = 0;
+
+        String subpartitionPath =
+                SegmentPartitionFile.getSubpartitionPath(
+                        tempFolder.toFile().getPath(), partitionId, 
subpartitionId);
+        File partitionFile =
+                new File(
+                        tempFolder.toFile().getPath(),
+                        
TieredStorageIdMappingUtils.convertId(partitionId).toString());
+        assertThat(subpartitionPath)
+                .isEqualTo(new File(partitionFile, 
String.valueOf(subpartitionId)).toString());
+    }
+
+    @Test
+    void testGetSegmentPath() {
+        TieredStoragePartitionId partitionId =
+                TieredStorageIdMappingUtils.convertId(new ResultPartitionID());
+        int subpartitionId = 0;
+        int segmentId = 1;
+
+        String segmentPath =
+                SegmentPartitionFile.getSegmentPath(
+                                tempFolder.toFile().getPath(),
+                                partitionId,
+                                subpartitionId,
+                                segmentId)
+                        .toString();
+
+        File partitionFile =
+                new File(
+                        tempFolder.toFile().getPath(),
+                        
TieredStorageIdMappingUtils.convertId(partitionId).toString());
+        File subpartitionFile = new File(partitionFile, 
String.valueOf(subpartitionId));
+        assertThat(segmentPath)
+                .isEqualTo(new File(subpartitionFile, SEGMENT_FILE_PREFIX + 
segmentId).toString());
+    }
+
+    @Test
+    void testGetSegmentFinishDirPath() {
+        TieredStoragePartitionId partitionId =
+                TieredStorageIdMappingUtils.convertId(new ResultPartitionID());
+        int subpartitionId = 0;
+
+        String segmentFinishDirPath =
+                SegmentPartitionFile.getSegmentFinishDirPath(
+                                tempFolder.toFile().getPath(), partitionId, 
subpartitionId)
+                        .getPath();
+        File expectedSegmentFinishDir =
+                getSegmentFinishDir(tempFolder.toFile().getPath(), 
partitionId, subpartitionId);
+        
assertThat(segmentFinishDirPath).isEqualTo(expectedSegmentFinishDir.getPath());
+    }
+
+    @Test
+    void testWriteBuffers() throws IOException {
+        Random random = new Random();
+        int numBuffers = 20;
+        int bufferSizeBytes = 10;
+
+        File testFile = new File(tempFolder.toFile().getPath(), "testFile");
+        org.apache.flink.core.fs.Path testPath =
+                org.apache.flink.core.fs.Path.fromLocalFile(testFile);
+        FileSystem fs = testPath.getFileSystem();
+        WritableByteChannel currentChannel =
+                Channels.newChannel(fs.create(testPath, 
FileSystem.WriteMode.NO_OVERWRITE));
+
+        ByteBuffer[] toWriteBuffers = new ByteBuffer[numBuffers];
+        for (int i = 0; i < numBuffers; i++) {
+            byte[] bytes = new byte[bufferSizeBytes];
+            random.nextBytes(bytes);
+            toWriteBuffers[i] = ByteBuffer.wrap(bytes);
+        }
+        int numExpectedBytes = numBuffers * bufferSizeBytes;
+        SegmentPartitionFile.writeBuffers(currentChannel, numExpectedBytes, 
toWriteBuffers);
+
+        byte[] bytesRead = Files.readAllBytes(testFile.toPath());
+        assertThat(bytesRead).hasSize(numExpectedBytes);
+    }
+
+    @Test
+    void testWriteSegmentFinishFile() throws IOException {
+        TieredStoragePartitionId partitionId =
+                TieredStorageIdMappingUtils.convertId(new ResultPartitionID());
+        int subpartitionId = 0;
+        int segmentId = 1;
+        int newSegmentId = 5;
+
+        writeAndCheckSegmentFinishFile(
+                tempFolder.toFile().getPath(), partitionId, subpartitionId, 
segmentId);
+        writeAndCheckSegmentFinishFile(
+                tempFolder.toFile().getPath(), partitionId, subpartitionId, 
newSegmentId);
+    }
+
+    @Test
+    void testDeletePathQuietly() throws IOException {
+        File testFile = new File(tempFolder.toFile().getPath(), "testFile");
+        Files.createFile(testFile.toPath());
+        assertThat(testFile.exists()).isTrue();

Review Comment:
   ```suggestion
           assertThat(testFile).exists();
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/SegmentPartitionFileTest.java:
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.file;
+
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageIdMappingUtils;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Random;
+
+import static 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.SegmentPartitionFile.SEGMENT_FILE_PREFIX;
+import static 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.SegmentPartitionFile.SEGMENT_FINISH_DIR_NAME;
+import static 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.SegmentPartitionFile.TIERED_STORAGE_DIR;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link SegmentPartitionFile}. */
+class SegmentPartitionFileTest {
+
+    @TempDir Path tempFolder;
+
+    @Test
+    void testGetTieredStoragePath() {
+        String tieredStoragePath =
+                
SegmentPartitionFile.getTieredStoragePath(tempFolder.toFile().getPath());
+        assertThat(tieredStoragePath)
+                .isEqualTo(new File(tempFolder.toFile().getPath(), 
TIERED_STORAGE_DIR).getPath());
+    }
+
+    @Test
+    void testGetPartitionPath() {
+        TieredStoragePartitionId partitionId =
+                TieredStorageIdMappingUtils.convertId(new ResultPartitionID());
+        String partitionPath =
+                SegmentPartitionFile.getPartitionPath(partitionId, 
tempFolder.toFile().getPath());
+
+        File partitionFile =
+                new File(
+                        tempFolder.toFile().getPath(),
+                        
TieredStorageIdMappingUtils.convertId(partitionId).toString());
+        assertThat(partitionPath).isEqualTo(partitionFile.getPath());
+    }
+
+    @Test
+    void testGetSubpartitionPath() {
+        TieredStoragePartitionId partitionId =
+                TieredStorageIdMappingUtils.convertId(new ResultPartitionID());
+        int subpartitionId = 0;
+
+        String subpartitionPath =
+                SegmentPartitionFile.getSubpartitionPath(
+                        tempFolder.toFile().getPath(), partitionId, 
subpartitionId);
+        File partitionFile =
+                new File(
+                        tempFolder.toFile().getPath(),
+                        
TieredStorageIdMappingUtils.convertId(partitionId).toString());
+        assertThat(subpartitionPath)
+                .isEqualTo(new File(partitionFile, 
String.valueOf(subpartitionId)).toString());
+    }
+
+    @Test
+    void testGetSegmentPath() {
+        TieredStoragePartitionId partitionId =
+                TieredStorageIdMappingUtils.convertId(new ResultPartitionID());
+        int subpartitionId = 0;
+        int segmentId = 1;
+
+        String segmentPath =
+                SegmentPartitionFile.getSegmentPath(
+                                tempFolder.toFile().getPath(),
+                                partitionId,
+                                subpartitionId,
+                                segmentId)
+                        .toString();
+
+        File partitionFile =
+                new File(
+                        tempFolder.toFile().getPath(),
+                        
TieredStorageIdMappingUtils.convertId(partitionId).toString());
+        File subpartitionFile = new File(partitionFile, 
String.valueOf(subpartitionId));
+        assertThat(segmentPath)
+                .isEqualTo(new File(subpartitionFile, SEGMENT_FILE_PREFIX + 
segmentId).toString());
+    }
+
+    @Test
+    void testGetSegmentFinishDirPath() {
+        TieredStoragePartitionId partitionId =
+                TieredStorageIdMappingUtils.convertId(new ResultPartitionID());
+        int subpartitionId = 0;
+
+        String segmentFinishDirPath =
+                SegmentPartitionFile.getSegmentFinishDirPath(
+                                tempFolder.toFile().getPath(), partitionId, 
subpartitionId)
+                        .getPath();
+        File expectedSegmentFinishDir =
+                getSegmentFinishDir(tempFolder.toFile().getPath(), 
partitionId, subpartitionId);
+        
assertThat(segmentFinishDirPath).isEqualTo(expectedSegmentFinishDir.getPath());
+    }
+
+    @Test
+    void testWriteBuffers() throws IOException {
+        Random random = new Random();
+        int numBuffers = 20;
+        int bufferSizeBytes = 10;
+
+        File testFile = new File(tempFolder.toFile().getPath(), "testFile");
+        org.apache.flink.core.fs.Path testPath =
+                org.apache.flink.core.fs.Path.fromLocalFile(testFile);
+        FileSystem fs = testPath.getFileSystem();
+        WritableByteChannel currentChannel =
+                Channels.newChannel(fs.create(testPath, 
FileSystem.WriteMode.NO_OVERWRITE));
+
+        ByteBuffer[] toWriteBuffers = new ByteBuffer[numBuffers];
+        for (int i = 0; i < numBuffers; i++) {
+            byte[] bytes = new byte[bufferSizeBytes];
+            random.nextBytes(bytes);
+            toWriteBuffers[i] = ByteBuffer.wrap(bytes);
+        }
+        int numExpectedBytes = numBuffers * bufferSizeBytes;
+        SegmentPartitionFile.writeBuffers(currentChannel, numExpectedBytes, 
toWriteBuffers);
+
+        byte[] bytesRead = Files.readAllBytes(testFile.toPath());
+        assertThat(bytesRead).hasSize(numExpectedBytes);
+    }
+
+    @Test
+    void testWriteSegmentFinishFile() throws IOException {
+        TieredStoragePartitionId partitionId =
+                TieredStorageIdMappingUtils.convertId(new ResultPartitionID());
+        int subpartitionId = 0;
+        int segmentId = 1;
+        int newSegmentId = 5;
+
+        writeAndCheckSegmentFinishFile(
+                tempFolder.toFile().getPath(), partitionId, subpartitionId, 
segmentId);
+        writeAndCheckSegmentFinishFile(
+                tempFolder.toFile().getPath(), partitionId, subpartitionId, 
newSegmentId);
+    }
+
+    @Test
+    void testDeletePathQuietly() throws IOException {
+        File testFile = new File(tempFolder.toFile().getPath(), "testFile");
+        Files.createFile(testFile.toPath());
+        assertThat(testFile.exists()).isTrue();
+        SegmentPartitionFile.deletePathQuietly(testFile.getPath());
+        assertThat(testFile.exists()).isFalse();
+    }
+
+    private static void writeAndCheckSegmentFinishFile(
+            String baseDir, TieredStoragePartitionId partitionId, int 
subpartitionId, int segmentId)
+            throws IOException {
+
+        SegmentPartitionFile.writeSegmentFinishFile(
+                baseDir, partitionId, subpartitionId, segmentId);
+        File segmentFinishDir = getSegmentFinishDir(baseDir, partitionId, 
subpartitionId);
+        assertThat(segmentFinishDir.isDirectory()).isTrue();
+        File[] segmentFinishFiles = segmentFinishDir.listFiles();
+        assertThat(segmentFinishFiles).hasSize(1);
+        assertThat(segmentFinishFiles[0].isFile()).isTrue();

Review Comment:
   ```suggestion
           assertThat(segmentFinishFiles[0]).isFile();
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/SegmentPartitionFileTest.java:
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.file;
+
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageIdMappingUtils;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Random;
+
+import static 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.SegmentPartitionFile.SEGMENT_FILE_PREFIX;
+import static 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.SegmentPartitionFile.SEGMENT_FINISH_DIR_NAME;
+import static 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.SegmentPartitionFile.TIERED_STORAGE_DIR;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link SegmentPartitionFile}. */
+class SegmentPartitionFileTest {
+
+    @TempDir Path tempFolder;
+
+    @Test
+    void testGetTieredStoragePath() {
+        String tieredStoragePath =
+                
SegmentPartitionFile.getTieredStoragePath(tempFolder.toFile().getPath());
+        assertThat(tieredStoragePath)
+                .isEqualTo(new File(tempFolder.toFile().getPath(), 
TIERED_STORAGE_DIR).getPath());
+    }
+
+    @Test
+    void testGetPartitionPath() {
+        TieredStoragePartitionId partitionId =
+                TieredStorageIdMappingUtils.convertId(new ResultPartitionID());
+        String partitionPath =
+                SegmentPartitionFile.getPartitionPath(partitionId, 
tempFolder.toFile().getPath());
+
+        File partitionFile =
+                new File(
+                        tempFolder.toFile().getPath(),
+                        
TieredStorageIdMappingUtils.convertId(partitionId).toString());
+        assertThat(partitionPath).isEqualTo(partitionFile.getPath());
+    }
+
+    @Test
+    void testGetSubpartitionPath() {
+        TieredStoragePartitionId partitionId =
+                TieredStorageIdMappingUtils.convertId(new ResultPartitionID());
+        int subpartitionId = 0;
+
+        String subpartitionPath =
+                SegmentPartitionFile.getSubpartitionPath(
+                        tempFolder.toFile().getPath(), partitionId, 
subpartitionId);
+        File partitionFile =
+                new File(
+                        tempFolder.toFile().getPath(),
+                        
TieredStorageIdMappingUtils.convertId(partitionId).toString());
+        assertThat(subpartitionPath)
+                .isEqualTo(new File(partitionFile, 
String.valueOf(subpartitionId)).toString());
+    }
+
+    @Test
+    void testGetSegmentPath() {
+        TieredStoragePartitionId partitionId =
+                TieredStorageIdMappingUtils.convertId(new ResultPartitionID());
+        int subpartitionId = 0;
+        int segmentId = 1;
+
+        String segmentPath =
+                SegmentPartitionFile.getSegmentPath(
+                                tempFolder.toFile().getPath(),
+                                partitionId,
+                                subpartitionId,
+                                segmentId)
+                        .toString();
+
+        File partitionFile =
+                new File(
+                        tempFolder.toFile().getPath(),
+                        
TieredStorageIdMappingUtils.convertId(partitionId).toString());
+        File subpartitionFile = new File(partitionFile, 
String.valueOf(subpartitionId));
+        assertThat(segmentPath)
+                .isEqualTo(new File(subpartitionFile, SEGMENT_FILE_PREFIX + 
segmentId).toString());
+    }
+
+    @Test
+    void testGetSegmentFinishDirPath() {
+        TieredStoragePartitionId partitionId =
+                TieredStorageIdMappingUtils.convertId(new ResultPartitionID());
+        int subpartitionId = 0;
+
+        String segmentFinishDirPath =
+                SegmentPartitionFile.getSegmentFinishDirPath(
+                                tempFolder.toFile().getPath(), partitionId, 
subpartitionId)
+                        .getPath();
+        File expectedSegmentFinishDir =
+                getSegmentFinishDir(tempFolder.toFile().getPath(), 
partitionId, subpartitionId);
+        
assertThat(segmentFinishDirPath).isEqualTo(expectedSegmentFinishDir.getPath());
+    }
+
+    @Test
+    void testWriteBuffers() throws IOException {
+        Random random = new Random();
+        int numBuffers = 20;
+        int bufferSizeBytes = 10;
+
+        File testFile = new File(tempFolder.toFile().getPath(), "testFile");
+        org.apache.flink.core.fs.Path testPath =
+                org.apache.flink.core.fs.Path.fromLocalFile(testFile);
+        FileSystem fs = testPath.getFileSystem();
+        WritableByteChannel currentChannel =
+                Channels.newChannel(fs.create(testPath, 
FileSystem.WriteMode.NO_OVERWRITE));
+
+        ByteBuffer[] toWriteBuffers = new ByteBuffer[numBuffers];
+        for (int i = 0; i < numBuffers; i++) {
+            byte[] bytes = new byte[bufferSizeBytes];
+            random.nextBytes(bytes);
+            toWriteBuffers[i] = ByteBuffer.wrap(bytes);
+        }
+        int numExpectedBytes = numBuffers * bufferSizeBytes;
+        SegmentPartitionFile.writeBuffers(currentChannel, numExpectedBytes, 
toWriteBuffers);
+
+        byte[] bytesRead = Files.readAllBytes(testFile.toPath());
+        assertThat(bytesRead).hasSize(numExpectedBytes);
+    }
+
+    @Test
+    void testWriteSegmentFinishFile() throws IOException {
+        TieredStoragePartitionId partitionId =
+                TieredStorageIdMappingUtils.convertId(new ResultPartitionID());
+        int subpartitionId = 0;
+        int segmentId = 1;
+        int newSegmentId = 5;
+
+        writeAndCheckSegmentFinishFile(
+                tempFolder.toFile().getPath(), partitionId, subpartitionId, 
segmentId);
+        writeAndCheckSegmentFinishFile(
+                tempFolder.toFile().getPath(), partitionId, subpartitionId, 
newSegmentId);
+    }
+
+    @Test
+    void testDeletePathQuietly() throws IOException {
+        File testFile = new File(tempFolder.toFile().getPath(), "testFile");
+        Files.createFile(testFile.toPath());
+        assertThat(testFile.exists()).isTrue();
+        SegmentPartitionFile.deletePathQuietly(testFile.getPath());
+        assertThat(testFile.exists()).isFalse();

Review Comment:
   ```suggestion
           assertThat(testFile).doesNotExist();
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to