This is an automated email from the ASF dual-hosted git repository.

hxb pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.16 by this push:
     new d233b39be94 [FLINK-29641][tests] SortMergeResultPartitionReadSchedulerTest#testCreateSubpartitionReader is not stable
d233b39be94 is described below

commit d233b39be94f330dabba593ac3e709a73eb714d2
Author: Weijie Guo <res...@163.com>
AuthorDate: Tue Oct 18 11:05:24 2022 +0800

    [FLINK-29641][tests] SortMergeResultPartitionReadSchedulerTest#testCreateSubpartitionReader is not stable
    
    The newly created subpartition reader triggers the IO thread to read data.
    Once the data reading has completed, the running state is set to false and
    the assertion fails.
    
    This closes #21083.
---
 .../partition/SortMergeResultPartitionReadSchedulerTest.java     | 9 +++++++++
 1 file changed, 9 insertions(+)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java
index a865f6c4915..62c76b9f1ec 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.io.network.partition;
 
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutorService;
 import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.CompositeBuffer;
@@ -115,7 +116,13 @@ class SortMergeResultPartitionReadSchedulerTest {
     }
 
     @Test
+    @Timeout(60)
     void testCreateSubpartitionReader() throws Exception {
+        ManuallyTriggeredScheduledExecutorService ioExecutor =
+                new ManuallyTriggeredScheduledExecutorService();
+        readScheduler =
+                new SortMergeResultPartitionReadScheduler(bufferPool, ioExecutor, new Object());
+
         SortMergeSubpartitionReader subpartitionReader =
                 readScheduler.createSubpartitionReader(
                         new NoOpBufferAvailablityListener(), 0, partitionedFile);
@@ -123,9 +130,11 @@ class SortMergeResultPartitionReadSchedulerTest {
         assertThat(readScheduler.isRunning()).isTrue();
         assertThat(readScheduler.getDataFileChannel().isOpen()).isTrue();
         assertThat(readScheduler.getIndexFileChannel().isOpen()).isTrue();
+        assertThat(ioExecutor.numQueuedRunnables()).isEqualTo(1);
 
         int numBuffersRead = 0;
         while (numBuffersRead < numBuffersPerSubpartition) {
+            ioExecutor.triggerAll();
             ResultSubpartition.BufferAndBacklog bufferAndBacklog =
                     subpartitionReader.getNextBuffer();
             if (bufferAndBacklog != null) {

Reply via email to