This is an automated email from the ASF dual-hosted git repository. sewen pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 7c55a64aaacc4c023cabbe04f1ee5ab79c466dc1 Author: Jiangjie (Becket) Qin <[email protected]> AuthorDate: Mon Sep 7 21:12:55 2020 +0200 [refactor][connectors] Backport of the connector-base exception handling from the Kafka Connector Pull Request --- .../SingleThreadMultiplexSourceReaderBase.java | 26 ++++----- .../base/source/reader/fetcher/FetchTask.java | 11 ++-- .../base/source/reader/fetcher/SplitFetcher.java | 34 +++++++----- .../source/reader/fetcher/SplitFetcherTask.java | 5 +- .../source/reader/splitreader/SplitReader.java | 4 +- .../source/reader/splitreader/SplitsAddition.java | 5 ++ .../base/source/reader/SourceReaderBaseTest.java | 62 +++++++++++----------- .../base/source/reader/SourceReaderTestBase.java | 13 +++-- 8 files changed, 90 insertions(+), 70 deletions(-) diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SingleThreadMultiplexSourceReaderBase.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SingleThreadMultiplexSourceReaderBase.java index 546e20a..3239f28 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SingleThreadMultiplexSourceReaderBase.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SingleThreadMultiplexSourceReaderBase.java @@ -37,21 +37,21 @@ import java.util.function.Supplier; * @param <SplitStateT> */ public abstract class SingleThreadMultiplexSourceReaderBase<E, T, SplitT extends SourceSplit, SplitStateT> - extends SourceReaderBase<E, T, SplitT, SplitStateT> { + extends SourceReaderBase<E, T, SplitT, SplitStateT> { public SingleThreadMultiplexSourceReaderBase( - FutureNotifier futureNotifier, - FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue, - Supplier<SplitReader<E, SplitT>> splitFetcherSupplier, - RecordEmitter<E, T, SplitStateT> recordEmitter, - Configuration config, - SourceReaderContext context) { + FutureNotifier futureNotifier, + FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue, + Supplier<SplitReader<E, SplitT>> splitReaderSupplier, + RecordEmitter<E, T, SplitStateT> recordEmitter, + Configuration config, + SourceReaderContext context) { super( - futureNotifier, - elementsQueue, - new SingleThreadFetcherManager<>(futureNotifier, elementsQueue, splitFetcherSupplier), - recordEmitter, - config, - context); + futureNotifier, + elementsQueue, + new SingleThreadFetcherManager<>(futureNotifier, elementsQueue, splitReaderSupplier), + recordEmitter, + config, + context); } } diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/FetchTask.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/FetchTask.java index aff21fd..30835ce 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/FetchTask.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/FetchTask.java @@ -22,6 +22,7 @@ import org.apache.flink.api.connector.source.SourceSplit; import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; +import java.io.IOException; import java.util.Collection; import java.util.concurrent.BlockingQueue; import java.util.function.Consumer; @@ -38,10 +39,10 @@ class FetchTask<E, SplitT extends SourceSplit> implements SplitFetcherTask { private volatile boolean wakeup; FetchTask( - SplitReader<E, SplitT> splitReader, - BlockingQueue<RecordsWithSplitIds<E>> elementsQueue, - Consumer<Collection<String>> splitFinishedCallback, - Thread runningThread) { + SplitReader<E, SplitT> splitReader, + BlockingQueue<RecordsWithSplitIds<E>> elementsQueue, + Consumer<Collection<String>> splitFinishedCallback, + Thread runningThread) { this.splitReader = splitReader; this.elementsQueue = elementsQueue; this.splitFinishedCallback = splitFinishedCallback; @@ -51,7 +52,7 @@ class FetchTask<E, SplitT extends SourceSplit> implements SplitFetcherTask { } @Override - public boolean run() throws InterruptedException { + public boolean run() throws InterruptedException, IOException { try { if (!isWakenUp() && lastRecords == null) { lastRecords = splitReader.fetch(); diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java index d006bb0..35deeba 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java @@ -26,6 +26,7 @@ import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; import java.util.HashMap; import java.util.LinkedList; import java.util.List; @@ -60,10 +61,10 @@ public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable { private volatile boolean isIdle; SplitFetcher( - int id, - BlockingQueue<RecordsWithSplitIds<E>> elementsQueue, - SplitReader<E, SplitT> splitReader, - Runnable shutdownHook) { + int id, + BlockingQueue<RecordsWithSplitIds<E>> elementsQueue, + SplitReader<E, SplitT> splitReader, + Runnable shutdownHook) { this.id = id; this.taskQueue = new LinkedBlockingDeque<>(); @@ -84,12 +85,12 @@ public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable { // Remove the split from the assignments if it is already done. runningThread = Thread.currentThread(); this.fetchTask = new FetchTask<>( - splitReader, - elementsQueue, - ids -> { - ids.forEach(assignedSplits::remove); - updateIsIdle(); - }, runningThread); + splitReader, + elementsQueue, + ids -> { + ids.forEach(this::removeAssignedSplit); + updateIsIdle(); + }, runningThread); while (!closed.get()) { runOnce(); } @@ -139,8 +140,11 @@ public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable { LOG.debug("Split fetcher has been waken up."); } else { throw new RuntimeException(String.format( - "SplitFetcher thread %d interrupted while polling the records", id), ie); + "SplitFetcher thread %d interrupted while polling the records", id), ie); } + } catch (IOException ioe) { + throw new RuntimeException(String.format( + "SplitFetcher thread %d received unexpected exception while polling the records", id), ioe); } // If the task is not null that means this task needs to be re-executed. This only // happens when the task is the fetching task or the task was interrupted. @@ -272,7 +276,7 @@ public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable { // Only enqueue unfinished non-fetch task. if (!closed.get() && isRunningTask(task) && task != fetchTask && !taskQueue.offerFirst(task)) { throw new RuntimeException( - "The task queue is full. This is only theoretically possible when really bad thing happens."); + "The task queue is full. This is only theoretically possible when really bad thing happens."); } if (task != null) { LOG.debug("Enqueued task {}", task); @@ -283,6 +287,12 @@ public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable { return task != null && task != WAKEUP_TASK; } + private void removeAssignedSplit(String splitId) { + assignedSplits.remove(splitId); + LOG.debug("Removed {} split from assigned splits. The assigned splits now are {}", splitId, assignedSplits); + + } + //--------------------- Helper class ------------------ private static class DummySplitFetcherTask implements SplitFetcherTask { diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTask.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTask.java index 716d2e2..999601a 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTask.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTask.java @@ -18,6 +18,8 @@ package org.apache.flink.connector.base.source.reader.fetcher; +import java.io.IOException; + /** * An interface similar to {@link Runnable} but allows throwing exceptions and wakeup. */ @@ -31,8 +33,9 @@ public interface SplitFetcherTask { * * @return whether the runnable has successfully finished running. * @throws InterruptedException when interrupted. + * @throws IOException when the performed I/O operation fails. */ - boolean run() throws InterruptedException; + boolean run() throws InterruptedException, IOException; /** * Wake up the running thread. diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.java index 89cf81b..b980f7b 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.java @@ -21,6 +21,7 @@ package org.apache.flink.connector.base.source.reader.splitreader; import org.apache.flink.api.connector.source.SourceSplit; import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; +import java.io.IOException; import java.util.Queue; /** @@ -42,8 +43,9 @@ public interface SplitReader<E, SplitT extends SourceSplit> { * @return the Ids of the finished splits. * * @throws InterruptedException when interrupted + * @throws IOException when encountered IO errors, such as deserialization failures. */ - RecordsWithSplitIds<E> fetch() throws InterruptedException; + RecordsWithSplitIds<E> fetch() throws InterruptedException, IOException; /** * Handle the split changes. This call should be non-blocking. diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitsAddition.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitsAddition.java index ebd2330..e1a5650 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitsAddition.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitsAddition.java @@ -30,4 +30,9 @@ public class SplitsAddition<SplitT> extends SplitsChange<SplitT> { public SplitsAddition(List<SplitT> splits) { super(splits); } + + @Override + public String toString() { + return String.format("SplitAddition:[%s]", splits()); + } } diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java index 26504cb..a332efe 100644 --- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java @@ -53,33 +53,33 @@ public class SourceReaderBaseTest extends SourceReaderTestBase<MockSourceSplit> FutureNotifier futureNotifier = new FutureNotifier(); FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> elementsQueue = - new FutureCompletingBlockingQueue<>(futureNotifier); + new FutureCompletingBlockingQueue<>(futureNotifier); // We have to handle split changes first, otherwise fetch will not be called. try (MockSourceReader reader = new MockSourceReader( - futureNotifier, - elementsQueue, - () -> new SplitReader<int[], MockSourceSplit>() { - @Override - public RecordsWithSplitIds<int[]> fetch() { - throw new RuntimeException(errMsg); - } - - @Override - public void handleSplitsChanges(Queue<SplitsChange<MockSourceSplit>> splitsChanges) { - // We have to handle split changes first, otherwise fetch will not be called. - splitsChanges.clear(); - } - - @Override - public void wakeUp() { - } - }, - getConfig(), - null)) { + futureNotifier, + elementsQueue, + () -> new SplitReader<int[], MockSourceSplit>() { + @Override + public RecordsWithSplitIds<int[]> fetch() { + throw new RuntimeException(errMsg); + } + + @Override + public void handleSplitsChanges(Queue<SplitsChange<MockSourceSplit>> splitsChanges) { + // We have to handle split changes first, otherwise fetch will not be called. + splitsChanges.clear(); + } + + @Override + public void wakeUp() { + } + }, + getConfig(), + null)) { ValidatingSourceOutput output = new ValidatingSourceOutput(); reader.addSplits(Collections.singletonList(getSplit(0, - NUM_RECORDS_PER_SPLIT, - Boundedness.CONTINUOUS_UNBOUNDED))); + NUM_RECORDS_PER_SPLIT, + Boundedness.CONTINUOUS_UNBOUNDED))); // This is not a real infinite loop, it is supposed to throw exception after two polls. while (true) { reader.pollNext(output); @@ -95,15 +95,15 @@ public class SourceReaderBaseTest extends SourceReaderTestBase<MockSourceSplit> protected MockSourceReader createReader() { FutureNotifier futureNotifier = new FutureNotifier(); FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> elementsQueue = - new FutureCompletingBlockingQueue<>(futureNotifier); + new FutureCompletingBlockingQueue<>(futureNotifier); MockSplitReader mockSplitReader = - new MockSplitReader(2, true, true); + new MockSplitReader(2, true, true); return new MockSourceReader( - futureNotifier, - elementsQueue, - () -> mockSplitReader, - getConfig(), - null); + futureNotifier, + elementsQueue, + () -> mockSplitReader, + getConfig(), + null); } @Override @@ -130,7 +130,7 @@ public class SourceReaderBaseTest extends SourceReaderTestBase<MockSourceSplit> } @Override - protected long getIndex(MockSourceSplit split) { + protected long getNextRecordIndex(MockSourceSplit split) { return split.index(); } diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderTestBase.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderTestBase.java index 2acd4e1..22dda52 100644 --- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderTestBase.java +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderTestBase.java @@ -130,14 +130,13 @@ public abstract class SourceReaderTestBase<SplitT extends SourceSplit> extends T ValidatingSourceOutput output = new ValidatingSourceOutput(); // Add a split to start the fetcher. List<SplitT> splits = getSplits(NUM_SPLITS, NUM_RECORDS_PER_SPLIT, Boundedness.CONTINUOUS_UNBOUNDED); - // Poll 5 records. That means split 0 and 1 will at index 2, split 1 will at index 1. try (SourceReader<Integer, SplitT> reader = consumeRecords(splits, output, NUM_SPLITS * NUM_RECORDS_PER_SPLIT)) { List<SplitT> state = reader.snapshotState(); assertEquals("The snapshot should only have 10 splits. ", NUM_SPLITS, state.size()); for (int i = 0; i < NUM_SPLITS; i++) { assertEquals("The first four splits should have been fully consumed.", - NUM_RECORDS_PER_SPLIT, getIndex(state.get(i))); + NUM_RECORDS_PER_SPLIT, getNextRecordIndex(state.get(i))); } } } @@ -150,12 +149,12 @@ public abstract class SourceReaderTestBase<SplitT extends SourceSplit> extends T protected abstract SplitT getSplit(int splitId, int numRecords, Boundedness boundedness); - protected abstract long getIndex(SplitT split); + protected abstract long getNextRecordIndex(SplitT split); private SourceReader<Integer, SplitT> consumeRecords( - List<SplitT> splits, - ValidatingSourceOutput output, - int n) throws Exception { + List<SplitT> splits, + ValidatingSourceOutput output, + int n) throws Exception { SourceReader<Integer, SplitT> reader = createReader(); // Add splits to start the fetcher. reader.addSplits(splits); @@ -194,7 +193,7 @@ public abstract class SourceReaderTestBase<SplitT extends SourceSplit> extends T public void validate() { assertEquals(String.format("Should be %d distinct elements in total", TOTAL_NUM_RECORDS), - TOTAL_NUM_RECORDS, consumedValues.size()); + TOTAL_NUM_RECORDS, consumedValues.size()); assertEquals(String.format("Should be %d elements in total", TOTAL_NUM_RECORDS), TOTAL_NUM_RECORDS, count); assertEquals("The min value should be 0", 0, min); assertEquals("The max value should be " + (TOTAL_NUM_RECORDS - 1), TOTAL_NUM_RECORDS - 1, max);
