This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 55745d0ee829d49ee3d6532f6b495e7ed04b26d8
Author: Stephan Ewen <[email protected]>
AuthorDate: Mon Sep 14 12:26:47 2020 +0200

    [FLINK-19225][connectors] Various small improvements to SourceReaderBase
    
     - A slight improvement of the main loop, to avoid a branch and improve 
inlining friendliness.
    
     - Check for fetcher errors when moving between fetches.
       This is still guaranteed to propagate the errors, just some micro- or 
milliseconds later.
       In return we save one or two volatile accesses on the hot per-record 
path.
    
     - Extend logging in SourceReaderBase to simplify debugging in case 
something goes wrong.
    
     - Add Task name to Split Fetcher Threads to make it easier to attribute 
the threads when
       analyzing stack traces.
---
 .../base/source/reader/SourceReaderBase.java       | 31 ++++++++++++++--------
 .../source/reader/fetcher/SplitFetcherManager.java |  3 ++-
 2 files changed, 22 insertions(+), 12 deletions(-)

diff --git 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
index 979afb2..0305e2d 100644
--- 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
+++ 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
@@ -116,12 +116,13 @@ public abstract class SourceReaderBase<E, T, SplitT 
extends SourceSplit, SplitSt
 
        @Override
        public InputStatus pollNext(ReaderOutput<T> output) throws Exception {
-               splitFetcherManager.checkErrors();
-
                // make sure we have a fetch we are working on, or move to the 
next
-               final RecordsWithSplitIds<E> recordsWithSplitId = 
getCurrentOrNewFetch(output);
+               RecordsWithSplitIds<E> recordsWithSplitId = this.currentFetch;
                if (recordsWithSplitId == null) {
-                       return trace(finishedOrAvailableLater());
+                       recordsWithSplitId = getNextFetch(output);
+                       if (recordsWithSplitId == null) {
+                               return trace(finishedOrAvailableLater());
+                       }
                }
 
                // we need to loop here, because we may have to go across splits
@@ -132,6 +133,12 @@ public abstract class SourceReaderBase<E, T, SplitT 
extends SourceSplit, SplitSt
                                // emit the record.
                                recordEmitter.emitRecord(record, 
currentSplitOutput, currentSplitContext.state);
                                LOG.trace("Emitted record: {}", record);
+
+                               // We always emit MORE_AVAILABLE here, even 
though we do not strictly know whether
+                               // more is available. If nothing more is 
available, the next invocation will find
+                               // this out and return the correct status.
+                               // That means we emit the occasional 'false 
positive' for availability, but this
+                               // saves us doing checks for every record. 
Ultimately, this is cheaper.
                                return trace(InputStatus.MORE_AVAILABLE);
                        }
                        else if (!moveToNextSplit(recordsWithSplitId, output)) {
@@ -147,13 +154,11 @@ public abstract class SourceReaderBase<E, T, SplitT 
extends SourceSplit, SplitSt
        }
 
        @Nullable
-       private RecordsWithSplitIds<E> getCurrentOrNewFetch(final 
ReaderOutput<T> output) {
-               RecordsWithSplitIds<E> recordsWithSplitId = this.currentFetch;
-               if (recordsWithSplitId != null) {
-                       return recordsWithSplitId;
-               }
+       private RecordsWithSplitIds<E> getNextFetch(final ReaderOutput<T> 
output) {
+               splitFetcherManager.checkErrors();
 
-               recordsWithSplitId = elementsQueue.poll();
+               LOG.trace("Getting next source data batch from queue");
+               final RecordsWithSplitIds<E> recordsWithSplitId = 
elementsQueue.poll();
                if (recordsWithSplitId == null || 
!moveToNextSplit(recordsWithSplitId, output)) {
                        // No element available, set to available later if 
needed.
                        return null;
@@ -170,6 +175,7 @@ public abstract class SourceReaderBase<E, T, SplitT extends 
SourceSplit, SplitSt
 
                final Set<String> finishedSplits = fetch.finishedSplits();
                if (!finishedSplits.isEmpty()) {
+                       LOG.info("Finished reading split(s) {}", 
finishedSplits);
                        for (String finishedSplitId : finishedSplits) {
                                splitStates.remove(finishedSplitId);
                                output.releaseOutputForSplit(finishedSplitId);
@@ -183,6 +189,7 @@ public abstract class SourceReaderBase<E, T, SplitT extends 
SourceSplit, SplitSt
        private boolean moveToNextSplit(RecordsWithSplitIds<E> 
recordsWithSplitIds, ReaderOutput<T> output) {
                final String nextSplitId = recordsWithSplitIds.nextSplit();
                if (nextSplitId == null) {
+                       LOG.trace("Current fetch is finished.");
                        finishCurrentFetch(recordsWithSplitIds, output);
                        return false;
                }
@@ -190,6 +197,7 @@ public abstract class SourceReaderBase<E, T, SplitT extends 
SourceSplit, SplitSt
                currentSplitContext = splitStates.get(nextSplitId);
                checkState(currentSplitContext != null, "Have records for a 
split that was not registered");
                currentSplitOutput = 
currentSplitContext.getOrCreateSplitOutput(output);
+               LOG.trace("Emitting records from fetch for split {}", 
nextSplitId);
                return true;
        }
 
@@ -218,7 +226,7 @@ public abstract class SourceReaderBase<E, T, SplitT extends 
SourceSplit, SplitSt
 
        @Override
        public void addSplits(List<SplitT> splits) {
-               LOG.trace("Adding splits {}", splits);
+               LOG.info("Adding split(s) to reader: {}", splits);
                // Initialize the state for each split.
                splits.forEach(s -> splitStates.put(s.splitId(), new 
SplitContext<>(s.splitId(), initializedState(s))));
                // Hand over the splits to the split fetcher to start fetch.
@@ -229,6 +237,7 @@ public abstract class SourceReaderBase<E, T, SplitT extends 
SourceSplit, SplitSt
        public void handleSourceEvents(SourceEvent sourceEvent) {
                LOG.trace("Handling source event: {}", sourceEvent);
                if (sourceEvent instanceof NoMoreSplitsEvent) {
+                       LOG.info("Reader received NoMoreSplits event.");
                        noMoreSplitsAssignment = true;
                        futureNotifier.notifyComplete();
                }
diff --git 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java
 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java
index 822a9a9..26d92e3 100644
--- 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java
+++ 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java
@@ -107,7 +107,8 @@ public abstract class SplitFetcherManager<E, SplitT extends 
SourceSplit> {
 
                // Create the executor with a thread factory that fails the 
source reader if one of
                // the fetcher thread exits abnormally.
-               this.executors = Executors.newCachedThreadPool(r -> new 
Thread(r, "SourceFetcher"));
+               final String taskThreadName = Thread.currentThread().getName();
+               this.executors = Executors.newCachedThreadPool(r -> new 
Thread(r, "Source Data Fetcher for " + taskThreadName));
                this.closed = false;
        }
 

Reply via email to