This is an automated email from the ASF dual-hosted git repository. jqin pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit a444d6ad7e13981911ea078f6c28db84564fc428 Author: Stephan Ewen <[email protected]> AuthorDate: Mon Nov 9 20:36:56 2020 +0100 [FLINK-20063][connector files] FileSourceReader request only a split if it doesn't have one already Backported the SourceReaderBase.getNumberOfCurrentlyAssignedSplits() to releasae-1.11. The changes to the FileSource are ignored during the backport. This closes #14004 --- .../flink/connector/base/source/reader/SourceReaderBase.java | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java index dfbacec..0a7a742 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java @@ -236,6 +236,17 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt splitFetcherManager.close(options.sourceReaderCloseTimeout); } + /** + * Gets the number of splits the reads has currently assigned. + * + * <p>These are the splits that have been added via {@link #addSplits(List)} and have not + * yet been finished by returning them from the {@link SplitReader#fetch()} as part of + * {@link RecordsWithSplitIds#finishedSplits()}. + */ + public int getNumberOfCurrentlyAssignedSplits() { + return splitStates.size(); + } + // -------------------- Abstract method to allow different implementations ------------------ /** * Handles the finished splits to clean the state if needed.
