Repository: flink Updated Branches: refs/heads/master bce068c49 -> b201f8664
[FLINK-3993] [py] Add Environment.generateSequence() This closes #2055 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b201f866 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b201f866 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b201f866 Branch: refs/heads/master Commit: b201f8664c9814b6e0a0f5149338effff49d4c88 Parents: bce068c Author: omaralvarez <[email protected]> Authored: Tue May 31 13:23:38 2016 +0200 Committer: zentol <[email protected]> Committed: Wed Jun 1 16:09:43 2016 +0200 ---------------------------------------------------------------------- docs/apis/batch/python.md | 4 ++++ .../flink/python/api/PythonOperationInfo.java | 6 ++++-- .../apache/flink/python/api/PythonPlanBinder.java | 2 +- .../flink/python/api/flink/plan/Constants.py | 1 + .../flink/python/api/flink/plan/Environment.py | 18 ++++++++++++++++++ .../flink/python/api/flink/plan/OperationInfo.py | 2 ++ .../org/apache/flink/python/api/test_main.py | 4 ++++ 7 files changed, 34 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/b201f866/docs/apis/batch/python.md ---------------------------------------------------------------------- diff --git a/docs/apis/batch/python.md b/docs/apis/batch/python.md index 8771cb5..0f55124 100644 --- a/docs/apis/batch/python.md +++ b/docs/apis/batch/python.md @@ -458,6 +458,7 @@ File-based: Collection-based: - `from_elements(*args)` - Creates a data set from a Seq. All elements +- `generate_sequence(from, to)` - Generates the sequence of numbers in the given interval, in parallel. **Examples** @@ -475,6 +476,9 @@ csvInput = env.read_csv("hdfs:///the/CSV/file", (INT, STRING, DOUBLE)) \# create a set from some given elements values = env.from_elements("Foo", "bar", "foobar", "fubar") + +\# generate a number sequence +numbers = env.generate_sequence(1, 10000000) {% endhighlight %} {% top %} http://git-wip-us.apache.org/repos/asf/flink/blob/b201f866/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java index 7f7a993..89aad22 100644 --- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java +++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java @@ -39,7 +39,7 @@ public class PythonOperationInfo { public String path; public String fieldDelimiter; public String lineDelimiter; - public long from; + public long frm; public long to; public WriteMode writeMode; public boolean toError; @@ -83,6 +83,8 @@ public class PythonOperationInfo { ? WriteMode.OVERWRITE : WriteMode.NO_OVERWRITE; path = (String) streamer.getRecord(); + frm = (Long) streamer.getRecord(); + to = (Long) streamer.getRecord(); setID = (Integer) streamer.getRecord(true); toError = (Boolean) streamer.getRecord(); count = (Integer) streamer.getRecord(true); @@ -121,7 +123,7 @@ public class PythonOperationInfo { sb.append("Path: ").append(path).append("\n"); sb.append("FieldDelimiter: ").append(fieldDelimiter).append("\n"); sb.append("LineDelimiter: ").append(lineDelimiter).append("\n"); - sb.append("From: ").append(from).append("\n"); + sb.append("From: ").append(frm).append("\n"); sb.append("To: ").append(to).append("\n"); sb.append("WriteMode: ").append(writeMode).append("\n"); sb.append("toError: ").append(toError).append("\n"); http://git-wip-us.apache.org/repos/asf/flink/blob/b201f866/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java index f91237f..f43a4f9 100644 --- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java +++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java @@ -410,7 +410,7 @@ public class PythonPlanBinder { } private void createSequenceSource(PythonOperationInfo info) throws IOException { - sets.put(info.setID, env.generateSequence(info.from, info.to).setParallelism(getParallelism(info)).name("SequenceSource") + sets.put(info.setID, env.generateSequence(info.frm, info.to).setParallelism(getParallelism(info)).name("SequenceSource") .map(new SerializerMap<Long>()).setParallelism(getParallelism(info)).name("SequenceSourcePostStep")); } http://git-wip-us.apache.org/repos/asf/flink/blob/b201f866/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Constants.py ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Constants.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Constants.py index be9fc6d..a1dffaf 100644 --- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Constants.py +++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Constants.py @@ -41,6 +41,7 @@ class _Identifier(object): SOURCE_CSV = "source_csv" SOURCE_TEXT = "source_text" SOURCE_VALUE = "source_value" + SOURCE_SEQ = "source_seq" SINK_CSV = "sink_csv" SINK_TEXT = "sink_text" SINK_PRINT = "sink_print" http://git-wip-us.apache.org/repos/asf/flink/blob/b201f866/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py index d0f28dc..3dbce45 100644 --- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py +++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py @@ -118,6 +118,22 @@ class Environment(object): self._sources.append(child) return child_set + def generate_sequence(self, frm, to): + """ + Creates a new data set that contains the given sequence + + :param frm: The start number for the sequence. + :param to: The end number for the sequence. + :return: A DataSet representing the given sequence of numbers. + """ + child = OperationInfo() + child_set = DataSet(self, child) + child.identifier = _Identifier.SOURCE_SEQ + child.frm = frm + child.to = to + self._sources.append(child) + return child_set + def set_parallelism(self, parallelism): """ Sets the parallelism for operations executed through this environment. @@ -270,6 +286,8 @@ class Environment(object): collect(set.delimiter_field) collect(set.write_mode) collect(set.path) + collect(set.frm) + collect(set.to) collect(set.id) collect(set.to_err) collect(set.count) http://git-wip-us.apache.org/repos/asf/flink/blob/b201f866/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/OperationInfo.py ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/OperationInfo.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/OperationInfo.py index 5d83e33..fcda712 100644 --- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/OperationInfo.py +++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/OperationInfo.py @@ -42,6 +42,8 @@ class OperationInfo(): self.delimiter_field = "," self.write_mode = WriteMode.NO_OVERWRITE self.path = "" + self.frm = 0 + self.to = 0 self.count = 0 self.values = [] self.projections = [] http://git-wip-us.apache.org/repos/asf/flink/blob/b201f866/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py index 3e718d3..9b0f144 100644 --- a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py +++ b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py @@ -78,6 +78,10 @@ if __name__ == "__main__": d6 = env.from_elements(1, 1, 12) + #Generate Sequence Source + d7 = env.generate_sequence(1, 5)\ + .map(Id()).map_partition(Verify([1,2,3,4,5], "Sequence")).output() + #CSV Source/Sink csv_data = env.read_csv("src/test/python/org/apache/flink/python/api/data_csv", (INT, INT, STRING))
