This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7c55a64aaacc4c023cabbe04f1ee5ab79c466dc1
Author: Jiangjie (Becket) Qin <[email protected]>
AuthorDate: Mon Sep 7 21:12:55 2020 +0200

    [refactor][connectors] Backport of the connector-base exception handling 
from the Kafka Connector Pull Request
---
 .../SingleThreadMultiplexSourceReaderBase.java     | 26 ++++-----
 .../base/source/reader/fetcher/FetchTask.java      | 11 ++--
 .../base/source/reader/fetcher/SplitFetcher.java   | 34 +++++++-----
 .../source/reader/fetcher/SplitFetcherTask.java    |  5 +-
 .../source/reader/splitreader/SplitReader.java     |  4 +-
 .../source/reader/splitreader/SplitsAddition.java  |  5 ++
 .../base/source/reader/SourceReaderBaseTest.java   | 62 +++++++++++-----------
 .../base/source/reader/SourceReaderTestBase.java   | 13 +++--
 8 files changed, 90 insertions(+), 70 deletions(-)

diff --git 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SingleThreadMultiplexSourceReaderBase.java
 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SingleThreadMultiplexSourceReaderBase.java
index 546e20a..3239f28 100644
--- 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SingleThreadMultiplexSourceReaderBase.java
+++ 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SingleThreadMultiplexSourceReaderBase.java
@@ -37,21 +37,21 @@ import java.util.function.Supplier;
  * @param <SplitStateT>
  */
 public abstract class SingleThreadMultiplexSourceReaderBase<E, T, SplitT 
extends SourceSplit, SplitStateT>
-               extends SourceReaderBase<E, T, SplitT, SplitStateT> {
+       extends SourceReaderBase<E, T, SplitT, SplitStateT> {
 
        public SingleThreadMultiplexSourceReaderBase(
-                       FutureNotifier futureNotifier,
-                       FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> 
elementsQueue,
-                       Supplier<SplitReader<E, SplitT>> splitFetcherSupplier,
-                       RecordEmitter<E, T, SplitStateT> recordEmitter,
-                       Configuration config,
-                       SourceReaderContext context) {
+               FutureNotifier futureNotifier,
+               FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> 
elementsQueue,
+               Supplier<SplitReader<E, SplitT>> splitReaderSupplier,
+               RecordEmitter<E, T, SplitStateT> recordEmitter,
+               Configuration config,
+               SourceReaderContext context) {
                super(
-                               futureNotifier,
-                               elementsQueue,
-                               new 
SingleThreadFetcherManager<>(futureNotifier, elementsQueue, 
splitFetcherSupplier),
-                               recordEmitter,
-                               config,
-                               context);
+                       futureNotifier,
+                       elementsQueue,
+                       new SingleThreadFetcherManager<>(futureNotifier, 
elementsQueue, splitReaderSupplier),
+                       recordEmitter,
+                       config,
+                       context);
        }
 }
diff --git 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/FetchTask.java
 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/FetchTask.java
index aff21fd..30835ce 100644
--- 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/FetchTask.java
+++ 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/FetchTask.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.connector.source.SourceSplit;
 import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
 
+import java.io.IOException;
 import java.util.Collection;
 import java.util.concurrent.BlockingQueue;
 import java.util.function.Consumer;
