This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6072f5f6191f26a32a24d30338c8e82142891d33
Author: Stephan Ewen <[email protected]>
AuthorDate: Tue Sep 15 17:30:28 2020 +0200

    [FLINK-17393][connectors] (follow-up) Wakeup the SplitFetchers more 
elegantly.
    
      - Remove the InterruptedException from methods where we no longer expect 
it to be thrown.
      - Remove the code that was previously clearing the interruption flag for 
"benign interruptions".
      - Adjust comments to reflect actual code model
      - Remove unused method
      - Adjust tests to use local interruptions (within the wakeup scope only) 
instead of arbitrary ones
---
 .../base/source/reader/fetcher/AddSplitsTask.java  |  2 +-
 .../base/source/reader/fetcher/FetchTask.java      |  5 ++-
 .../base/source/reader/fetcher/SplitFetcher.java   | 45 ++++------------------
 .../source/reader/fetcher/SplitFetcherTask.java    |  3 +-
 .../source/reader/splitreader/SplitReader.java     |  3 +-
 .../source/reader/fetcher/SplitFetcherTest.java    | 14 +++----
 .../base/source/reader/mocks/MockSplitReader.java  | 36 +++++++++++++----
 .../source/reader/mocks/TestingSplitReader.java    |  8 +++-
 .../connector/source/mocks/MockSourceSplit.java    |  3 ++
 9 files changed, 59 insertions(+), 60 deletions(-)

diff --git 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/AddSplitsTask.java
 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/AddSplitsTask.java
index 19b15b5..82e529a 100644
--- 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/AddSplitsTask.java
+++ 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/AddSplitsTask.java
@@ -50,7 +50,7 @@ class AddSplitsTask<SplitT extends SourceSplit> implements 
SplitFetcherTask {
        }
 
        @Override
