This is an automated email from the ASF dual-hosted git repository. sewen pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 2d074715a13a1a638ddd2a7dd138ae94d0ca04f8 Author: Jiangjie (Becket) Qin <[email protected]> AuthorDate: Sun Oct 25 11:07:27 2020 +0800 [FLINK-19698][connector/common] Add a close() method to the SplitReader. --- .../base/source/reader/fetcher/SplitFetcher.java | 10 ++++++++-- .../source/reader/splitreader/SplitReader.java | 7 +++++++ .../base/source/reader/SourceReaderBaseTest.java | 6 ++++-- .../reader/fetcher/SplitFetcherManagerTest.java | 22 ++++++++++++++++++++++ .../source/reader/fetcher/SplitFetcherTest.java | 9 +++++++++ .../base/source/reader/mocks/MockSplitReader.java | 3 +++ .../source/reader/mocks/TestingSplitReader.java | 20 ++++++++++++++++++++ 7 files changed, 73 insertions(+), 4 deletions(-) diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java index 633c452..96663ab 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java @@ -22,6 +22,7 @@ import org.apache.flink.api.connector.source.SourceSplit; import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue; +import org.apache.flink.util.ExceptionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,7 +50,7 @@ public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable { private final Runnable shutdownHook; private final AtomicBoolean wakeUp; private final AtomicBoolean closed; - private FetchTask<E, SplitT> fetchTask; + private final FetchTask<E, SplitT> fetchTask; private volatile SplitFetcherTask runningTask = null; /** Flag whether this fetcher has no work assigned at the moment. @@ -91,8 +92,13 @@ public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable { runOnce(); } } finally { - shutdownHook.run(); LOG.info("Split fetcher {} exited.", id); + shutdownHook.run(); + try { + splitReader.close(); + } catch (Exception e) { + ExceptionUtils.rethrow(e); + } } } diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.java index 7504452..87deeb9 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.java @@ -57,4 +57,11 @@ public interface SplitReader<E, SplitT extends SourceSplit> { * {@link #fetch()}. */ void wakeUp(); + + /** + * Close the split reader. + * + * @throws Exception if closing the split reader failed. + */ + void close() throws Exception; } diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java index da057da..f14ddb1 100644 --- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java @@ -75,8 +75,10 @@ public class SourceReaderBaseTest extends SourceReaderTestBase<MockSourceSplit> public void handleSplitsChanges(SplitsChange<MockSourceSplit> splitsChanges) {} @Override - public void wakeUp() { - } + public void wakeUp() {} + + @Override + public void close() throws Exception {} }, getConfig(), null)) { diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManagerTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManagerTest.java index 6d7b8b1..3fd6a792 100644 --- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManagerTest.java +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManagerTest.java @@ -22,11 +22,13 @@ import org.apache.flink.api.connector.source.SourceSplit; import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; import org.apache.flink.connector.base.source.reader.mocks.TestingRecordsWithSplitIds; import org.apache.flink.connector.base.source.reader.mocks.TestingSourceSplit; +import org.apache.flink.connector.base.source.reader.mocks.TestingSplitReader; import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange; import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue; import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.commons.lang3.exception.ExceptionUtils; import org.junit.Test; import java.io.IOException; @@ -35,6 +37,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.Queue; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertSame; import static org.junit.Assert.fail; @@ -57,6 +60,22 @@ public class SplitFetcherManagerTest { ); } + @Test + public void testCloseFetcherWithException() throws Exception { + TestingSplitReader<Object, TestingSourceSplit> reader = new TestingSplitReader<>(); + reader.setCloseWithException(); + SplitFetcherManager<Object, TestingSourceSplit> fetcherManager = + createFetcher("test-split", new FutureCompletingBlockingQueue<>(), reader); + fetcherManager.close(1000L); + try { + fetcherManager.checkErrors(); + } catch (Exception e) { + assertEquals( + "Artificial exception on closing the split reader.", + ExceptionUtils.getRootCause(e).getMessage()); + } + } + // the final modifier is important so that '@SafeVarargs' is accepted on Java 8 @SuppressWarnings("FinalPrivateMethod") @SafeVarargs @@ -146,6 +165,9 @@ public class SplitFetcherManagerTest { @Override public void wakeUp() {} + @Override + public void close() throws Exception {} + public void awaitAllRecordsReturned() throws InterruptedException { inBlocking.await(); } diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java index 5082ebf..d96d495 100644 --- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java @@ -227,6 +227,15 @@ public class SplitFetcherTest { } } + @Test + public void testClose() { + TestingSplitReader<Object, TestingSourceSplit> splitReader = new TestingSplitReader<>(); + final SplitFetcher<Object, TestingSourceSplit> fetcher = createFetcher(splitReader); + fetcher.shutdown(); + fetcher.run(); + assertTrue(splitReader.isClosed()); + } + // ------------------------------------------------------------------------ // testing utils // ------------------------------------------------------------------------ diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitReader.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitReader.java index f3d15f6..e991b9c 100644 --- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitReader.java +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitReader.java @@ -77,6 +77,9 @@ public class MockSplitReader implements SplitReader<int[], MockSourceSplit> { } } + @Override + public void close() throws Exception {} + private RecordsBySplits<int[]> getRecords() { final RecordsBySplits.Builder<int[]> records = new RecordsBySplits.Builder<>(); diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingSplitReader.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingSplitReader.java index 2e6c760..505e797 100644 --- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingSplitReader.java +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingSplitReader.java @@ -33,11 +33,15 @@ import java.util.Arrays; public class TestingSplitReader<E, SplitT extends SourceSplit> implements SplitReader<E, SplitT> { private final ArrayDeque<RecordsWithSplitIds<E>> fetches; + private volatile boolean closed; + private volatile boolean closeWithException; @SafeVarargs public TestingSplitReader(RecordsWithSplitIds<E>... fetches) { this.fetches = new ArrayDeque<>(fetches.length); this.fetches.addAll(Arrays.asList(fetches)); + this.closed = false; + this.closeWithException = false; } @Override @@ -66,4 +70,20 @@ public class TestingSplitReader<E, SplitT extends SourceSplit> implements SplitR fetches.notifyAll(); } } + + @Override + public void close() throws Exception { + if (closeWithException) { + throw new Exception("Artificial exception on closing the split reader."); + } + closed = true; + } + + public void setCloseWithException() { + closeWithException = true; + } + + public boolean isClosed() { + return closed; + } }
