This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9cde5322969d7c92a76d615576f1cdd52454b33e
Author: Weijie Guo <[email protected]>
AuthorDate: Thu Jul 28 17:41:17 2022 +0800

    [hotfix] Rename HsResultPartitionReadScheduler to HsFileDataManager
     Rename HsResultPartitionReadScheduler to HsFileDataManager as it plays the 
same role as the FileDataManager mentioned in the FLIP.
---
 ...onReadScheduler.java => HsFileDataManager.java} | 20 ++++----
 ...hedulerTest.java => HsFileDataManagerTest.java} | 60 +++++++++++-----------
 2 files changed, 40 insertions(+), 40 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartitionReadScheduler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManager.java
similarity index 95%
rename from 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartitionReadScheduler.java
rename to 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManager.java
index ef742b61dee..dab671aa723 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartitionReadScheduler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManager.java
@@ -53,12 +53,12 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
 
 /**
- * IO scheduler for HsResultPartition, which schedules {@link 
HsSubpartitionFileReaderImpl} for
+ * File data manager for HsResultPartition, which schedules {@link 
HsSubpartitionFileReaderImpl} for
  * loading data w.r.t. their offset in the file.
  */
 @ThreadSafe
-public class HsResultPartitionReadScheduler implements Runnable, 
BufferRecycler {
-    private static final Logger LOG = 
LoggerFactory.getLogger(HsResultPartitionReadScheduler.class);
+public class HsFileDataManager implements Runnable, BufferRecycler {
+    private static final Logger LOG = 
LoggerFactory.getLogger(HsFileDataManager.class);
 
     /** Executor to run the shuffle data reading task. */
     private final Executor ioExecutor;
@@ -76,8 +76,8 @@ public class HsResultPartitionReadScheduler implements 
Runnable, BufferRecycler
     private final Object lock = new Object();
 
     /**
-     * A {@link CompletableFuture} to be completed when this read scheduler 
including all resources
-     * is released.
+     * A {@link CompletableFuture} to be completed when this data manager 
including all resources is
+     * released.
      */
     @GuardedBy("lock")
     private final CompletableFuture<?> releaseFuture = new 
CompletableFuture<>();
@@ -112,14 +112,14 @@ public class HsResultPartitionReadScheduler implements 
Runnable, BufferRecycler
     @GuardedBy("lock")
     private volatile int numRequestedBuffers;
 
-    /** Whether this read scheduler has been released or not. */
+    /** Whether this file data manager has been released or not. */
     @GuardedBy("lock")
     private volatile boolean isReleased;
 
     @GuardedBy("lock")
     private FileChannel dataFileChannel;
 
-    public HsResultPartitionReadScheduler(
+    public HsFileDataManager(
             BatchShuffleReadBufferPool bufferPool,
             Executor ioExecutor,
             HsFileDataIndex dataIndex,
@@ -149,7 +149,7 @@ public class HsResultPartitionReadScheduler implements 
Runnable, BufferRecycler
     public HsSubpartitionFileReader registerNewSubpartition(
             int subpartitionId, HsSubpartitionViewInternalOperations 
operation) throws IOException {
         synchronized (lock) {
-            checkState(!isReleased, "HsResultPartitionReadScheduler is already 
released.");
+            checkState(!isReleased, "HsFileDataManager is already released.");
             lazyInitialize();
 
             HsSubpartitionFileReader subpartitionReader =
@@ -168,8 +168,8 @@ public class HsResultPartitionReadScheduler implements 
Runnable, BufferRecycler
     }
 
     /**
-     * Releases this read scheduler and returns a {@link CompletableFuture} 
which will be completed
-     * when all resources are released.
+     * Releases this file data manager and returns a {@link CompletableFuture} 
which will be
+     * completed when all resources are released.
      */
     public CompletableFuture<?> release() {
         synchronized (lock) {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartitionReadSchedulerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManagerTest.java
similarity index 88%
rename from 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartitionReadSchedulerTest.java
rename to 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManagerTest.java
index c943665b3b0..1a39e14ca02 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartitionReadSchedulerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManagerTest.java
@@ -51,9 +51,9 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
-/** Tests for {@link HsResultPartitionReadScheduler}. */
+/** Tests for {@link HsFileDataManager}. */
 @ExtendWith(TestLoggerExtension.class)
-class HsResultPartitionReadSchedulerTest {
+class HsFileDataManagerTest {
     private static final int BUFFER_SIZE = 1024;
 
     private static final int NUM_SUBPARTITIONS = 10;
@@ -70,7 +70,7 @@ class HsResultPartitionReadSchedulerTest {
 
     private Path dataFilePath;
 
-    private HsResultPartitionReadScheduler readScheduler;
+    private HsFileDataManager fileDataManager;
 
     private TestingSubpartitionViewInternalOperation subpartitionViewOperation;
 
@@ -85,8 +85,8 @@ class HsResultPartitionReadSchedulerTest {
         dataFilePath = Files.createFile(tempDir.resolve(".data"));
         dataFileChannel = openFileChannel(dataFilePath);
         factory = new TestingHsSubpartitionFileReader.Factory();
-        readScheduler =
-                new HsResultPartitionReadScheduler(
+        fileDataManager =
+                new HsFileDataManager(
                         bufferPool,
                         ioExecutor,
                         new HsFileDataIndexImpl(NUM_SUBPARTITIONS),
@@ -117,7 +117,7 @@ class HsResultPartitionReadSchedulerTest {
 
         assertThat(reader.readBuffers).isEmpty();
 
-        readScheduler.registerNewSubpartition(0, subpartitionViewOperation);
+        fileDataManager.registerNewSubpartition(0, subpartitionViewOperation);
 
         ioExecutor.trigger();
 
@@ -136,15 +136,15 @@ class HsResultPartitionReadSchedulerTest {
 
         factory.allReaders.add(reader);
 
-        readScheduler.registerNewSubpartition(0, subpartitionViewOperation);
+        fileDataManager.registerNewSubpartition(0, subpartitionViewOperation);
 
         ioExecutor.trigger();
 
         assertThat(reader.readBuffers).hasSize(BUFFER_POOL_SIZE);
         assertThat(bufferPool.getAvailableBuffers()).isZero();
 
-        readScheduler.recycle(reader.readBuffers.poll());
-        readScheduler.recycle(reader.readBuffers.poll());
+        fileDataManager.recycle(reader.readBuffers.poll());
+        fileDataManager.recycle(reader.readBuffers.poll());
 
         // recycle buffer will push new runnable to ioExecutor.
         ioExecutor.trigger();
@@ -163,12 +163,12 @@ class HsResultPartitionReadSchedulerTest {
                     assertThat(prepareForSchedulingFinished).isCompleted();
                     assertThat(requestedBuffers).hasSize(BUFFER_POOL_SIZE);
                     assertThat(bufferPool.getAvailableBuffers()).isEqualTo(0);
-                    // read one buffer, return another buffer to scheduler.
+                    // read one buffer, return another buffer to data manager.
                     readBuffers.add(requestedBuffers.poll());
                 });
         factory.allReaders.add(reader);
 
-        readScheduler.registerNewSubpartition(0, subpartitionViewOperation);
+        fileDataManager.registerNewSubpartition(0, subpartitionViewOperation);
 
         ioExecutor.trigger();
 
@@ -176,7 +176,7 @@ class HsResultPartitionReadSchedulerTest {
         assertThat(bufferPool.getAvailableBuffers()).isEqualTo(1);
     }
 
-    /** Test scheduler will schedule readers in order. */
+    /** Test file data manager will schedule readers in order. */
     @Test
     void testScheduleReadersOrdered() throws Exception {
         TestingHsSubpartitionFileReader reader1 = new 
TestingHsSubpartitionFileReader();
@@ -201,8 +201,8 @@ class HsResultPartitionReadSchedulerTest {
         factory.allReaders.add(reader1);
         factory.allReaders.add(reader2);
 
-        readScheduler.registerNewSubpartition(0, subpartitionViewOperation);
-        readScheduler.registerNewSubpartition(1, subpartitionViewOperation);
+        fileDataManager.registerNewSubpartition(0, subpartitionViewOperation);
+        fileDataManager.registerNewSubpartition(1, subpartitionViewOperation);
 
         // trigger run.
         ioExecutor.trigger();
@@ -218,8 +218,8 @@ class HsResultPartitionReadSchedulerTest {
         bufferPool.requestBuffers();
         assertThat(bufferPool.getAvailableBuffers()).isZero();
 
-        readScheduler =
-                new HsResultPartitionReadScheduler(
+        fileDataManager =
+                new HsFileDataManager(
                         bufferPool,
                         ioExecutor,
                         new HsFileDataIndexImpl(NUM_SUBPARTITIONS),
@@ -237,7 +237,7 @@ class HsResultPartitionReadSchedulerTest {
         reader.setFailConsumer((cause::complete));
         factory.allReaders.add(reader);
 
-        readScheduler.registerNewSubpartition(0, subpartitionViewOperation);
+        fileDataManager.registerNewSubpartition(0, subpartitionViewOperation);
 
         ioExecutor.trigger();
 
@@ -263,7 +263,7 @@ class HsResultPartitionReadSchedulerTest {
                 });
         factory.allReaders.add(reader);
 
-        readScheduler.registerNewSubpartition(0, subpartitionViewOperation);
+        fileDataManager.registerNewSubpartition(0, subpartitionViewOperation);
 
         ioExecutor.trigger();
 
@@ -275,7 +275,7 @@ class HsResultPartitionReadSchedulerTest {
 
     // ----------------------- test release 
---------------------------------------
 
-    /** Test scheduler release when reader is reading buffers. */
+    /** Test file data manager release when reader is reading buffers. */
     @Test
     @Timeout(10)
     void testReleasedWhenReading() throws Exception {
@@ -284,14 +284,14 @@ class HsResultPartitionReadSchedulerTest {
         CompletableFuture<Throwable> cause = new CompletableFuture<>();
         reader.setFailConsumer((cause::complete));
         CompletableFuture<Void> readBufferStart = new CompletableFuture<>();
-        CompletableFuture<Void> schedulerReleasedFinish = new 
CompletableFuture<>();
+        CompletableFuture<Void> releasedFinish = new CompletableFuture<>();
         reader.setReadBuffersConsumer(
                 (requestedBuffers, readBuffers) -> {
                     try {
                         readBufferStart.complete(null);
-                        schedulerReleasedFinish.get();
+                        releasedFinish.get();
                     } catch (Exception e) {
-                        // re-throw all exception to IOException caught by 
read scheduler.
+                        // re-throw all exception to IOException caught by 
file data manager.
                         throw new IOException(e);
                     }
                 });
@@ -302,13 +302,13 @@ class HsResultPartitionReadSchedulerTest {
                     @Override
                     public void go() throws Exception {
                         readBufferStart.get();
-                        readScheduler.release();
-                        schedulerReleasedFinish.complete(null);
+                        fileDataManager.release();
+                        releasedFinish.complete(null);
                     }
                 };
         releaseThread.start();
 
-        readScheduler.registerNewSubpartition(0, subpartitionViewOperation);
+        fileDataManager.registerNewSubpartition(0, subpartitionViewOperation);
 
         ioExecutor.trigger();
 
@@ -320,20 +320,20 @@ class HsResultPartitionReadSchedulerTest {
                 .hasMessageContaining("Result partition has been already 
released.");
     }
 
-    /** Test scheduler was released, but receive new subpartition reader 
registration. */
+    /** Test file data manager was released, but receive new subpartition 
reader registration. */
     @Test
-    void testRegisterSubpartitionReaderAfterSchedulerReleased() {
+    void testRegisterSubpartitionReaderAfterReleased() {
         TestingHsSubpartitionFileReader reader = new 
TestingHsSubpartitionFileReader();
         factory.allReaders.add(reader);
 
-        readScheduler.release();
+        fileDataManager.release();
         assertThatThrownBy(
                         () -> {
-                            readScheduler.registerNewSubpartition(0, 
subpartitionViewOperation);
+                            fileDataManager.registerNewSubpartition(0, 
subpartitionViewOperation);
                             ioExecutor.trigger();
                         })
                 .isInstanceOf(IllegalStateException.class)
-                .hasMessageContaining("HsResultPartitionReadScheduler is 
already released.");
+                .hasMessageContaining("HsFileDataManager is already 
released.");
     }
 
     private static FileChannel openFileChannel(Path path) throws IOException {

Reply via email to