This is an automated email from the ASF dual-hosted git repository. sewen pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 55745d0ee829d49ee3d6532f6b495e7ed04b26d8 Author: Stephan Ewen <[email protected]> AuthorDate: Mon Sep 14 12:26:47 2020 +0200 [FLINK-19225][connectors] Various small improvements to SourceReaderBase - A slight improvement of main loop, to avoid a branch and improve inlining friendliness. - Check for fetcher errors when moving between fetches. This is still guaranteed to propagate the errors, just some micro- or milliseconds later. In return we save one or two volatile accesses on the hot per-record path. - Extend logging in SourceReaderBase to simplify the debugging in case something goes wrong. - Add Task name to Split Fetcher Threads to make it easier to attribute the threads when analyzing stack traces. --- .../base/source/reader/SourceReaderBase.java | 31 ++++++++++++++-------- .../source/reader/fetcher/SplitFetcherManager.java | 3 ++- 2 files changed, 22 insertions(+), 12 deletions(-) diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java index 979afb2..0305e2d 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java @@ -116,12 +116,13 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt @Override public InputStatus pollNext(ReaderOutput<T> output) throws Exception { - splitFetcherManager.checkErrors(); - // make sure we have a fetch we are working on, or move to the next - final RecordsWithSplitIds<E> recordsWithSplitId = getCurrentOrNewFetch(output); + RecordsWithSplitIds<E> recordsWithSplitId = this.currentFetch; if (recordsWithSplitId == null) { - return trace(finishedOrAvailableLater()); + recordsWithSplitId = 
getNextFetch(output); + if (recordsWithSplitId == null) { + return trace(finishedOrAvailableLater()); + } } // we need to loop here, because we may have to go across splits @@ -132,6 +133,12 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt // emit the record. recordEmitter.emitRecord(record, currentSplitOutput, currentSplitContext.state); LOG.trace("Emitted record: {}", record); + + // We always emit MORE_AVAILABLE here, even though we do not strictly know whether + // more is available. If nothing more is available, the next invocation will find + // this out and return the correct status. + // That means we emit the occasional 'false positive' for availability, but this + // saves us doing checks for every record. Ultimately, this is cheaper. return trace(InputStatus.MORE_AVAILABLE); } else if (!moveToNextSplit(recordsWithSplitId, output)) { @@ -147,13 +154,11 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt } @Nullable - private RecordsWithSplitIds<E> getCurrentOrNewFetch(final ReaderOutput<T> output) { - RecordsWithSplitIds<E> recordsWithSplitId = this.currentFetch; - if (recordsWithSplitId != null) { - return recordsWithSplitId; - } + private RecordsWithSplitIds<E> getNextFetch(final ReaderOutput<T> output) { + splitFetcherManager.checkErrors(); - recordsWithSplitId = elementsQueue.poll(); + LOG.trace("Getting next source data batch from queue"); + final RecordsWithSplitIds<E> recordsWithSplitId = elementsQueue.poll(); if (recordsWithSplitId == null || !moveToNextSplit(recordsWithSplitId, output)) { // No element available, set to available later if needed. 
return null; @@ -170,6 +175,7 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt final Set<String> finishedSplits = fetch.finishedSplits(); if (!finishedSplits.isEmpty()) { + LOG.info("Finished reading split(s) {}", finishedSplits); for (String finishedSplitId : finishedSplits) { splitStates.remove(finishedSplitId); output.releaseOutputForSplit(finishedSplitId); @@ -183,6 +189,7 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt private boolean moveToNextSplit(RecordsWithSplitIds<E> recordsWithSplitIds, ReaderOutput<T> output) { final String nextSplitId = recordsWithSplitIds.nextSplit(); if (nextSplitId == null) { + LOG.trace("Current fetch is finished."); finishCurrentFetch(recordsWithSplitIds, output); return false; } @@ -190,6 +197,7 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt currentSplitContext = splitStates.get(nextSplitId); checkState(currentSplitContext != null, "Have records for a split that was not registered"); currentSplitOutput = currentSplitContext.getOrCreateSplitOutput(output); + LOG.trace("Emitting records from fetch for split {}", nextSplitId); return true; } @@ -218,7 +226,7 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt @Override public void addSplits(List<SplitT> splits) { - LOG.trace("Adding splits {}", splits); + LOG.info("Adding split(s) to reader: {}", splits); // Initialize the state for each split. splits.forEach(s -> splitStates.put(s.splitId(), new SplitContext<>(s.splitId(), initializedState(s)))); // Hand over the splits to the split fetcher to start fetch. 
@@ -229,6 +237,7 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt public void handleSourceEvents(SourceEvent sourceEvent) { LOG.trace("Handling source event: {}", sourceEvent); if (sourceEvent instanceof NoMoreSplitsEvent) { + LOG.info("Reader received NoMoreSplits event."); noMoreSplitsAssignment = true; futureNotifier.notifyComplete(); } diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java index 822a9a9..26d92e3 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java @@ -107,7 +107,8 @@ public abstract class SplitFetcherManager<E, SplitT extends SourceSplit> { // Create the executor with a thread factory that fails the source reader if one of // the fetcher thread exits abnormally. - this.executors = Executors.newCachedThreadPool(r -> new Thread(r, "SourceFetcher")); + final String taskThreadName = Thread.currentThread().getName(); + this.executors = Executors.newCachedThreadPool(r -> new Thread(r, "Source Data Fetcher for " + taskThreadName)); this.closed = false; }
