This is an automated email from the ASF dual-hosted git repository. sewen pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit dd2ac7af895ab117877a361fdafaeb19f8a65bd5
Author: Stephan Ewen <[email protected]>
AuthorDate: Mon Oct 12 13:43:57 2020 +0200

    [FLINK-19427][FLINK-19489][tests] Fix test conditions for 'SplitFetcherTest.testNotifiesWhenGoingIdleConcurrent()'

    The changed logic also fixes flaky thread shutdown logic as a side effect, because it no longer relies on thread interrupting.

    This closes #13593
---
 .../source/reader/fetcher/SplitFetcherTest.java | 54 +++++++++++-----------
 1 file changed, 26 insertions(+), 28 deletions(-)

diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java
index 5027e3f..5082ebf 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java
@@ -120,16 +120,16 @@ public class SplitFetcherTest {
         final SplitFetcher<Object, TestingSourceSplit> fetcher = createFetcherWithSplit(
             "test-split", queue, new TestingSplitReader<>(finishedSplitFetch("test-split")));

-        final QueueDrainerThread queueDrainer = new QueueDrainerThread(queue);
+        final QueueDrainerThread queueDrainer = new QueueDrainerThread(queue, fetcher, 1);
         queueDrainer.start();

-        try {
-            fetcher.runOnce();
+        fetcher.runOnce();

-            assertTrue(queue.getAvailabilityFuture().isDone());
-        } finally {
-            queueDrainer.shutdown();
-        }
+        queueDrainer.sync();
+
+        // either we got the notification that the fetcher went idle after the queue was drained (thread finished)
+        // or the fetcher was already idle when the thread drained the queue (then we need no additional notification)
+        assertTrue(queue.getAvailabilityFuture().isDone() || queueDrainer.wasIdleWhenFinished());
     }

     @Test
@@ -139,18 +139,15 @@ public class SplitFetcherTest {
         final SplitFetcher<Object, TestingSourceSplit> fetcher = createFetcherWithSplit(
             "test-split", queue, new TestingSplitReader<>(finishedSplitFetch("test-split")));

-        final QueueDrainerThread queueDrainer = new QueueDrainerThread(queue);
+        final QueueDrainerThread queueDrainer = new QueueDrainerThread(queue, fetcher, 1);
         queueDrainer.start();

         final CompletableFuture<?> future = queue.getAvailabilityFuture();

-        try {
-            fetcher.runOnce();
+        fetcher.runOnce();
+        assertTrue(future.isDone());

-            assertTrue(future.isDone());
-        } finally {
-            queueDrainer.shutdown();
-        }
+        queueDrainer.sync();
     }

     @Test
@@ -274,31 +271,32 @@ public class SplitFetcherTest {
     private static final class QueueDrainerThread extends CheckedThread {

         private final FutureCompletingBlockingQueue<?> queue;
-        private volatile boolean running = true;
+        private final SplitFetcher<?, ?> fetcher;
+        private final int numFetchesToTake;

-        QueueDrainerThread(FutureCompletingBlockingQueue<?> queue) {
+        private volatile boolean wasIdleWhenFinished;
+
+        QueueDrainerThread(FutureCompletingBlockingQueue<?> queue, SplitFetcher<?, ?> fetcher, int numFetchesToTake) {
             super("Queue Drainer");
             setPriority(Thread.MAX_PRIORITY);
             this.queue = queue;
+            this.fetcher = fetcher;
+            this.numFetchesToTake = numFetchesToTake;
         }

         @Override
         public void go() throws Exception {
-            while (running) {
-                try {
-                    queue.take();
-                }
-                catch (InterruptedException ignored) {
-                    Thread.currentThread().interrupt();
-                    // fall through the loop
-                }
+            int remaining = numFetchesToTake;
+            while (remaining > 0) {
+                remaining--;
+                queue.take();
             }
+
+            wasIdleWhenFinished = fetcher.isIdle();
         }

-        public void shutdown() throws Exception {
-            running = false;
-            interrupt();
-            sync();
+        public boolean wasIdleWhenFinished() {
+            return wasIdleWhenFinished;
         }
     }
 }