-       public boolean run() throws InterruptedException {
+       public boolean run() {
                if (!splitsChangesAdded) {
                        splitsChanges.add(new SplitsAddition<>(splitsToAdd));
                        splitsToAdd.forEach(s -> 
assignedSplits.put(s.splitId(), s));
diff --git 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/FetchTask.java
 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/FetchTask.java
index 530add1..743e763 100644
--- 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/FetchTask.java
+++ 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/FetchTask.java
@@ -52,7 +52,7 @@ class FetchTask<E, SplitT extends SourceSplit> implements 
SplitFetcherTask {
        }
 
        @Override
-       public boolean run() throws InterruptedException, IOException {
+       public boolean run() throws IOException {
                try {
                        if (!isWakenUp() && lastRecords == null) {
                                lastRecords = splitReader.fetch();
@@ -67,6 +67,9 @@ class FetchTask<E, SplitT extends SourceSplit> implements 
SplitFetcherTask {
                                        lastRecords = null;
                                }
                        }
+               } catch (InterruptedException e) {
+                       // this should only happen on shutdown
+                       throw new IOException("Source fetch execution was 
interrupted", e);
                } finally {
                        // clean up the potential wakeup effect. It is possible 
that the fetcher is waken up
                        // after the clean up. In that case, either the wakeup 
flag will be set or the
diff --git 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java
 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java
index 3beb0da..db5a203 100644
--- 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java
+++ 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java
@@ -27,7 +27,6 @@ import 
org.apache.flink.connector.base.source.reader.synchronization.FutureCompl
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
@@ -98,8 +97,6 @@ public class SplitFetcher<E, SplitT extends SourceSplit> 
implements Runnable {
                                runOnce();
                        }
                } finally {
-                       // Reset the interrupted flag so the shutdown hook do 
not got interrupted.
-                       Thread.interrupted();
                        shutdownHook.run();
                        LOG.info("Split fetcher {} exited.", id);
                }
@@ -116,38 +113,21 @@ public class SplitFetcher<E, SplitT extends SourceSplit> 
implements Runnable {
                        } else {
                                runningTask = taskQueue.take();
                        }
-                       // Now the running task is not null. If wakeUp() is 
called after this point, the fetcher
-                       // thread will not be interrupted. Instead 
task.wakeUp() will be called. On the other hand,
-                       // If the wakeUp() call was make before this point, the 
wakeUp flag must have already been
-                       // have been set, and the fetcher thread may or may not 
be interrupted, depending on
-                       // whether the wakeUp() call was before or after the 
runningTask assignment. So the
-                       // code does the following:
-                       // 1. check and clear the interrupt flag on the fetcher 
thread to avoid interruption in
-                       //    later code.
-                       // 2. check the wakeUp flag to avoid unnecessary task 
run.
+                       // Now the running task is not null. If wakeUp() is 
called after this point,
+                       // task.wakeUp() will be called. On the other hand, if 
the wakeUp() call was make before
+                       // this point, the wakeUp flag must have already been 
set. The code hence checks the wakeUp
+                       // flag first to avoid an unnecessary task run.
                        // Note that the runningTask may still encounter the 
case that the task is waken up before
                        // the it starts running.
                        LOG.debug("Prepare to run {}", runningTask);
-                       if (!Thread.interrupted() && !wakeUp.get() && 
runningTask.run()) {
+                       if (!wakeUp.get() && runningTask.run()) {
                                LOG.debug("Finished running task {}", 
runningTask);
                                // the task has finished running. Set it to 
null so it won't be enqueued.
                                runningTask = null;
                        }
-               } catch (InterruptedException ie) {
-                       if (closed.get()) {
-                               // The fetcher is closed, just return;
-                               return;
-                       } else if (wakeUp.get()) {
-                               // The fetcher thread has just been waken up. 
So ignore the interrupted exception
-                               // and continue;
-                               LOG.debug("Split fetcher has been waken up.");
-                       } else {
-                               throw new RuntimeException(String.format(
-                                       "SplitFetcher thread %d interrupted 
while polling the records", id), ie);
-                       }
-               } catch (IOException ioe) {
+               } catch (Exception e) {
                        throw new RuntimeException(String.format(
-                               "SplitFetcher thread %d received unexpected 
exception while polling the records", id), ioe);
+                               "SplitFetcher thread %d received unexpected 
exception while polling the records", id), e);
                }
                // If the task is not null that means this task needs to be 
re-executed. This only
                // happens when the task is the fetching task or the task was 
interrupted.
@@ -156,9 +136,6 @@ public class SplitFetcher<E, SplitT extends SourceSplit> 
implements Runnable {
                        // Set the running task to null. It is necessary for 
the shutdown method to avoid
                        // unnecessarily interrupt the running task.
                        runningTask = null;
-                       // Clean the interrupt flag in case the running task 
was interrupted after it finishes
-                       // running but before it was set to null.
-                       Thread.interrupted();
                        // Set the wakeUp flag to false.
                        wakeUp.set(false);
                        LOG.debug("Cleaned wakeup flag.");
@@ -290,12 +267,6 @@ public class SplitFetcher<E, SplitT extends SourceSplit> 
implements Runnable {
                return task != null && task != WAKEUP_TASK;
        }
 
-       private void removeAssignedSplit(String splitId) {
-               assignedSplits.remove(splitId);
-               LOG.debug("Removed {} split from assigned splits. The assigned 
splits now are {}", splitId, assignedSplits);
-
-       }
-
        private void checkAndSetIdle() {
                final boolean nowIdle = assignedSplits.isEmpty() && 
taskQueue.isEmpty() && splitChanges.isEmpty();
                if (nowIdle) {
@@ -317,7 +288,7 @@ public class SplitFetcher<E, SplitT extends SourceSplit> 
implements Runnable {
                }
 
                @Override
-               public boolean run() throws InterruptedException {
+               public boolean run() {
                        return false;
                }
 
diff --git 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTask.java
 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTask.java
index 999601a..ebbc0a6 100644
--- 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTask.java
+++ 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTask.java
@@ -32,10 +32,9 @@ public interface SplitFetcherTask {
         * invocation is needed.
         *
         * @return whether the runnable has successfully finished running.
-        * @throws InterruptedException when interrupted.
         * @throws IOException when the performed I/O operation fails.
         */
-       boolean run() throws InterruptedException, IOException;
+       boolean run() throws IOException;
 
        /**
         * Wake up the running thread.
diff --git 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.java
 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.java
index b980f7b..9114614 100644
--- 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.java
+++ 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.java
@@ -42,10 +42,9 @@ public interface SplitReader<E, SplitT extends SourceSplit> {
         *
         * @return the Ids of the finished splits.
         *
-        * @throws InterruptedException when interrupted
         * @throws IOException when encountered IO errors, such as 
deserialization failures.
         */
-       RecordsWithSplitIds<E> fetch() throws InterruptedException, IOException;
+       RecordsWithSplitIds<E> fetch() throws IOException;
 
        /**
         * Handle the split changes. This call should be non-blocking.
diff --git 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java
 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java
index c25490b..e5f5faf 100644
--- 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java
+++ 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java
@@ -157,7 +157,7 @@ public class SplitFetcherTest {
        public void testWakeup() throws InterruptedException {
                final int numSplits = 3;
                final int numRecordsPerSplit = 10_000;
-               final int interruptRecordsInterval = 10;
+               final int wakeupRecordsInterval = 10;
                final int numTotalRecords = numRecordsPerSplit * numSplits;
 
                FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> 
elementQueue =
@@ -189,16 +189,16 @@ public class SplitFetcherTest {
                // A thread waking up the split fetcher frequently.
                AtomicInteger wakeupTimes = new AtomicInteger(0);
                AtomicBoolean stop = new AtomicBoolean(false);
-               Thread interrupter = new Thread("Interrupter") {
+               Thread wakeUpCaller = new Thread("Wakeup Caller") {
                        @Override
                        public void run() {
-                               int lastInterrupt = 0;
+                               int lastWakeup = 0;
                                while (recordsRead.size() < numTotalRecords && 
!stop.get()) {
                                        int numRecordsRead = recordsRead.size();
-                                       if (numRecordsRead >= lastInterrupt + 
interruptRecordsInterval) {
+                                       if (numRecordsRead >= lastWakeup + 
wakeupRecordsInterval) {
                                                fetcher.wakeUp(false);
                                                wakeupTimes.incrementAndGet();
-                                               lastInterrupt = numRecordsRead;
+                                               lastWakeup = numRecordsRead;
                                        }
                                }
                        }
@@ -206,7 +206,7 @@ public class SplitFetcherTest {
 
                try {
                        fetcherThread.start();
-                       interrupter.start();
+                       wakeUpCaller.start();
 
                        while (recordsRead.size() < numSplits * 
numRecordsPerSplit) {
                                final RecordsWithSplitIds<int[]> nextBatch = 
elementQueue.take();
@@ -226,7 +226,7 @@ public class SplitFetcherTest {
                        stop.set(true);
                        fetcher.shutdown();
                        fetcherThread.join();
-                       interrupter.join();
+                       wakeUpCaller.join();
                }
        }
 
diff --git 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitReader.java
 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitReader.java
index 00d4d71..4cb738a 100644
--- 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitReader.java
+++ 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitReader.java
@@ -43,7 +43,10 @@ public class MockSplitReader implements SplitReader<int[], 
MockSourceSplit> {
        private final int numRecordsPerSplitPerFetch;
        private final boolean blockingFetch;
        private final boolean handleSplitsInOneShot;
-       private volatile Thread runningThread;
+
+       private final Object wakeupLock = new Object();
+       private volatile Thread threadInBlocking;
+       private boolean wokenUp;
 
        public MockSplitReader(
                        int numRecordsPerSplitPerFetch,
@@ -52,14 +55,10 @@ public class MockSplitReader implements SplitReader<int[], 
MockSourceSplit> {
                this.numRecordsPerSplitPerFetch = numRecordsPerSplitPerFetch;
                this.blockingFetch = blockingFetch;
                this.handleSplitsInOneShot = handleSplitsInOneShot;
-               this.runningThread = null;
        }
 
        @Override
-       public RecordsWithSplitIds<int[]> fetch() throws InterruptedException {
-               if (runningThread == null) {
-                       runningThread = Thread.currentThread();
-               }
+       public RecordsWithSplitIds<int[]> fetch() {
                return getRecords();
        }
 
@@ -75,14 +74,26 @@ public class MockSplitReader implements SplitReader<int[], 
MockSourceSplit> {
 
        @Override
        public void wakeUp() {
-               if (blockingFetch && runningThread != null) {
-                       runningThread.interrupt();
+               synchronized (wakeupLock) {
+                       wokenUp = true;
+                       if (threadInBlocking != null) {
+                               threadInBlocking.interrupt();
+                       }
                }
        }
 
        private RecordsBySplits<int[]> getRecords() {
                final RecordsBySplits.Builder<int[]> records = new 
RecordsBySplits.Builder<>();
 
+               // after this locked section, the thread might be interrupted
+               synchronized (wakeupLock) {
+                       if (wokenUp) {
+                               wokenUp = false;
+                               return records.build();
+                       }
+                       threadInBlocking = Thread.currentThread();
+               }
+
                try {
                        for (Map.Entry<String, MockSourceSplit> entry : 
splits.entrySet()) {
                                MockSourceSplit split = entry.getValue();
@@ -102,7 +113,16 @@ public class MockSplitReader implements SplitReader<int[], 
MockSourceSplit> {
                        if (!blockingFetch) {
                                throw new RuntimeException("Caught unexpected 
interrupted exception.");
                        }
+               } finally {
+                       // after this locked section, the thread may not be 
interrupted any more
+                       synchronized (wakeupLock) {
+                               wokenUp = false;
+                               //noinspection ResultOfMethodCallIgnored
+                               Thread.interrupted();
+                               threadInBlocking = null;
+                       }
                }
+
                return records.build();
        }
 }
diff --git 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingSplitReader.java
 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingSplitReader.java
index 0d202f7..5643088 100644
--- 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingSplitReader.java
+++ 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingSplitReader.java
@@ -42,13 +42,17 @@ public class TestingSplitReader<E, SplitT extends 
SourceSplit> implements SplitR
        }
 
        @Override
-       public RecordsWithSplitIds<E> fetch() throws InterruptedException, 
IOException {
+       public RecordsWithSplitIds<E> fetch() throws IOException {
                if (!fetches.isEmpty()) {
                        return fetches.removeFirst();
                } else {
                        // block until woken up
                        synchronized (fetches) {
-                               fetches.wait();
+                               try {
+                                       fetches.wait();
+                               } catch (InterruptedException e) {
+                                       Thread.currentThread().interrupt();
+                               }
                                return null;
                        }
                }
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceSplit.java
 
b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceSplit.java
index dc8ce80..878d674 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceSplit.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceSplit.java
@@ -34,6 +34,9 @@ import java.util.concurrent.LinkedBlockingQueue;
  * polled out of the queue since the creation of this split.
  */
 public class MockSourceSplit implements SourceSplit, Serializable {
+
+       private static final long serialVersionUID = 1L;
+
        private final int id;
        private final BlockingQueue<Integer> records;
        private final int endIndex;

Reply via email to