This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit dd2ac7af895ab117877a361fdafaeb19f8a65bd5
Author: Stephan Ewen <[email protected]>
AuthorDate: Mon Oct 12 13:43:57 2020 +0200

    [FLINK-19427][FLINK-19489][tests] Fix test conditions for 'SplitFetcherTest.testNotifiesWhenGoingIdleConcurrent()'
    
    The changed logic also fixes the flaky thread shutdown as a side effect,
    because it no longer relies on interrupting the drainer thread.
    
    This closes #13593
---
 .../source/reader/fetcher/SplitFetcherTest.java    | 54 +++++++++++-----------
 1 file changed, 26 insertions(+), 28 deletions(-)

diff --git 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java
 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java
index 5027e3f..5082ebf 100644
--- 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java
+++ 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java
@@ -120,16 +120,16 @@ public class SplitFetcherTest {
                final SplitFetcher<Object, TestingSourceSplit> fetcher = 
createFetcherWithSplit(
                        "test-split", queue, new 
TestingSplitReader<>(finishedSplitFetch("test-split")));
 
-               final QueueDrainerThread queueDrainer = new 
QueueDrainerThread(queue);
+               final QueueDrainerThread queueDrainer = new 
QueueDrainerThread(queue, fetcher, 1);
                queueDrainer.start();
 
-               try {
-                       fetcher.runOnce();
+               fetcher.runOnce();
 
-                       assertTrue(queue.getAvailabilityFuture().isDone());
-               } finally {
-                       queueDrainer.shutdown();
-               }
+               queueDrainer.sync();
+
+               // either we got the notification that the fetcher went idle after the queue was drained (thread finished)
+               // or the fetcher was already idle when the thread drained the queue (then we need no additional notification)
+               assertTrue(queue.getAvailabilityFuture().isDone() || 
queueDrainer.wasIdleWhenFinished());
        }
 
        @Test
@@ -139,18 +139,15 @@ public class SplitFetcherTest {
                final SplitFetcher<Object, TestingSourceSplit> fetcher = 
createFetcherWithSplit(
                                "test-split", queue, new 
TestingSplitReader<>(finishedSplitFetch("test-split")));
 
-               final QueueDrainerThread queueDrainer = new 
QueueDrainerThread(queue);
+               final QueueDrainerThread queueDrainer = new 
QueueDrainerThread(queue, fetcher, 1);
                queueDrainer.start();
 
                final CompletableFuture<?> future = 
queue.getAvailabilityFuture();
 
-               try {
-                       fetcher.runOnce();
+               fetcher.runOnce();
+               assertTrue(future.isDone());
 
-                       assertTrue(future.isDone());
-               } finally {
-                       queueDrainer.shutdown();
-               }
+               queueDrainer.sync();
        }
 
        @Test
@@ -274,31 +271,32 @@ public class SplitFetcherTest {
        private static final class QueueDrainerThread extends CheckedThread {
 
                private final FutureCompletingBlockingQueue<?> queue;
-               private volatile boolean running = true;
+               private final SplitFetcher<?, ?> fetcher;
+               private final int numFetchesToTake;
 
-               QueueDrainerThread(FutureCompletingBlockingQueue<?> queue) {
+               private volatile boolean wasIdleWhenFinished;
+
+               QueueDrainerThread(FutureCompletingBlockingQueue<?> queue, 
SplitFetcher<?, ?> fetcher, int numFetchesToTake) {
                        super("Queue Drainer");
                        setPriority(Thread.MAX_PRIORITY);
                        this.queue = queue;
+                       this.fetcher = fetcher;
+                       this.numFetchesToTake = numFetchesToTake;
                }
 
                @Override
                public void go() throws Exception {
-                       while (running) {
-                               try {
-                                       queue.take();
-                               }
-                               catch (InterruptedException ignored) {
-                                       Thread.currentThread().interrupt();
-                                       // fall through the loop
-                               }
+                       int remaining = numFetchesToTake;
+                       while (remaining > 0) {
+                               remaining--;
+                               queue.take();
                        }
+
+                       wasIdleWhenFinished = fetcher.isIdle();
                }
 
-               public void shutdown() throws Exception {
-                       running = false;
-                       interrupt();
-                       sync();
+               public boolean wasIdleWhenFinished() {
+                       return wasIdleWhenFinished;
                }
        }
 }

Reply via email to