This is an automated email from the ASF dual-hosted git repository. xtsong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 9cde5322969d7c92a76d615576f1cdd52454b33e Author: Weijie Guo <[email protected]> AuthorDate: Thu Jul 28 17:41:17 2022 +0800 [hotfix] Rename HsResultPartitionReadScheduler to HsFileDataManager Rename HsResultPartitionReadScheduler to HsFileDataManager as it plays the same role as the FileDataManager mentioned in the FLIP. --- ...onReadScheduler.java => HsFileDataManager.java} | 20 ++++---- ...hedulerTest.java => HsFileDataManagerTest.java} | 60 +++++++++++----------- 2 files changed, 40 insertions(+), 40 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartitionReadScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManager.java similarity index 95% rename from flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartitionReadScheduler.java rename to flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManager.java index ef742b61dee..dab671aa723 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartitionReadScheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManager.java @@ -53,12 +53,12 @@ import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.Preconditions.checkState; /** - * IO scheduler for HsResultPartition, which schedules {@link HsSubpartitionFileReaderImpl} for + * File data manager for HsResultPartition, which schedules {@link HsSubpartitionFileReaderImpl} for * loading data w.r.t. their offset in the file. 
*/ @ThreadSafe -public class HsResultPartitionReadScheduler implements Runnable, BufferRecycler { - private static final Logger LOG = LoggerFactory.getLogger(HsResultPartitionReadScheduler.class); +public class HsFileDataManager implements Runnable, BufferRecycler { + private static final Logger LOG = LoggerFactory.getLogger(HsFileDataManager.class); /** Executor to run the shuffle data reading task. */ private final Executor ioExecutor; @@ -76,8 +76,8 @@ public class HsResultPartitionReadScheduler implements Runnable, BufferRecycler private final Object lock = new Object(); /** - * A {@link CompletableFuture} to be completed when this read scheduler including all resources - * is released. + * A {@link CompletableFuture} to be completed when this data manager including all resources is + * released. */ @GuardedBy("lock") private final CompletableFuture<?> releaseFuture = new CompletableFuture<>(); @@ -112,14 +112,14 @@ public class HsResultPartitionReadScheduler implements Runnable, BufferRecycler @GuardedBy("lock") private volatile int numRequestedBuffers; - /** Whether this read scheduler has been released or not. */ + /** Whether this file data manager has been released or not. 
*/ @GuardedBy("lock") private volatile boolean isReleased; @GuardedBy("lock") private FileChannel dataFileChannel; - public HsResultPartitionReadScheduler( + public HsFileDataManager( BatchShuffleReadBufferPool bufferPool, Executor ioExecutor, HsFileDataIndex dataIndex, @@ -149,7 +149,7 @@ public class HsResultPartitionReadScheduler implements Runnable, BufferRecycler public HsSubpartitionFileReader registerNewSubpartition( int subpartitionId, HsSubpartitionViewInternalOperations operation) throws IOException { synchronized (lock) { - checkState(!isReleased, "HsResultPartitionReadScheduler is already released."); + checkState(!isReleased, "HsFileDataManager is already released."); lazyInitialize(); HsSubpartitionFileReader subpartitionReader = @@ -168,8 +168,8 @@ public class HsResultPartitionReadScheduler implements Runnable, BufferRecycler } /** - * Releases this read scheduler and returns a {@link CompletableFuture} which will be completed - * when all resources are released. + * Releases this file data manager and returns a {@link CompletableFuture} which will be + * completed when all resources are released. 
*/ public CompletableFuture<?> release() { synchronized (lock) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartitionReadSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManagerTest.java similarity index 88% rename from flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartitionReadSchedulerTest.java rename to flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManagerTest.java index c943665b3b0..1a39e14ca02 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartitionReadSchedulerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManagerTest.java @@ -51,9 +51,9 @@ import static org.apache.flink.util.Preconditions.checkNotNull; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; -/** Tests for {@link HsResultPartitionReadScheduler}. */ +/** Tests for {@link HsFileDataManager}. 
*/ @ExtendWith(TestLoggerExtension.class) -class HsResultPartitionReadSchedulerTest { +class HsFileDataManagerTest { private static final int BUFFER_SIZE = 1024; private static final int NUM_SUBPARTITIONS = 10; @@ -70,7 +70,7 @@ class HsResultPartitionReadSchedulerTest { private Path dataFilePath; - private HsResultPartitionReadScheduler readScheduler; + private HsFileDataManager fileDataManager; private TestingSubpartitionViewInternalOperation subpartitionViewOperation; @@ -85,8 +85,8 @@ class HsResultPartitionReadSchedulerTest { dataFilePath = Files.createFile(tempDir.resolve(".data")); dataFileChannel = openFileChannel(dataFilePath); factory = new TestingHsSubpartitionFileReader.Factory(); - readScheduler = - new HsResultPartitionReadScheduler( + fileDataManager = + new HsFileDataManager( bufferPool, ioExecutor, new HsFileDataIndexImpl(NUM_SUBPARTITIONS), @@ -117,7 +117,7 @@ class HsResultPartitionReadSchedulerTest { assertThat(reader.readBuffers).isEmpty(); - readScheduler.registerNewSubpartition(0, subpartitionViewOperation); + fileDataManager.registerNewSubpartition(0, subpartitionViewOperation); ioExecutor.trigger(); @@ -136,15 +136,15 @@ class HsResultPartitionReadSchedulerTest { factory.allReaders.add(reader); - readScheduler.registerNewSubpartition(0, subpartitionViewOperation); + fileDataManager.registerNewSubpartition(0, subpartitionViewOperation); ioExecutor.trigger(); assertThat(reader.readBuffers).hasSize(BUFFER_POOL_SIZE); assertThat(bufferPool.getAvailableBuffers()).isZero(); - readScheduler.recycle(reader.readBuffers.poll()); - readScheduler.recycle(reader.readBuffers.poll()); + fileDataManager.recycle(reader.readBuffers.poll()); + fileDataManager.recycle(reader.readBuffers.poll()); // recycle buffer will push new runnable to ioExecutor. 
ioExecutor.trigger(); @@ -163,12 +163,12 @@ class HsResultPartitionReadSchedulerTest { assertThat(prepareForSchedulingFinished).isCompleted(); assertThat(requestedBuffers).hasSize(BUFFER_POOL_SIZE); assertThat(bufferPool.getAvailableBuffers()).isEqualTo(0); - // read one buffer, return another buffer to scheduler. + // read one buffer, return another buffer to data manager. readBuffers.add(requestedBuffers.poll()); }); factory.allReaders.add(reader); - readScheduler.registerNewSubpartition(0, subpartitionViewOperation); + fileDataManager.registerNewSubpartition(0, subpartitionViewOperation); ioExecutor.trigger(); @@ -176,7 +176,7 @@ class HsResultPartitionReadSchedulerTest { assertThat(bufferPool.getAvailableBuffers()).isEqualTo(1); } - /** Test scheduler will schedule readers in order. */ + /** Test file data manager will schedule readers in order. */ @Test void testScheduleReadersOrdered() throws Exception { TestingHsSubpartitionFileReader reader1 = new TestingHsSubpartitionFileReader(); @@ -201,8 +201,8 @@ class HsResultPartitionReadSchedulerTest { factory.allReaders.add(reader1); factory.allReaders.add(reader2); - readScheduler.registerNewSubpartition(0, subpartitionViewOperation); - readScheduler.registerNewSubpartition(1, subpartitionViewOperation); + fileDataManager.registerNewSubpartition(0, subpartitionViewOperation); + fileDataManager.registerNewSubpartition(1, subpartitionViewOperation); // trigger run. 
ioExecutor.trigger(); @@ -218,8 +218,8 @@ class HsResultPartitionReadSchedulerTest { bufferPool.requestBuffers(); assertThat(bufferPool.getAvailableBuffers()).isZero(); - readScheduler = - new HsResultPartitionReadScheduler( + fileDataManager = + new HsFileDataManager( bufferPool, ioExecutor, new HsFileDataIndexImpl(NUM_SUBPARTITIONS), @@ -237,7 +237,7 @@ class HsResultPartitionReadSchedulerTest { reader.setFailConsumer((cause::complete)); factory.allReaders.add(reader); - readScheduler.registerNewSubpartition(0, subpartitionViewOperation); + fileDataManager.registerNewSubpartition(0, subpartitionViewOperation); ioExecutor.trigger(); @@ -263,7 +263,7 @@ class HsResultPartitionReadSchedulerTest { }); factory.allReaders.add(reader); - readScheduler.registerNewSubpartition(0, subpartitionViewOperation); + fileDataManager.registerNewSubpartition(0, subpartitionViewOperation); ioExecutor.trigger(); @@ -275,7 +275,7 @@ class HsResultPartitionReadSchedulerTest { // ----------------------- test release --------------------------------------- - /** Test scheduler release when reader is reading buffers. */ + /** Test file data manager release when reader is reading buffers. */ @Test @Timeout(10) void testReleasedWhenReading() throws Exception { @@ -284,14 +284,14 @@ class HsResultPartitionReadSchedulerTest { CompletableFuture<Throwable> cause = new CompletableFuture<>(); reader.setFailConsumer((cause::complete)); CompletableFuture<Void> readBufferStart = new CompletableFuture<>(); - CompletableFuture<Void> schedulerReleasedFinish = new CompletableFuture<>(); + CompletableFuture<Void> releasedFinish = new CompletableFuture<>(); reader.setReadBuffersConsumer( (requestedBuffers, readBuffers) -> { try { readBufferStart.complete(null); - schedulerReleasedFinish.get(); + releasedFinish.get(); } catch (Exception e) { - // re-throw all exception to IOException caught by read scheduler. + // re-throw all exception to IOException caught by file data manager. 
throw new IOException(e); } }); @@ -302,13 +302,13 @@ class HsResultPartitionReadSchedulerTest { @Override public void go() throws Exception { readBufferStart.get(); - readScheduler.release(); - schedulerReleasedFinish.complete(null); + fileDataManager.release(); + releasedFinish.complete(null); } }; releaseThread.start(); - readScheduler.registerNewSubpartition(0, subpartitionViewOperation); + fileDataManager.registerNewSubpartition(0, subpartitionViewOperation); ioExecutor.trigger(); @@ -320,20 +320,20 @@ class HsResultPartitionReadSchedulerTest { .hasMessageContaining("Result partition has been already released."); } - /** Test scheduler was released, but receive new subpartition reader registration. */ + /** Test file data manager was released, but receive new subpartition reader registration. */ @Test - void testRegisterSubpartitionReaderAfterSchedulerReleased() { + void testRegisterSubpartitionReaderAfterReleased() { TestingHsSubpartitionFileReader reader = new TestingHsSubpartitionFileReader(); factory.allReaders.add(reader); - readScheduler.release(); + fileDataManager.release(); assertThatThrownBy( () -> { - readScheduler.registerNewSubpartition(0, subpartitionViewOperation); + fileDataManager.registerNewSubpartition(0, subpartitionViewOperation); ioExecutor.trigger(); }) .isInstanceOf(IllegalStateException.class) - .hasMessageContaining("HsResultPartitionReadScheduler is already released."); + .hasMessageContaining("HsFileDataManager is already released."); } private static FileChannel openFileChannel(Path path) throws IOException {
