This is an automated email from the ASF dual-hosted git repository. hxb pushed a commit to branch release-1.16 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.16 by this push:
     new d233b39be94 [FLINK-29641][tests] SortMergeResultPartitionReadSchedulerTest#testCreateSubpartitionReader is not stable
d233b39be94 is described below

commit d233b39be94f330dabba593ac3e709a73eb714d2
Author: Weijie Guo <res...@163.com>
AuthorDate: Tue Oct 18 11:05:24 2022 +0800

    [FLINK-29641][tests] SortMergeResultPartitionReadSchedulerTest#testCreateSubpartitionReader is not stable

    The newly created subpartition reader will trigger the IO thread to read data. If the data reading is completed, the running state will be set to false and the assertion will fail.

    This closes #21083.
---
 .../partition/SortMergeResultPartitionReadSchedulerTest.java | 9 +++++++++
 1 file changed, 9 insertions(+)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java
index a865f6c4915..62c76b9f1ec 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.io.network.partition;
 
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutorService;
 import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.CompositeBuffer;
@@ -115,7 +116,13 @@ class SortMergeResultPartitionReadSchedulerTest {
     }
 
     @Test
+    @Timeout(60)
     void testCreateSubpartitionReader() throws Exception {
+        ManuallyTriggeredScheduledExecutorService ioExecutor =
+                new ManuallyTriggeredScheduledExecutorService();
+        readScheduler =
+                new SortMergeResultPartitionReadScheduler(bufferPool, ioExecutor, new Object());
+
         SortMergeSubpartitionReader subpartitionReader =
                 readScheduler.createSubpartitionReader(
                         new NoOpBufferAvailablityListener(), 0, partitionedFile);
@@ -123,9 +130,11 @@ class SortMergeResultPartitionReadSchedulerTest {
         assertThat(readScheduler.isRunning()).isTrue();
         assertThat(readScheduler.getDataFileChannel().isOpen()).isTrue();
         assertThat(readScheduler.getIndexFileChannel().isOpen()).isTrue();
+        assertThat(ioExecutor.numQueuedRunnables()).isEqualTo(1);
 
         int numBuffersRead = 0;
         while (numBuffersRead < numBuffersPerSubpartition) {
+            ioExecutor.triggerAll();
             ResultSubpartition.BufferAndBacklog bufferAndBacklog =
                     subpartitionReader.getNextBuffer();
             if (bufferAndBacklog != null) {