This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2d074715a13a1a638ddd2a7dd138ae94d0ca04f8
Author: Jiangjie (Becket) Qin <[email protected]>
AuthorDate: Sun Oct 25 11:07:27 2020 +0800

    [FLINK-19698][connector/common] Add a close() method to the SplitReader.
---
 .../base/source/reader/fetcher/SplitFetcher.java   | 10 ++++++++--
 .../source/reader/splitreader/SplitReader.java     |  7 +++++++
 .../base/source/reader/SourceReaderBaseTest.java   |  6 ++++--
 .../reader/fetcher/SplitFetcherManagerTest.java    | 22 ++++++++++++++++++++++
 .../source/reader/fetcher/SplitFetcherTest.java    |  9 +++++++++
 .../base/source/reader/mocks/MockSplitReader.java  |  3 +++
 .../source/reader/mocks/TestingSplitReader.java    | 20 ++++++++++++++++++++
 7 files changed, 73 insertions(+), 4 deletions(-)

diff --git 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java
 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java
index 633c452..96663ab 100644
--- 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java
+++ 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.connector.source.SourceSplit;
 import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
 import 
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
+import org.apache.flink.util.ExceptionUtils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -49,7 +50,7 @@ public class SplitFetcher<E, SplitT extends SourceSplit> 
implements Runnable {
        private final Runnable shutdownHook;
        private final AtomicBoolean wakeUp;
        private final AtomicBoolean closed;
-       private FetchTask<E, SplitT> fetchTask;
+       private final FetchTask<E, SplitT> fetchTask;
        private volatile SplitFetcherTask runningTask = null;
 
        /** Flag whether this fetcher has no work assigned at the moment.
@@ -91,8 +92,13 @@ public class SplitFetcher<E, SplitT extends SourceSplit> 
implements Runnable {
                                runOnce();
                        }
                } finally {
-                       shutdownHook.run();
                        LOG.info("Split fetcher {} exited.", id);
+                       shutdownHook.run();
+                       try {
+                               splitReader.close();
+                       } catch (Exception e) {
+                               ExceptionUtils.rethrow(e);
+                       }
                }
        }
 
diff --git 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.java
 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.java
index 7504452..87deeb9 100644
--- 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.java
+++ 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.java
@@ -57,4 +57,11 @@ public interface SplitReader<E, SplitT extends SourceSplit> {
         * {@link #fetch()}.
         */
        void wakeUp();
+
+       /**
+        * Close the split reader.
+        *
+        * @throws Exception if closing the split reader failed.
+        */
+       void close() throws Exception;
 }
diff --git 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java
 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java
index da057da..f14ddb1 100644
--- 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java
+++ 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java
@@ -75,8 +75,10 @@ public class SourceReaderBaseTest extends 
SourceReaderTestBase<MockSourceSplit>
                                public void 
handleSplitsChanges(SplitsChange<MockSourceSplit> splitsChanges) {}
 
                                @Override
-                               public void wakeUp() {
-                               }
+                               public void wakeUp() {}
+
+                               @Override
+                               public void close() throws Exception {}
                        },
                        getConfig(),
                        null)) {
diff --git 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManagerTest.java
 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManagerTest.java
index 6d7b8b1..3fd6a792 100644
--- 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManagerTest.java
+++ 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManagerTest.java
@@ -22,11 +22,13 @@ import org.apache.flink.api.connector.source.SourceSplit;
 import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 import 
org.apache.flink.connector.base.source.reader.mocks.TestingRecordsWithSplitIds;
 import org.apache.flink.connector.base.source.reader.mocks.TestingSourceSplit;
+import org.apache.flink.connector.base.source.reader.mocks.TestingSplitReader;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
 import 
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
 import org.apache.flink.core.testutils.OneShotLatch;
 
+import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.junit.Test;
 
 import java.io.IOException;
@@ -35,6 +37,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.Queue;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.fail;
@@ -57,6 +60,22 @@ public class SplitFetcherManagerTest {
                );
        }
 
+       @Test
+       public void testCloseFetcherWithException() throws Exception {
+               TestingSplitReader<Object, TestingSourceSplit> reader = new 
TestingSplitReader<>();
+               reader.setCloseWithException();
+               SplitFetcherManager<Object, TestingSourceSplit> fetcherManager =
+                               createFetcher("test-split", new 
FutureCompletingBlockingQueue<>(), reader);
+               fetcherManager.close(1000L);
+               try {
+                       fetcherManager.checkErrors();
+               } catch (Exception e) {
+                       assertEquals(
+                                       "Artificial exception on closing the 
split reader.",
+                                       
ExceptionUtils.getRootCause(e).getMessage());
+               }
+       }
+
        // the final modifier is important so that '@SafeVarargs' is accepted 
on Java 8
        @SuppressWarnings("FinalPrivateMethod")
        @SafeVarargs
@@ -146,6 +165,9 @@ public class SplitFetcherManagerTest {
                @Override
                public void wakeUp() {}
 
+               @Override
+               public void close() throws Exception {}
+
                public void awaitAllRecordsReturned() throws 
InterruptedException {
                        inBlocking.await();
                }
diff --git 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java
 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java
index 5082ebf..d96d495 100644
--- 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java
+++ 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java
@@ -227,6 +227,15 @@ public class SplitFetcherTest {
                }
        }
 
+       @Test
+       public void testClose() {
+               TestingSplitReader<Object, TestingSourceSplit> splitReader = 
new TestingSplitReader<>();
+               final SplitFetcher<Object, TestingSourceSplit> fetcher = 
createFetcher(splitReader);
+               fetcher.shutdown();
+               fetcher.run();
+               assertTrue(splitReader.isClosed());
+       }
+
        // 
------------------------------------------------------------------------
        //  testing utils
        // 
------------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitReader.java
 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitReader.java
index f3d15f6..e991b9c 100644
--- 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitReader.java
+++ 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitReader.java
@@ -77,6 +77,9 @@ public class MockSplitReader implements SplitReader<int[], 
MockSourceSplit> {
                }
        }
 
+       @Override
+       public void close() throws Exception {}
+
        private RecordsBySplits<int[]> getRecords() {
                final RecordsBySplits.Builder<int[]> records = new 
RecordsBySplits.Builder<>();
 
diff --git 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingSplitReader.java
 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingSplitReader.java
index 2e6c760..505e797 100644
--- 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingSplitReader.java
+++ 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingSplitReader.java
@@ -33,11 +33,15 @@ import java.util.Arrays;
 public class TestingSplitReader<E, SplitT extends SourceSplit> implements 
SplitReader<E, SplitT> {
 
        private final ArrayDeque<RecordsWithSplitIds<E>> fetches;
+       private volatile boolean closed;
+       private volatile boolean closeWithException;
 
        @SafeVarargs
        public TestingSplitReader(RecordsWithSplitIds<E>... fetches) {
                this.fetches = new ArrayDeque<>(fetches.length);
                this.fetches.addAll(Arrays.asList(fetches));
+               this.closed = false;
+               this.closeWithException = false;
        }
 
        @Override
@@ -66,4 +70,20 @@ public class TestingSplitReader<E, SplitT extends 
SourceSplit> implements SplitR
                        fetches.notifyAll();
                }
        }
+
+       @Override
+       public void close() throws Exception {
+               if (closeWithException) {
+                       throw new Exception("Artificial exception on closing 
the split reader.");
+               }
+               closed = true;
+       }
+
+       public void setCloseWithException() {
+               closeWithException = true;
+       }
+
+       public boolean isClosed() {
+               return closed;
+       }
 }

Reply via email to