This is an automated email from the ASF dual-hosted git repository. sewen pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 6072f5f6191f26a32a24d30338c8e82142891d33 Author: Stephan Ewen <[email protected]> AuthorDate: Tue Sep 15 17:30:28 2020 +0200 [FLINK-17393][connectors] (follow-up) Wakeup the SplitFetchers more elegantly. - Remove the InterruptedException from methods where we now do not expect an InterruptedException to be thrown any more. - Remove the code that was previously clearing the interruption flag for "benign interruptions". - Adjust comments to reflect the actual code model - Remove unused method - Adjust tests to use local interruptions (within the wakeup scope only) instead of arbitrary ones --- .../base/source/reader/fetcher/AddSplitsTask.java | 2 +- .../base/source/reader/fetcher/FetchTask.java | 5 ++- .../base/source/reader/fetcher/SplitFetcher.java | 45 ++++------------------ .../source/reader/fetcher/SplitFetcherTask.java | 3 +- .../source/reader/splitreader/SplitReader.java | 3 +- .../source/reader/fetcher/SplitFetcherTest.java | 14 +++---- .../base/source/reader/mocks/MockSplitReader.java | 36 +++++++++++++---- .../source/reader/mocks/TestingSplitReader.java | 8 +++- .../connector/source/mocks/MockSourceSplit.java | 3 ++ 9 files changed, 59 insertions(+), 60 deletions(-) diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/AddSplitsTask.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/AddSplitsTask.java index 19b15b5..82e529a 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/AddSplitsTask.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/AddSplitsTask.java @@ -50,7 +50,7 @@ class AddSplitsTask<SplitT extends SourceSplit> implements SplitFetcherTask { } @Override - public boolean run() throws InterruptedException { + public boolean run() { if (!splitsChangesAdded) { splitsChanges.add(new 
SplitsAddition<>(splitsToAdd)); splitsToAdd.forEach(s -> assignedSplits.put(s.splitId(), s)); diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/FetchTask.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/FetchTask.java index 530add1..743e763 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/FetchTask.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/FetchTask.java @@ -52,7 +52,7 @@ class FetchTask<E, SplitT extends SourceSplit> implements SplitFetcherTask { } @Override - public boolean run() throws InterruptedException, IOException { + public boolean run() throws IOException { try { if (!isWakenUp() && lastRecords == null) { lastRecords = splitReader.fetch(); @@ -67,6 +67,9 @@ class FetchTask<E, SplitT extends SourceSplit> implements SplitFetcherTask { lastRecords = null; } } + } catch (InterruptedException e) { + // this should only happen on shutdown + throw new IOException("Source fetch execution was interrupted", e); } finally { // clean up the potential wakeup effect. It is possible that the fetcher is waken up // after the clean up. 
In that case, either the wakeup flag will be set or the diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java index 3beb0da..db5a203 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java @@ -27,7 +27,6 @@ import org.apache.flink.connector.base.source.reader.synchronization.FutureCompl import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; import java.util.HashMap; import java.util.LinkedList; import java.util.List; @@ -98,8 +97,6 @@ public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable { runOnce(); } } finally { - // Reset the interrupted flag so the shutdown hook do not got interrupted. - Thread.interrupted(); shutdownHook.run(); LOG.info("Split fetcher {} exited.", id); } @@ -116,38 +113,21 @@ public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable { } else { runningTask = taskQueue.take(); } - // Now the running task is not null. If wakeUp() is called after this point, the fetcher - // thread will not be interrupted. Instead task.wakeUp() will be called. On the other hand, - // If the wakeUp() call was make before this point, the wakeUp flag must have already been - // have been set, and the fetcher thread may or may not be interrupted, depending on - // whether the wakeUp() call was before or after the runningTask assignment. So the - // code does the following: - // 1. check and clear the interrupt flag on the fetcher thread to avoid interruption in - // later code. - // 2. check the wakeUp flag to avoid unnecessary task run. + // Now the running task is not null. 
If wakeUp() is called after this point, + // task.wakeUp() will be called. On the other hand, if the wakeUp() call was make before + // this point, the wakeUp flag must have already been set. The code hence checks the wakeUp + // flag first to avoid an unnecessary task run. // Note that the runningTask may still encounter the case that the task is waken up before // the it starts running. LOG.debug("Prepare to run {}", runningTask); - if (!Thread.interrupted() && !wakeUp.get() && runningTask.run()) { + if (!wakeUp.get() && runningTask.run()) { LOG.debug("Finished running task {}", runningTask); // the task has finished running. Set it to null so it won't be enqueued. runningTask = null; } - } catch (InterruptedException ie) { - if (closed.get()) { - // The fetcher is closed, just return; - return; - } else if (wakeUp.get()) { - // The fetcher thread has just been waken up. So ignore the interrupted exception - // and continue; - LOG.debug("Split fetcher has been waken up."); - } else { - throw new RuntimeException(String.format( - "SplitFetcher thread %d interrupted while polling the records", id), ie); - } - } catch (IOException ioe) { + } catch (Exception e) { throw new RuntimeException(String.format( - "SplitFetcher thread %d received unexpected exception while polling the records", id), ioe); + "SplitFetcher thread %d received unexpected exception while polling the records", id), e); } // If the task is not null that means this task needs to be re-executed. This only // happens when the task is the fetching task or the task was interrupted. @@ -156,9 +136,6 @@ public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable { // Set the running task to null. It is necessary for the shutdown method to avoid // unnecessarily interrupt the running task. runningTask = null; - // Clean the interrupt flag in case the running task was interrupted after it finishes - // running but before it was set to null. 
- Thread.interrupted(); // Set the wakeUp flag to false. wakeUp.set(false); LOG.debug("Cleaned wakeup flag."); @@ -290,12 +267,6 @@ public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable { return task != null && task != WAKEUP_TASK; } - private void removeAssignedSplit(String splitId) { - assignedSplits.remove(splitId); - LOG.debug("Removed {} split from assigned splits. The assigned splits now are {}", splitId, assignedSplits); - - } - private void checkAndSetIdle() { final boolean nowIdle = assignedSplits.isEmpty() && taskQueue.isEmpty() && splitChanges.isEmpty(); if (nowIdle) { @@ -317,7 +288,7 @@ public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable { } @Override - public boolean run() throws InterruptedException { + public boolean run() { return false; } diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTask.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTask.java index 999601a..ebbc0a6 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTask.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTask.java @@ -32,10 +32,9 @@ public interface SplitFetcherTask { * invocation is needed. * * @return whether the runnable has successfully finished running. - * @throws InterruptedException when interrupted. * @throws IOException when the performed I/O operation fails. */ - boolean run() throws InterruptedException, IOException; + boolean run() throws IOException; /** * Wake up the running thread. 
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.java index b980f7b..9114614 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.java @@ -42,10 +42,9 @@ public interface SplitReader<E, SplitT extends SourceSplit> { * * @return the Ids of the finished splits. * - * @throws InterruptedException when interrupted * @throws IOException when encountered IO errors, such as deserialization failures. */ - RecordsWithSplitIds<E> fetch() throws InterruptedException, IOException; + RecordsWithSplitIds<E> fetch() throws IOException; /** * Handle the split changes. This call should be non-blocking. 
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java index c25490b..e5f5faf 100644 --- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java @@ -157,7 +157,7 @@ public class SplitFetcherTest { public void testWakeup() throws InterruptedException { final int numSplits = 3; final int numRecordsPerSplit = 10_000; - final int interruptRecordsInterval = 10; + final int wakeupRecordsInterval = 10; final int numTotalRecords = numRecordsPerSplit * numSplits; FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> elementQueue = @@ -189,16 +189,16 @@ public class SplitFetcherTest { // A thread waking up the split fetcher frequently. 
AtomicInteger wakeupTimes = new AtomicInteger(0); AtomicBoolean stop = new AtomicBoolean(false); - Thread interrupter = new Thread("Interrupter") { + Thread wakeUpCaller = new Thread("Wakeup Caller") { @Override public void run() { - int lastInterrupt = 0; + int lastWakeup = 0; while (recordsRead.size() < numTotalRecords && !stop.get()) { int numRecordsRead = recordsRead.size(); - if (numRecordsRead >= lastInterrupt + interruptRecordsInterval) { + if (numRecordsRead >= lastWakeup + wakeupRecordsInterval) { fetcher.wakeUp(false); wakeupTimes.incrementAndGet(); - lastInterrupt = numRecordsRead; + lastWakeup = numRecordsRead; } } } @@ -206,7 +206,7 @@ public class SplitFetcherTest { try { fetcherThread.start(); - interrupter.start(); + wakeUpCaller.start(); while (recordsRead.size() < numSplits * numRecordsPerSplit) { final RecordsWithSplitIds<int[]> nextBatch = elementQueue.take(); @@ -226,7 +226,7 @@ public class SplitFetcherTest { stop.set(true); fetcher.shutdown(); fetcherThread.join(); - interrupter.join(); + wakeUpCaller.join(); } } diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitReader.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitReader.java index 00d4d71..4cb738a 100644 --- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitReader.java +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitReader.java @@ -43,7 +43,10 @@ public class MockSplitReader implements SplitReader<int[], MockSourceSplit> { private final int numRecordsPerSplitPerFetch; private final boolean blockingFetch; private final boolean handleSplitsInOneShot; - private volatile Thread runningThread; + + private final Object wakeupLock = new Object(); + private volatile Thread threadInBlocking; + private boolean wokenUp; 
public MockSplitReader( int numRecordsPerSplitPerFetch, @@ -52,14 +55,10 @@ public class MockSplitReader implements SplitReader<int[], MockSourceSplit> { this.numRecordsPerSplitPerFetch = numRecordsPerSplitPerFetch; this.blockingFetch = blockingFetch; this.handleSplitsInOneShot = handleSplitsInOneShot; - this.runningThread = null; } @Override - public RecordsWithSplitIds<int[]> fetch() throws InterruptedException { - if (runningThread == null) { - runningThread = Thread.currentThread(); - } + public RecordsWithSplitIds<int[]> fetch() { return getRecords(); } @@ -75,14 +74,26 @@ public class MockSplitReader implements SplitReader<int[], MockSourceSplit> { @Override public void wakeUp() { - if (blockingFetch && runningThread != null) { - runningThread.interrupt(); + synchronized (wakeupLock) { + wokenUp = true; + if (threadInBlocking != null) { + threadInBlocking.interrupt(); + } } } private RecordsBySplits<int[]> getRecords() { final RecordsBySplits.Builder<int[]> records = new RecordsBySplits.Builder<>(); + // after this locked section, the thread might be interrupted + synchronized (wakeupLock) { + if (wokenUp) { + wokenUp = false; + return records.build(); + } + threadInBlocking = Thread.currentThread(); + } + try { for (Map.Entry<String, MockSourceSplit> entry : splits.entrySet()) { MockSourceSplit split = entry.getValue(); @@ -102,7 +113,16 @@ public class MockSplitReader implements SplitReader<int[], MockSourceSplit> { if (!blockingFetch) { throw new RuntimeException("Caught unexpected interrupted exception."); } + } finally { + // after this locked section, the thread may not be interrupted any more + synchronized (wakeupLock) { + wokenUp = false; + //noinspection ResultOfMethodCallIgnored + Thread.interrupted(); + threadInBlocking = null; + } } + return records.build(); } } diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingSplitReader.java 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingSplitReader.java index 0d202f7..5643088 100644 --- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingSplitReader.java +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingSplitReader.java @@ -42,13 +42,17 @@ public class TestingSplitReader<E, SplitT extends SourceSplit> implements SplitR } @Override - public RecordsWithSplitIds<E> fetch() throws InterruptedException, IOException { + public RecordsWithSplitIds<E> fetch() throws IOException { if (!fetches.isEmpty()) { return fetches.removeFirst(); } else { // block until woken up synchronized (fetches) { - fetches.wait(); + try { + fetches.wait(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } return null; } } diff --git a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceSplit.java b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceSplit.java index dc8ce80..878d674 100644 --- a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceSplit.java +++ b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceSplit.java @@ -34,6 +34,9 @@ import java.util.concurrent.LinkedBlockingQueue; * polled out of the queue since the creation of this split. */ public class MockSourceSplit implements SourceSplit, Serializable { + + private static final long serialVersionUID = 1L; + private final int id; private final BlockingQueue<Integer> records; private final int endIndex;
