This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f220c2486181e4af19b246e9745a4990cd6aa9ce
Author: Stephan Ewen <[email protected]>
AuthorDate: Tue Sep 15 17:43:16 2020 +0200

    [FLINK-19250][connectors] Fix error propagation in connector base (SplitFetcherManager).
    
    This makes sure that the reader is notified / woken up when the fetcher encounters an error.
---
 .../source/reader/fetcher/SplitFetcherManager.java |   4 +-
 .../reader/fetcher/SplitFetcherManagerTest.java    | 159 +++++++++++++++++++++
 2 files changed, 161 insertions(+), 2 deletions(-)

diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java
index ffac523..7a20a59 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java
@@ -92,9 +92,9 @@ public abstract class SplitFetcherManager<E, SplitT extends 
SourceSplit> {
                                if 
(!uncaughtFetcherException.compareAndSet(null, t)) {
                                        // Add the exception to the exception list.
                                        
uncaughtFetcherException.get().addSuppressed(t);
-                                       // Wake up the main thread to let it know the exception.
-                                       elementsQueue.notifyAvailable();
                                }
+                               // Wake up the main thread to let it know the exception.
+                               elementsQueue.notifyAvailable();
                        }
                };
                this.splitReaderFactory = splitReaderFactory;
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManagerTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManagerTest.java
new file mode 100644
index 0000000..3ff25e0
--- /dev/null
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManagerTest.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.reader.fetcher;
+
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import 
org.apache.flink.connector.base.source.reader.mocks.TestingRecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.mocks.TestingSourceSplit;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
+import 
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
+import org.apache.flink.core.testutils.OneShotLatch;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Queue;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.fail;
+
+/**
+ * Unit tests for the {@link SplitFetcherManager}.
+ */
+public class SplitFetcherManagerTest {
+
+       @Test
+       public void testExceptionPropagationFirstFetch() throws Exception {
+               testExceptionPropagation();
+       }
+
+       @Test
+       public void testExceptionPropagationSuccessiveFetch() throws Exception {
+               testExceptionPropagation(
+                               new TestingRecordsWithSplitIds<>("testSplit", 
1, 2, 3, 4),
+                               new TestingRecordsWithSplitIds<>("testSplit", 
5, 6, 7, 8)
+               );
+       }
+
+       // the final modifier is important so that '@SafeVarargs' is accepted on Java 8
+       @SuppressWarnings("FinalPrivateMethod")
+       @SafeVarargs
+       private final void testExceptionPropagation(final 
RecordsWithSplitIds<Integer>... fetchesBeforeError) throws Exception {
+               final IOException testingException = new IOException("test");
+
+               final 
FutureCompletingBlockingQueue<RecordsWithSplitIds<Integer>> queue = new 
FutureCompletingBlockingQueue<>(10);
+               final AwaitingReader<Integer, TestingSourceSplit> reader = new 
AwaitingReader<>(testingException, fetchesBeforeError);
+               final SplitFetcherManager<Integer, TestingSourceSplit> fetcher 
= createFetcher("testSplit", queue, reader);
+
+               reader.awaitAllRecordsReturned();
+               drainQueue(queue);
+
+               assertFalse(queue.getAvailabilityFuture().isDone());
+               reader.triggerThrowException();
+
+               // await the error propagation
+               queue.getAvailabilityFuture().get();
+
+               try {
+                       fetcher.checkErrors();
+                       fail("expected exception");
+               } catch (Exception e) {
+                       assertSame(testingException, e.getCause().getCause());
+               } finally {
+                       fetcher.close(20_000L);
+               }
+       }
+
+       // ------------------------------------------------------------------------
+       //  test helpers
+       // ------------------------------------------------------------------------
+
+       private static <E> SplitFetcherManager<E, TestingSourceSplit> 
createFetcher(
+                       final String splitId,
+                       final 
FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> queue,
+                       final SplitReader<E, TestingSourceSplit> reader) {
+
+               final SingleThreadFetcherManager<E, TestingSourceSplit> fetcher 
=
+                               new SingleThreadFetcherManager<>(queue, () -> 
reader);
+               fetcher.addSplits(Collections.singletonList(new 
TestingSourceSplit(splitId)));
+               return fetcher;
+       }
+
+       private static void drainQueue(FutureCompletingBlockingQueue<?> queue) {
+               //noinspection StatementWithEmptyBody
+               while (queue.poll() != null) {}
+       }
+
+       // ------------------------------------------------------------------------
+       //  test mocks
+       // ------------------------------------------------------------------------
+
+       private static final class AwaitingReader<E, SplitT extends 
SourceSplit> implements SplitReader <E, SplitT> {
+
+               private final Queue<RecordsWithSplitIds<E>> fetches;
+               private final IOException testError;
+
+               private final OneShotLatch inBlocking = new OneShotLatch();
+               private final OneShotLatch throwError = new OneShotLatch();
+
+               @SafeVarargs
+               AwaitingReader(IOException testError, RecordsWithSplitIds<E>... 
fetches) {
+                       this.testError = testError;
+                       this.fetches = new ArrayDeque<>(Arrays.asList(fetches));
+               }
+
+               @Override
+               public RecordsWithSplitIds<E> fetch() throws IOException {
+                       if (!fetches.isEmpty()) {
+                               return fetches.poll();
+                       } else {
+                               inBlocking.trigger();
+                               try {
+                                       throwError.await();
+                               } catch (InterruptedException e) {
+                                       Thread.currentThread().interrupt();
+                                       throw new IOException("interrupted");
+                               }
+                               throw testError;
+                       }
+               }
+
+               @Override
+               public void handleSplitsChanges(Queue<SplitsChange<SplitT>> 
splitsChanges) {
+                       splitsChanges.clear();
+               }
+
+               @Override
+               public void wakeUp() {}
+
+               public void awaitAllRecordsReturned() throws 
InterruptedException {
+                       inBlocking.await();
+               }
+
+               public void triggerThrowException() {
+                       throwError.trigger();
+               }
+       }
+}

Reply via email to