Repository: flink
Updated Branches:
  refs/heads/master bce068c49 -> b201f8664


[FLINK-3993] [py] Add Environment.generateSequence()

This closes #2055


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b201f866
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b201f866
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b201f866

Branch: refs/heads/master
Commit: b201f8664c9814b6e0a0f5149338effff49d4c88
Parents: bce068c
Author: omaralvarez <[email protected]>
Authored: Tue May 31 13:23:38 2016 +0200
Committer: zentol <[email protected]>
Committed: Wed Jun 1 16:09:43 2016 +0200

----------------------------------------------------------------------
 docs/apis/batch/python.md                         |  4 ++++
 .../flink/python/api/PythonOperationInfo.java     |  6 ++++--
 .../apache/flink/python/api/PythonPlanBinder.java |  2 +-
 .../flink/python/api/flink/plan/Constants.py      |  1 +
 .../flink/python/api/flink/plan/Environment.py    | 18 ++++++++++++++++++
 .../flink/python/api/flink/plan/OperationInfo.py  |  2 ++
 .../org/apache/flink/python/api/test_main.py      |  4 ++++
 7 files changed, 34 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b201f866/docs/apis/batch/python.md
----------------------------------------------------------------------
diff --git a/docs/apis/batch/python.md b/docs/apis/batch/python.md
index 8771cb5..0f55124 100644
--- a/docs/apis/batch/python.md
+++ b/docs/apis/batch/python.md
@@ -458,6 +458,7 @@ File-based:
 Collection-based:
 
 - `from_elements(*args)` - Creates a data set from a Seq. All elements
+- `generate_sequence(from, to)` - Generates the sequence of numbers in the given interval, in parallel.
 
 **Examples**
 
@@ -475,6 +476,9 @@ csvInput = env.read_csv("hdfs:///the/CSV/file", (INT, STRING, DOUBLE))
 
 \# create a set from some given elements
 values = env.from_elements("Foo", "bar", "foobar", "fubar")
+
+\# generate a number sequence
+numbers = env.generate_sequence(1, 10000000)
 {% endhighlight %}
 
 {% top %}

http://git-wip-us.apache.org/repos/asf/flink/blob/b201f866/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java
index 7f7a993..89aad22 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java
@@ -39,7 +39,7 @@ public class PythonOperationInfo {
        public String path;
        public String fieldDelimiter;
        public String lineDelimiter;
-       public long from;
+       public long frm;
        public long to;
        public WriteMode writeMode;
        public boolean toError;
@@ -83,6 +83,8 @@ public class PythonOperationInfo {
                        ? WriteMode.OVERWRITE
                        : WriteMode.NO_OVERWRITE;
                path = (String) streamer.getRecord();
+               frm = (Long) streamer.getRecord();
+               to = (Long) streamer.getRecord();
                setID = (Integer) streamer.getRecord(true);
                toError = (Boolean) streamer.getRecord();
                count = (Integer) streamer.getRecord(true);
@@ -121,7 +123,7 @@ public class PythonOperationInfo {
                sb.append("Path: ").append(path).append("\n");
                sb.append("FieldDelimiter: ").append(fieldDelimiter).append("\n");
                sb.append("LineDelimiter: ").append(lineDelimiter).append("\n");
-               sb.append("From: ").append(from).append("\n");
+               sb.append("From: ").append(frm).append("\n");
                sb.append("To: ").append(to).append("\n");
                sb.append("WriteMode: ").append(writeMode).append("\n");
                sb.append("toError: ").append(toError).append("\n");

http://git-wip-us.apache.org/repos/asf/flink/blob/b201f866/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
index f91237f..f43a4f9 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
@@ -410,7 +410,7 @@ public class PythonPlanBinder {
        }
 
        private void createSequenceSource(PythonOperationInfo info) throws IOException {
-               sets.put(info.setID, env.generateSequence(info.from, info.to).setParallelism(getParallelism(info)).name("SequenceSource")
+               sets.put(info.setID, env.generateSequence(info.frm, info.to).setParallelism(getParallelism(info)).name("SequenceSource")
                                .map(new SerializerMap<Long>()).setParallelism(getParallelism(info)).name("SequenceSourcePostStep"));
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b201f866/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Constants.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Constants.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Constants.py
index be9fc6d..a1dffaf 100644
--- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Constants.py
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Constants.py
@@ -41,6 +41,7 @@ class _Identifier(object):
     SOURCE_CSV = "source_csv"
     SOURCE_TEXT = "source_text"
     SOURCE_VALUE = "source_value"
+    SOURCE_SEQ = "source_seq"
     SINK_CSV = "sink_csv"
     SINK_TEXT = "sink_text"
     SINK_PRINT = "sink_print"

http://git-wip-us.apache.org/repos/asf/flink/blob/b201f866/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
index d0f28dc..3dbce45 100644
--- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
@@ -118,6 +118,22 @@ class Environment(object):
         self._sources.append(child)
         return child_set
 
+    def generate_sequence(self, frm, to):
+        """
+        Creates a new data set that contains the given sequence
+
+        :param frm: The start number for the sequence.
+        :param to: The end number for the sequence.
+        :return: A DataSet representing the given sequence of numbers.
+        """
+        child = OperationInfo()
+        child_set = DataSet(self, child)
+        child.identifier = _Identifier.SOURCE_SEQ
+        child.frm = frm
+        child.to = to
+        self._sources.append(child)
+        return child_set
+
     def set_parallelism(self, parallelism):
         """
         Sets the parallelism for operations executed through this environment.
@@ -270,6 +286,8 @@ class Environment(object):
         collect(set.delimiter_field)
         collect(set.write_mode)
         collect(set.path)
+        collect(set.frm)
+        collect(set.to)
         collect(set.id)
         collect(set.to_err)
         collect(set.count)

http://git-wip-us.apache.org/repos/asf/flink/blob/b201f866/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/OperationInfo.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/OperationInfo.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/OperationInfo.py
index 5d83e33..fcda712 100644
--- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/OperationInfo.py
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/OperationInfo.py
@@ -42,6 +42,8 @@ class OperationInfo():
             self.delimiter_field = ","
             self.write_mode = WriteMode.NO_OVERWRITE
             self.path = ""
+            self.frm = 0
+            self.to = 0
             self.count = 0
             self.values = []
             self.projections = []

http://git-wip-us.apache.org/repos/asf/flink/blob/b201f866/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py
index 3e718d3..9b0f144 100644
--- a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py
+++ b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py
@@ -78,6 +78,10 @@ if __name__ == "__main__":
 
     d6 = env.from_elements(1, 1, 12)
 
+    #Generate Sequence Source
+    d7 = env.generate_sequence(1, 5)\
+         .map(Id()).map_partition(Verify([1,2,3,4,5], "Sequence")).output()
+
     #CSV Source/Sink
     csv_data = env.read_csv("src/test/python/org/apache/flink/python/api/data_csv", (INT, INT, STRING))
 

Reply via email to