This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d6253f47f9e86771fdb8499df0d98aae3c695e59
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Sat Apr 10 00:41:12 2021 +0200

    [FLINK-21996][refactor] Make NumberSequenceSource extensible to allow specifying the number of desired sequence splits.
---
 .../connector/source/lib/NumberSequenceSource.java | 45 +++++++++++++++-------
 1 file changed, 32 insertions(+), 13 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/NumberSequenceSource.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/NumberSequenceSource.java
index 8c4b995..73c2928 100644
--- a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/NumberSequenceSource.java
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/NumberSequenceSource.java
@@ -40,6 +40,7 @@ import org.apache.flink.util.NumberSequenceIterator;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.List;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -81,6 +82,18 @@ public class NumberSequenceSource
         this.to = to;
     }
 
+    public long getFrom() {
+        return from;
+    }
+
+    public long getTo() {
+        return to;
+    }
+
+    // ------------------------------------------------------------------------
+    //  source methods
+    // ------------------------------------------------------------------------
+
     @Override
     public TypeInformation<Long> getProducedType() {
         return Types.LONG;
@@ -100,19 +113,8 @@ public class NumberSequenceSource
     public SplitEnumerator<NumberSequenceSplit, 
Collection<NumberSequenceSplit>> createEnumerator(
             final SplitEnumeratorContext<NumberSequenceSplit> enumContext) {
 
-        final NumberSequenceIterator[] subSequences =
-                new NumberSequenceIterator(from, 
to).split(enumContext.currentParallelism());
-        final ArrayList<NumberSequenceSplit> splits = new 
ArrayList<>(subSequences.length);
-
-        int splitId = 1;
-        for (NumberSequenceIterator seq : subSequences) {
-            if (seq.hasNext()) {
-                splits.add(
-                        new NumberSequenceSplit(
-                                String.valueOf(splitId++), seq.getCurrent(), 
seq.getTo()));
-            }
-        }
-
+        final List<NumberSequenceSplit> splits =
+                splitNumberRange(from, to, enumContext.currentParallelism());
         return new IteratorSourceEnumerator<>(enumContext, splits);
     }
 
@@ -134,6 +136,23 @@ public class NumberSequenceSource
         return new CheckpointSerializer();
     }
 
+    protected List<NumberSequenceSplit> splitNumberRange(long from, long to, 
int numSplits) {
+        final NumberSequenceIterator[] subSequences =
+                new NumberSequenceIterator(from, to).split(numSplits);
+        final ArrayList<NumberSequenceSplit> splits = new 
ArrayList<>(subSequences.length);
+
+        int splitId = 1;
+        for (NumberSequenceIterator seq : subSequences) {
+            if (seq.hasNext()) {
+                splits.add(
+                        new NumberSequenceSplit(
+                                String.valueOf(splitId++), seq.getCurrent(), 
seq.getTo()));
+            }
+        }
+
+        return splits;
+    }
+
     // ------------------------------------------------------------------------
     //  splits & checkpoint
     // ------------------------------------------------------------------------

Reply via email to