This is an automated email from the ASF dual-hosted git repository. sewen pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit f220c2486181e4af19b246e9745a4990cd6aa9ce Author: Stephan Ewen <[email protected]> AuthorDate: Tue Sep 15 17:43:16 2020 +0200 [FLINK-19250][connectors] Fix error propagation in connector base (SplitFetcherManager). This makes sure that the reader is notified / woken up when the fetcher encounters an error. --- .../source/reader/fetcher/SplitFetcherManager.java | 4 +- .../reader/fetcher/SplitFetcherManagerTest.java | 159 +++++++++++++++++++++ 2 files changed, 161 insertions(+), 2 deletions(-) diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java index ffac523..7a20a59 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java @@ -92,9 +92,9 @@ public abstract class SplitFetcherManager<E, SplitT extends SourceSplit> { if (!uncaughtFetcherException.compareAndSet(null, t)) { // Add the exception to the exception list. uncaughtFetcherException.get().addSuppressed(t); - // Wake up the main thread to let it know the exception. - elementsQueue.notifyAvailable(); } + // Wake up the main thread to let it know the exception. + elementsQueue.notifyAvailable(); } }; this.splitReaderFactory = splitReaderFactory; diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManagerTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManagerTest.java new file mode 100644 index 0000000..3ff25e0 --- /dev/null +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManagerTest.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.base.source.reader.fetcher; + +import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; +import org.apache.flink.connector.base.source.reader.mocks.TestingRecordsWithSplitIds; +import org.apache.flink.connector.base.source.reader.mocks.TestingSourceSplit; +import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; +import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange; +import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue; +import org.apache.flink.core.testutils.OneShotLatch; + +import org.junit.Test; + +import java.io.IOException; +import java.util.ArrayDeque; +import java.util.Arrays; +import java.util.Collections; +import java.util.Queue; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.fail; + +/** + * Unit tests for the {@link SplitFetcherManager}. + */ +public class SplitFetcherManagerTest { + + @Test + public void testExceptionPropagationFirstFetch() throws Exception { + testExceptionPropagation(); + } + + @Test + public void testExceptionPropagationSuccessiveFetch() throws Exception { + testExceptionPropagation( + new TestingRecordsWithSplitIds<>("testSplit", 1, 2, 3, 4), + new TestingRecordsWithSplitIds<>("testSplit", 5, 6, 7, 8) + ); + } + + // the final modifier is important so that '@SafeVarargs' is accepted on Java 8 + @SuppressWarnings("FinalPrivateMethod") + @SafeVarargs + private final void testExceptionPropagation(final RecordsWithSplitIds<Integer>... fetchesBeforeError) throws Exception { + final IOException testingException = new IOException("test"); + + final FutureCompletingBlockingQueue<RecordsWithSplitIds<Integer>> queue = new FutureCompletingBlockingQueue<>(10); + final AwaitingReader<Integer, TestingSourceSplit> reader = new AwaitingReader<>(testingException, fetchesBeforeError); + final SplitFetcherManager<Integer, TestingSourceSplit> fetcher = createFetcher("testSplit", queue, reader); + + reader.awaitAllRecordsReturned(); + drainQueue(queue); + + assertFalse(queue.getAvailabilityFuture().isDone()); + reader.triggerThrowException(); + + // await the error propagation + queue.getAvailabilityFuture().get(); + + try { + fetcher.checkErrors(); + fail("expected exception"); + } catch (Exception e) { + assertSame(testingException, e.getCause().getCause()); + } finally { + fetcher.close(20_000L); + } + } + + // ------------------------------------------------------------------------ + // test helpers + // ------------------------------------------------------------------------ + + private static <E> SplitFetcherManager<E, TestingSourceSplit> createFetcher( + final String splitId, + final FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> queue, + final SplitReader<E, TestingSourceSplit> reader) { + + final SingleThreadFetcherManager<E, TestingSourceSplit> fetcher = + new SingleThreadFetcherManager<>(queue, () -> reader); + fetcher.addSplits(Collections.singletonList(new TestingSourceSplit(splitId))); + return fetcher; + } + + private static void drainQueue(FutureCompletingBlockingQueue<?> queue) { + //noinspection StatementWithEmptyBody + while (queue.poll() != null) {} + } + + // ------------------------------------------------------------------------ + // test mocks + // ------------------------------------------------------------------------ + + private static final class AwaitingReader<E, SplitT extends SourceSplit> implements SplitReader <E, SplitT> { + + private final Queue<RecordsWithSplitIds<E>> fetches; + private final IOException testError; + + private final OneShotLatch inBlocking = new OneShotLatch(); + private final OneShotLatch throwError = new OneShotLatch(); + + @SafeVarargs + AwaitingReader(IOException testError, RecordsWithSplitIds<E>... fetches) { + this.testError = testError; + this.fetches = new ArrayDeque<>(Arrays.asList(fetches)); + } + + @Override + public RecordsWithSplitIds<E> fetch() throws IOException { + if (!fetches.isEmpty()) { + return fetches.poll(); + } else { + inBlocking.trigger(); + try { + throwError.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException("interrupted"); + } + throw testError; + } + } + + @Override + public void handleSplitsChanges(Queue<SplitsChange<SplitT>> splitsChanges) { + splitsChanges.clear(); + } + + @Override + public void wakeUp() {} + + public void awaitAllRecordsReturned() throws InterruptedException { + inBlocking.await(); + } + + public void triggerThrowException() { + throwError.trigger(); + } + } +}