@@ -38,10 +39,10 @@ class FetchTask<E, SplitT extends SourceSplit> implements 
SplitFetcherTask {
        private volatile boolean wakeup;
 
        FetchTask(
-                       SplitReader<E, SplitT> splitReader,
-                       BlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
-                       Consumer<Collection<String>> splitFinishedCallback,
-                       Thread runningThread) {
+               SplitReader<E, SplitT> splitReader,
+               BlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
+               Consumer<Collection<String>> splitFinishedCallback,
+               Thread runningThread) {
                this.splitReader = splitReader;
                this.elementsQueue = elementsQueue;
                this.splitFinishedCallback = splitFinishedCallback;
@@ -51,7 +52,7 @@ class FetchTask<E, SplitT extends SourceSplit> implements 
SplitFetcherTask {
        }
 
        @Override
-       public boolean run() throws InterruptedException {
+       public boolean run() throws InterruptedException, IOException {
                try {
                        if (!isWakenUp() && lastRecords == null) {
                                lastRecords = splitReader.fetch();
diff --git 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java
 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java
index d006bb0..35deeba 100644
--- 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java
+++ 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java
@@ -26,6 +26,7 @@ import 
org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
@@ -60,10 +61,10 @@ public class SplitFetcher<E, SplitT extends SourceSplit> 
implements Runnable {
        private volatile boolean isIdle;
 
        SplitFetcher(
-                       int id,
-                       BlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
-                       SplitReader<E, SplitT> splitReader,
-                       Runnable shutdownHook) {
+               int id,
+               BlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
+               SplitReader<E, SplitT> splitReader,
+               Runnable shutdownHook) {
 
                this.id = id;
                this.taskQueue = new LinkedBlockingDeque<>();
@@ -84,12 +85,12 @@ public class SplitFetcher<E, SplitT extends SourceSplit> 
implements Runnable {
                        // Remove the split from the assignments if it is 
already done.
                        runningThread = Thread.currentThread();
                        this.fetchTask = new FetchTask<>(
-                                       splitReader,
-                                       elementsQueue,
-                                       ids -> {
-                                               
ids.forEach(assignedSplits::remove);
-                                               updateIsIdle();
-                                       }, runningThread);
+                               splitReader,
+                               elementsQueue,
+                               ids -> {
+                                       ids.forEach(this::removeAssignedSplit);
+                                       updateIsIdle();
+                               }, runningThread);
                        while (!closed.get()) {
                                runOnce();
                        }
@@ -139,8 +140,11 @@ public class SplitFetcher<E, SplitT extends SourceSplit> 
implements Runnable {
                                LOG.debug("Split fetcher has been waken up.");
                        } else {
                                throw new RuntimeException(String.format(
-                                               "SplitFetcher thread %d 
interrupted while polling the records", id), ie);
+                                       "SplitFetcher thread %d interrupted 
while polling the records", id), ie);
                        }
+               } catch (IOException ioe) {
+                       throw new RuntimeException(String.format(
+                               "SplitFetcher thread %d received unexpected 
exception while polling the records", id), ioe);
                }
                // If the task is not null that means this task needs to be 
re-executed. This only
                // happens when the task is the fetching task or the task was 
interrupted.
@@ -272,7 +276,7 @@ public class SplitFetcher<E, SplitT extends SourceSplit> 
implements Runnable {
                // Only enqueue unfinished non-fetch task.
                if (!closed.get() && isRunningTask(task) && task != fetchTask 
&& !taskQueue.offerFirst(task)) {
                        throw new RuntimeException(
-                                       "The task queue is full. This is only 
theoretically possible when really bad thing happens.");
+                               "The task queue is full. This is only 
theoretically possible when really bad thing happens.");
                }
                if (task != null) {
                        LOG.debug("Enqueued task {}", task);
@@ -283,6 +287,12 @@ public class SplitFetcher<E, SplitT extends SourceSplit> 
implements Runnable {
                return task != null && task != WAKEUP_TASK;
        }
 
+       private void removeAssignedSplit(String splitId) {
+               assignedSplits.remove(splitId);
+               LOG.debug("Removed {} split from assigned splits. The assigned 
splits now are {}", splitId, assignedSplits);
+
+       }
+
        //--------------------- Helper class ------------------
 
        private static class DummySplitFetcherTask implements SplitFetcherTask {
diff --git 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTask.java
 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTask.java
index 716d2e2..999601a 100644
--- 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTask.java
+++ 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTask.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.connector.base.source.reader.fetcher;
 
+import java.io.IOException;
+
 /**
  * An interface similar to {@link Runnable} but allows throwing exceptions and 
wakeup.
  */
@@ -31,8 +33,9 @@ public interface SplitFetcherTask {
         *
         * @return whether the runnable has successfully finished running.
         * @throws InterruptedException when interrupted.
+        * @throws IOException when the performed I/O operation fails.
         */
-       boolean run() throws InterruptedException;
+       boolean run() throws InterruptedException, IOException;
 
        /**
         * Wake up the running thread.
diff --git 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.java
 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.java
index 89cf81b..b980f7b 100644
--- 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.java
+++ 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.java
@@ -21,6 +21,7 @@ package 
org.apache.flink.connector.base.source.reader.splitreader;
 import org.apache.flink.api.connector.source.SourceSplit;
 import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 
+import java.io.IOException;
 import java.util.Queue;
 
 /**
@@ -42,8 +43,9 @@ public interface SplitReader<E, SplitT extends SourceSplit> {
         * @return the Ids of the finished splits.
         *
         * @throws InterruptedException when interrupted
+        * @throws IOException when encountered IO errors, such as 
deserialization failures.
         */
-       RecordsWithSplitIds<E> fetch() throws InterruptedException;
+       RecordsWithSplitIds<E> fetch() throws InterruptedException, IOException;
 
        /**
         * Handle the split changes. This call should be non-blocking.
diff --git 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitsAddition.java
 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitsAddition.java
index ebd2330..e1a5650 100644
--- 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitsAddition.java
+++ 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitsAddition.java
@@ -30,4 +30,9 @@ public class SplitsAddition<SplitT> extends 
SplitsChange<SplitT> {
        public SplitsAddition(List<SplitT> splits) {
                super(splits);
        }
+
+       @Override
+       public String toString() {
+               return String.format("SplitAddition:[%s]", splits());
+       }
 }
diff --git 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java
 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java
index 26504cb..a332efe 100644
--- 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java
+++ 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java
@@ -53,33 +53,33 @@ public class SourceReaderBaseTest extends 
SourceReaderTestBase<MockSourceSplit>
 
                FutureNotifier futureNotifier = new FutureNotifier();
                FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> 
elementsQueue =
-                               new 
FutureCompletingBlockingQueue<>(futureNotifier);
+                       new FutureCompletingBlockingQueue<>(futureNotifier);
                // We have to handle split changes first, otherwise fetch will 
not be called.
                try (MockSourceReader reader = new MockSourceReader(
-                               futureNotifier,
-                               elementsQueue,
-                               () -> new SplitReader<int[], MockSourceSplit>() 
{
-                                       @Override
-                                       public RecordsWithSplitIds<int[]> 
fetch() {
-                                               throw new 
RuntimeException(errMsg);
-                                       }
-
-                                       @Override
-                                       public void 
handleSplitsChanges(Queue<SplitsChange<MockSourceSplit>> splitsChanges) {
-                                               // We have to handle split 
changes first, otherwise fetch will not be called.
-                                               splitsChanges.clear();
-                                       }
-
-                                       @Override
-                                       public void wakeUp() {
-                                       }
-                               },
-                               getConfig(),
-                               null)) {
+                       futureNotifier,
+                       elementsQueue,
+                       () -> new SplitReader<int[], MockSourceSplit>() {
+                               @Override
+                               public RecordsWithSplitIds<int[]> fetch() {
+                                       throw new RuntimeException(errMsg);
+                               }
+
+                               @Override
+                               public void 
handleSplitsChanges(Queue<SplitsChange<MockSourceSplit>> splitsChanges) {
+                                       // We have to handle split changes 
first, otherwise fetch will not be called.
+                                       splitsChanges.clear();
+                               }
+
+                               @Override
+                               public void wakeUp() {
+                               }
+                       },
+                       getConfig(),
+                       null)) {
                        ValidatingSourceOutput output = new 
ValidatingSourceOutput();
                        reader.addSplits(Collections.singletonList(getSplit(0,
-                                       NUM_RECORDS_PER_SPLIT,
-                                       Boundedness.CONTINUOUS_UNBOUNDED)));
+                               NUM_RECORDS_PER_SPLIT,
+                               Boundedness.CONTINUOUS_UNBOUNDED)));
                        // This is not a real infinite loop, it is supposed to 
throw exception after two polls.
                        while (true) {
                                reader.pollNext(output);
@@ -95,15 +95,15 @@ public class SourceReaderBaseTest extends 
SourceReaderTestBase<MockSourceSplit>
        protected MockSourceReader createReader() {
                FutureNotifier futureNotifier = new FutureNotifier();
                FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> 
elementsQueue =
-                               new 
FutureCompletingBlockingQueue<>(futureNotifier);
+                       new FutureCompletingBlockingQueue<>(futureNotifier);
                MockSplitReader mockSplitReader =
-                               new MockSplitReader(2, true, true);
+                       new MockSplitReader(2, true, true);
                return new MockSourceReader(
-                               futureNotifier,
-                               elementsQueue,
-                               () -> mockSplitReader,
-                               getConfig(),
-                               null);
+                       futureNotifier,
+                       elementsQueue,
+                       () -> mockSplitReader,
+                       getConfig(),
+                       null);
        }
 
        @Override
@@ -130,7 +130,7 @@ public class SourceReaderBaseTest extends 
SourceReaderTestBase<MockSourceSplit>
        }
 
        @Override
-       protected long getIndex(MockSourceSplit split) {
+       protected long getNextRecordIndex(MockSourceSplit split) {
                return split.index();
        }
 
diff --git 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderTestBase.java
 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderTestBase.java
index 2acd4e1..22dda52 100644
--- 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderTestBase.java
+++ 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderTestBase.java
@@ -130,14 +130,13 @@ public abstract class SourceReaderTestBase<SplitT extends 
SourceSplit> extends T
                ValidatingSourceOutput output = new ValidatingSourceOutput();
                // Add a split to start the fetcher.
                List<SplitT> splits = getSplits(NUM_SPLITS, 
NUM_RECORDS_PER_SPLIT, Boundedness.CONTINUOUS_UNBOUNDED);
-               // Poll 5 records. That means split 0 and 1 will at index 2, 
split 1 will at index 1.
                try (SourceReader<Integer, SplitT> reader =
                                consumeRecords(splits, output, NUM_SPLITS * 
NUM_RECORDS_PER_SPLIT)) {
                        List<SplitT> state = reader.snapshotState();
                        assertEquals("The snapshot should only have 10 splits. 
", NUM_SPLITS, state.size());
                        for (int i = 0; i < NUM_SPLITS; i++) {
                                assertEquals("The first four splits should have 
been fully consumed.",
-                                               NUM_RECORDS_PER_SPLIT, 
getIndex(state.get(i)));
+                                       NUM_RECORDS_PER_SPLIT, 
getNextRecordIndex(state.get(i)));
                        }
                }
        }
@@ -150,12 +149,12 @@ public abstract class SourceReaderTestBase<SplitT extends 
SourceSplit> extends T
 
        protected abstract SplitT getSplit(int splitId, int numRecords, 
Boundedness boundedness);
 
-       protected abstract long getIndex(SplitT split);
+       protected abstract long getNextRecordIndex(SplitT split);
 
        private SourceReader<Integer, SplitT> consumeRecords(
-                       List<SplitT> splits,
-                       ValidatingSourceOutput output,
-                       int n) throws Exception {
+               List<SplitT> splits,
+               ValidatingSourceOutput output,
+               int n) throws Exception {
                SourceReader<Integer, SplitT> reader = createReader();
                // Add splits to start the fetcher.
                reader.addSplits(splits);
@@ -194,7 +193,7 @@ public abstract class SourceReaderTestBase<SplitT extends 
SourceSplit> extends T
                public void validate() {
 
                        assertEquals(String.format("Should be %d distinct 
elements in total", TOTAL_NUM_RECORDS),
-                                       TOTAL_NUM_RECORDS, 
consumedValues.size());
+                               TOTAL_NUM_RECORDS, consumedValues.size());
                        assertEquals(String.format("Should be %d elements in 
total", TOTAL_NUM_RECORDS), TOTAL_NUM_RECORDS, count);
                        assertEquals("The min value should be 0", 0, min);
                        assertEquals("The max value should be " + 
(TOTAL_NUM_RECORDS - 1), TOTAL_NUM_RECORDS - 1, max);

Reply via email to