This is an automated email from the ASF dual-hosted git repository. sewen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit d6253f47f9e86771fdb8499df0d98aae3c695e59 Author: Stephan Ewen <se...@apache.org> AuthorDate: Sat Apr 10 00:41:12 2021 +0200 [FLINK-21996][refactor] Make NumberSequenceSource extensible to allow specifying the number of desired sequence splits. --- .../connector/source/lib/NumberSequenceSource.java | 45 +++++++++++++++------- 1 file changed, 32 insertions(+), 13 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/NumberSequenceSource.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/NumberSequenceSource.java index 8c4b995..73c2928 100644 --- a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/NumberSequenceSource.java +++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/NumberSequenceSource.java @@ -40,6 +40,7 @@ import org.apache.flink.util.NumberSequenceIterator; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; +import java.util.List; import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -81,6 +82,18 @@ public class NumberSequenceSource this.to = to; } + public long getFrom() { + return from; + } + + public long getTo() { + return to; + } + + // ------------------------------------------------------------------------ + // source methods + // ------------------------------------------------------------------------ + @Override public TypeInformation<Long> getProducedType() { return Types.LONG; @@ -100,19 +113,8 @@ public class NumberSequenceSource public SplitEnumerator<NumberSequenceSplit, Collection<NumberSequenceSplit>> createEnumerator( final SplitEnumeratorContext<NumberSequenceSplit> enumContext) { - final NumberSequenceIterator[] subSequences = - new NumberSequenceIterator(from, to).split(enumContext.currentParallelism()); - final ArrayList<NumberSequenceSplit> splits = new ArrayList<>(subSequences.length); - - int splitId = 1; - for (NumberSequenceIterator seq : subSequences) { - if (seq.hasNext()) { - splits.add( - new NumberSequenceSplit( - String.valueOf(splitId++), seq.getCurrent(), seq.getTo())); - } - } - + final List<NumberSequenceSplit> splits = + splitNumberRange(from, to, enumContext.currentParallelism()); return new IteratorSourceEnumerator<>(enumContext, splits); } @@ -134,6 +136,23 @@ public class NumberSequenceSource return new CheckpointSerializer(); } + protected List<NumberSequenceSplit> splitNumberRange(long from, long to, int numSplits) { + final NumberSequenceIterator[] subSequences = + new NumberSequenceIterator(from, to).split(numSplits); + final ArrayList<NumberSequenceSplit> splits = new ArrayList<>(subSequences.length); + + int splitId = 1; + for (NumberSequenceIterator seq : subSequences) { + if (seq.hasNext()) { + splits.add( + new NumberSequenceSplit( + String.valueOf(splitId++), seq.getCurrent(), seq.getTo())); + } + } + + return splits; + } + // ------------------------------------------------------------------------ // splits & checkpoint // ------------------------------------------------------------------------