This is an automated email from the ASF dual-hosted git repository.

jqin pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ceda80842723f3f621279d44ddcbb5210e2a3525
Author: Kezhu Wang <[email protected]>
AuthorDate: Mon Nov 9 10:45:58 2020 +0800

    [FLINK-19717][connectors/common] Fix spurious InputStatus.END_OF_INPUT from SourceReaderBase.pollNext caused by split reader exception (#13776)
---
 .../connector/base/source/reader/SourceReaderBase.java   |  2 ++
 .../base/source/reader/fetcher/SplitFetcher.java         | 16 ++++++++++++----
 .../base/source/reader/fetcher/SplitFetcherManager.java  |  4 ++--
 .../base/source/reader/SourceReaderBaseTest.java         |  7 ++++++-
 .../base/source/reader/fetcher/SplitFetcherTest.java     |  4 +++-
 5 files changed, 25 insertions(+), 8 deletions(-)

diff --git 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
index 2c6c1812..5b4d6f7 100644
--- 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
+++ 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
@@ -266,6 +266,8 @@ public abstract class SourceReaderBase<E, T, SplitT extends 
SourceSplit, SplitSt
                        return InputStatus.NOTHING_AVAILABLE;
                }
                if (elementsQueue.isEmpty()) {
+                       // We may reach here because a split fetcher exited exceptionally; check for errors.
+                       splitFetcherManager.checkErrors();
                        return InputStatus.END_OF_INPUT;
                } else {
                        throw new IllegalStateException("Called 
'finishedOrAvailableLater()' with shut-down fetchers but non-empty queue");
diff --git 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java
 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java
index 86cff2d..6b523dc 100644
--- 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java
+++ 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.connector.source.SourceSplit;
 import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
 import 
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
-import org.apache.flink.util.ExceptionUtils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -33,6 +32,7 @@ import java.util.Map;
 import java.util.concurrent.BlockingDeque;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
 
 /**
  * The internal fetcher runnable responsible for polling message from the 
external system.
@@ -47,6 +47,7 @@ public class SplitFetcher<E, SplitT extends SourceSplit> 
implements Runnable {
        private final Map<String, SplitT> assignedSplits;
        private final FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> 
elementsQueue;
        private final SplitReader<E, SplitT> splitReader;
+       private final Consumer<Throwable> errorHandler;
        private final Runnable shutdownHook;
        private final AtomicBoolean wakeUp;
        private final AtomicBoolean closed;
@@ -62,6 +63,7 @@ public class SplitFetcher<E, SplitT extends SourceSplit> 
implements Runnable {
                        int id,
                        FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> 
elementsQueue,
                        SplitReader<E, SplitT> splitReader,
+                       Consumer<Throwable> errorHandler,
                        Runnable shutdownHook) {
 
                this.id = id;
@@ -69,6 +71,7 @@ public class SplitFetcher<E, SplitT extends SourceSplit> 
implements Runnable {
                this.elementsQueue = elementsQueue;
                this.assignedSplits = new HashMap<>();
                this.splitReader = splitReader;
+               this.errorHandler = errorHandler;
                this.shutdownHook = shutdownHook;
                this.isIdle = true;
                this.wakeUp = new AtomicBoolean(false);
@@ -91,14 +94,19 @@ public class SplitFetcher<E, SplitT extends SourceSplit> 
implements Runnable {
                        while (!closed.get()) {
                                runOnce();
                        }
+               } catch (Throwable t) {
+                       errorHandler.accept(t);
                } finally {
-                       LOG.info("Split fetcher {} exited.", id);
-                       shutdownHook.run();
                        try {
                                splitReader.close();
                        } catch (Exception e) {
-                               ExceptionUtils.rethrow(e);
+                               errorHandler.accept(e);
                        }
+                       LOG.info("Split fetcher {} exited.", id);
+                       // This executes after a possible errorHandler.accept(t). If these operations bear
+                       // a happens-before relation, then we can check the side effect of errorHandler.accept(t)
+                       // to know whether it happened, after observing the side effect of shutdownHook.run().
+                       shutdownHook.run();
                }
        }
 
diff --git 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java
 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java
index efd60f4..2b5b331 100644
--- 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java
+++ 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java
@@ -23,7 +23,6 @@ import 
org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 import org.apache.flink.connector.base.source.reader.SourceReaderBase;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
 import 
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
-import org.apache.flink.util.ThrowableCatchingRunnable;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -112,7 +111,7 @@ public abstract class SplitFetcherManager<E, SplitT extends 
SourceSplit> {
        public abstract void addSplits(List<SplitT> splitsToAdd);
 
        protected void startFetcher(SplitFetcher<E, SplitT> fetcher) {
-               executors.submit(new ThrowableCatchingRunnable(errorHandler, 
fetcher));
+               executors.submit(fetcher);
        }
 
        /**
@@ -133,6 +132,7 @@ public abstract class SplitFetcherManager<E, SplitT extends 
SourceSplit> {
                        fetcherId,
                        elementsQueue,
                        splitReader,
+                       errorHandler,
                        () -> fetchers.remove(fetcherId));
                fetchers.put(fetcherId, splitFetcher);
                return splitFetcher;
diff --git 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java
 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java
index f14ddb1..dc8342f 100644
--- 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java
+++ 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.connector.base.source.reader;
 
 import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.event.NoMoreSplitsEvent;
 import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
 import org.apache.flink.api.connector.source.mocks.TestingReaderContext;
 import org.apache.flink.api.connector.source.mocks.TestingReaderOutput;
@@ -33,6 +34,7 @@ import 
org.apache.flink.connector.base.source.reader.mocks.TestingSplitReader;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
 import 
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
+import org.apache.flink.core.io.InputStatus;
 
 import org.junit.Rule;
 import org.junit.Test;
@@ -44,6 +46,7 @@ import java.util.Collections;
 import java.util.List;
 
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -86,9 +89,11 @@ public class SourceReaderBaseTest extends 
SourceReaderTestBase<MockSourceSplit>
                        reader.addSplits(Collections.singletonList(getSplit(0,
                                NUM_RECORDS_PER_SPLIT,
                                Boundedness.CONTINUOUS_UNBOUNDED)));
+                       reader.handleSourceEvents(new NoMoreSplitsEvent());
                        // This is not a real infinite loop, it is supposed to 
throw exception after two polls.
                        while (true) {
-                               reader.pollNext(output);
+                               InputStatus inputStatus = 
reader.pollNext(output);
+                               assertNotEquals(InputStatus.END_OF_INPUT, 
inputStatus);
                                // Add a sleep to avoid tight loop.
                                Thread.sleep(1);
                        }
diff --git 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java
 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java
index d96d495..0b3cdc0 100644
--- 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java
+++ 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java
@@ -27,6 +27,7 @@ import 
org.apache.flink.connector.base.source.reader.mocks.TestingSplitReader;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
 import 
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
 import org.apache.flink.core.testutils.CheckedThread;
+import org.apache.flink.util.ExceptionUtils;
 
 import org.junit.Test;
 
@@ -164,6 +165,7 @@ public class SplitFetcherTest {
                                                0,
                                                elementQueue,
                                                new MockSplitReader(2, true),
+                                               ExceptionUtils::rethrow,
                                                () -> {});
 
                // Prepare the splits.
@@ -252,7 +254,7 @@ public class SplitFetcherTest {
        private static <E> SplitFetcher<E, TestingSourceSplit> createFetcher(
                        final SplitReader<E, TestingSourceSplit> reader,
                        final 
FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> queue) {
-               return new SplitFetcher<>(0, queue, reader, () -> {});
+               return new SplitFetcher<>(0, queue, reader, 
ExceptionUtils::rethrow, () -> {});
        }
 
        private static <E> SplitFetcher<E, TestingSourceSplit> 
createFetcherWithSplit(

Reply via email to