This is an automated email from the ASF dual-hosted git repository. jqin pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit ceda80842723f3f621279d44ddcbb5210e2a3525 Author: Kezhu Wang <[email protected]> AuthorDate: Mon Nov 9 10:45:58 2020 +0800 [FLINK-19717][connectors/common] Fix spurious InputStatus.END_OF_INPUT from SourceReaderBase.pollNext caused by split reader exception (#13776) --- .../connector/base/source/reader/SourceReaderBase.java | 2 ++ .../base/source/reader/fetcher/SplitFetcher.java | 16 ++++++++++++---- .../base/source/reader/fetcher/SplitFetcherManager.java | 4 ++-- .../base/source/reader/SourceReaderBaseTest.java | 7 ++++++- .../base/source/reader/fetcher/SplitFetcherTest.java | 4 +++- 5 files changed, 25 insertions(+), 8 deletions(-) diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java index 2c6c1812..5b4d6f7 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java @@ -266,6 +266,8 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt return InputStatus.NOTHING_AVAILABLE; } if (elementsQueue.isEmpty()) { + // We may reach here because of exceptional split fetcher, check it. 
+ splitFetcherManager.checkErrors(); return InputStatus.END_OF_INPUT; } else { throw new IllegalStateException("Called 'finishedOrAvailableLater()' with shut-down fetchers but non-empty queue"); diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java index 86cff2d..6b523dc 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java @@ -22,7 +22,6 @@ import org.apache.flink.api.connector.source.SourceSplit; import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue; -import org.apache.flink.util.ExceptionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,6 +32,7 @@ import java.util.Map; import java.util.concurrent.BlockingDeque; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; /** * The internal fetcher runnable responsible for polling message from the external system. 
@@ -47,6 +47,7 @@ public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable { private final Map<String, SplitT> assignedSplits; private final FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue; private final SplitReader<E, SplitT> splitReader; + private final Consumer<Throwable> errorHandler; private final Runnable shutdownHook; private final AtomicBoolean wakeUp; private final AtomicBoolean closed; @@ -62,6 +63,7 @@ public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable { int id, FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue, SplitReader<E, SplitT> splitReader, + Consumer<Throwable> errorHandler, Runnable shutdownHook) { this.id = id; @@ -69,6 +71,7 @@ public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable { this.elementsQueue = elementsQueue; this.assignedSplits = new HashMap<>(); this.splitReader = splitReader; + this.errorHandler = errorHandler; this.shutdownHook = shutdownHook; this.isIdle = true; this.wakeUp = new AtomicBoolean(false); @@ -91,14 +94,19 @@ public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable { while (!closed.get()) { runOnce(); } + } catch (Throwable t) { + errorHandler.accept(t); } finally { - LOG.info("Split fetcher {} exited.", id); - shutdownHook.run(); try { splitReader.close(); } catch (Exception e) { - ExceptionUtils.rethrow(e); + errorHandler.accept(e); } + LOG.info("Split fetcher {} exited.", id); + // This executes after possible errorHandler.accept(t). If these operations bear + // a happens-before relation, then we can check the side effect of errorHandler.accept(t) + // to know whether it happened after observing the side effect of shutdownHook.run().
+ shutdownHook.run(); } } diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java index efd60f4..2b5b331 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java @@ -23,7 +23,6 @@ import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; import org.apache.flink.connector.base.source.reader.SourceReaderBase; import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue; -import org.apache.flink.util.ThrowableCatchingRunnable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -112,7 +111,7 @@ public abstract class SplitFetcherManager<E, SplitT extends SourceSplit> { public abstract void addSplits(List<SplitT> splitsToAdd); protected void startFetcher(SplitFetcher<E, SplitT> fetcher) { - executors.submit(new ThrowableCatchingRunnable(errorHandler, fetcher)); + executors.submit(fetcher); } /** @@ -133,6 +132,7 @@ public abstract class SplitFetcherManager<E, SplitT extends SourceSplit> { fetcherId, elementsQueue, splitReader, + errorHandler, () -> fetchers.remove(fetcherId)); fetchers.put(fetcherId, splitFetcher); return splitFetcher; diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java index f14ddb1..dc8342f 100644 --- 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java @@ -20,6 +20,7 @@ package org.apache.flink.connector.base.source.reader; import org.apache.flink.api.connector.source.Boundedness; import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.event.NoMoreSplitsEvent; import org.apache.flink.api.connector.source.mocks.MockSourceSplit; import org.apache.flink.api.connector.source.mocks.TestingReaderContext; import org.apache.flink.api.connector.source.mocks.TestingReaderOutput; @@ -33,6 +34,7 @@ import org.apache.flink.connector.base.source.reader.mocks.TestingSplitReader; import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange; import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue; +import org.apache.flink.core.io.InputStatus; import org.junit.Rule; import org.junit.Test; @@ -44,6 +46,7 @@ import java.util.Collections; import java.util.List; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertTrue; /** @@ -86,9 +89,11 @@ public class SourceReaderBaseTest extends SourceReaderTestBase<MockSourceSplit> reader.addSplits(Collections.singletonList(getSplit(0, NUM_RECORDS_PER_SPLIT, Boundedness.CONTINUOUS_UNBOUNDED))); + reader.handleSourceEvents(new NoMoreSplitsEvent()); // This is not a real infinite loop, it is supposed to throw exception after two polls. while (true) { - reader.pollNext(output); + InputStatus inputStatus = reader.pollNext(output); + assertNotEquals(InputStatus.END_OF_INPUT, inputStatus); // Add a sleep to avoid tight loop. 
Thread.sleep(1); } diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java index d96d495..0b3cdc0 100644 --- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java @@ -27,6 +27,7 @@ import org.apache.flink.connector.base.source.reader.mocks.TestingSplitReader; import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue; import org.apache.flink.core.testutils.CheckedThread; +import org.apache.flink.util.ExceptionUtils; import org.junit.Test; @@ -164,6 +165,7 @@ public class SplitFetcherTest { 0, elementQueue, new MockSplitReader(2, true), + ExceptionUtils::rethrow, () -> {}); // Prepare the splits. @@ -252,7 +254,7 @@ public class SplitFetcherTest { private static <E> SplitFetcher<E, TestingSourceSplit> createFetcher( final SplitReader<E, TestingSourceSplit> reader, final FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> queue) { - return new SplitFetcher<>(0, queue, reader, () -> {}); + return new SplitFetcher<>(0, queue, reader, ExceptionUtils::rethrow, () -> {}); } private static <E> SplitFetcher<E, TestingSourceSplit> createFetcherWithSplit(
