Repository: flink
Updated Branches:
  refs/heads/master 440137cc3 -> 40422d505


[FLINK-3275] [py] Support for DataSet.setParallelism()

- parallelism is stored as a Value object within the OperationInfo, so it can be passed by reference to multiple operations (for cases where a set is internally executed as multiple operations)
- setParallelism is called for every DataSet, with either a user-set value or env.getParallelism()
- added a DataSink class, providing access to name() and setParallelism() for sinks
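
The user-facing effect, as a minimal Python sketch (the element values and output path below are illustrative, not part of this commit; it assumes the usual get_environment() entry point):

    from flink.plan.Environment import get_environment
    from flink.plan.Constants import WriteMode

    env = get_environment()

    # set_parallelism() is now available on every DataSet and returns it for chaining
    data = env.from_elements("a", "b", "c").set_parallelism(2)

    # sinks now return a DataSink, which exposes name() and set_parallelism() as well
    data.write_text("/tmp/out", WriteMode.OVERWRITE).set_parallelism(1).name("MySink")

    env.execute(local=True)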


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/40422d50
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/40422d50
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/40422d50

Branch: refs/heads/master
Commit: 40422d5057e5c1d7b75aec48bacbd7518cd7c9e1
Parents: 440137c
Author: zentol <s.mo...@web.de>
Authored: Thu Jan 28 10:00:25 2016 +0100
Committer: zentol <s.mo...@web.de>
Committed: Thu Jan 28 11:50:51 2016 +0100

----------------------------------------------------------------------
 .../flink/python/api/PythonOperationInfo.java   |   2 +
 .../flink/python/api/PythonPlanBinder.java      | 117 ++++++++++---------
 .../flink/python/api/flink/plan/DataSet.py      |  40 ++++++-
 .../flink/python/api/flink/plan/Environment.py  |   1 +
 .../python/api/flink/plan/OperationInfo.py      |   6 +
 5 files changed, 109 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/40422d50/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java
index 1e3005d..7f7a993 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java
@@ -45,6 +45,7 @@ public class PythonOperationInfo {
        public boolean toError;
        public String name;
        public boolean usesUDF;
+       public int parallelism;
 
        public PythonOperationInfo(PythonPlanStreamer streamer) throws IOException {
                identifier = (String) streamer.getRecord();
@@ -90,6 +91,7 @@ public class PythonOperationInfo {
                for (int x = 0; x < valueCount; x++) {
                        values[x] = streamer.getRecord();
                }
+               parallelism = (Integer) streamer.getRecord(true);
 
                /*
                aggregates = new AggregationEntry[count];

http://git-wip-us.apache.org/repos/asf/flink/blob/40422d50/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
index 3877ef1..1534ebf 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
@@ -388,42 +388,53 @@ public class PythonPlanBinder {
                }
        }
 
+       private int getParallelism(PythonOperationInfo info) {
+               return info.parallelism == -1 ? env.getParallelism() : info.parallelism;
+       }
+
        private void createCsvSource(PythonOperationInfo info) throws IOException {
                if (!(info.types instanceof TupleTypeInfo)) {
                        throw new RuntimeException("The output type of a csv source has to be a tuple. The derived type is " + info);
                }
-
-               sets.put(info.setID, env.createInput(new TupleCsvInputFormat(new Path(info.path),
-                               info.lineDelimiter, info.fieldDelimiter, (TupleTypeInfo) info.types), info.types)
-                               .name("CsvSource").map(new SerializerMap()).name("CsvSourcePostStep"));
+               Path path = new Path(info.path);
+               String lineD = info.lineDelimiter;
+               String fieldD = info.fieldDelimiter;
+               TupleTypeInfo<?> types = (TupleTypeInfo) info.types;
+               sets.put(info.setID, env.createInput(new TupleCsvInputFormat(path, lineD, fieldD, types), info.types).setParallelism(getParallelism(info)).name("CsvSource")
+                               .map(new SerializerMap()).setParallelism(getParallelism(info)).name("CsvSourcePostStep"));
        }
 
        private void createTextSource(PythonOperationInfo info) throws IOException {
-               sets.put(info.setID, env.readTextFile(info.path).name("TextSource").map(new SerializerMap()).name("TextSourcePostStep"));
+               sets.put(info.setID, env.readTextFile(info.path).setParallelism(getParallelism(info)).name("TextSource")
+                               .map(new SerializerMap()).setParallelism(getParallelism(info)).name("TextSourcePostStep"));
        }
 
        private void createValueSource(PythonOperationInfo info) throws IOException {
-               sets.put(info.setID, env.fromElements(info.values).name("ValueSource").map(new SerializerMap()).name("ValueSourcePostStep"));
+               sets.put(info.setID, env.fromElements(info.values).setParallelism(getParallelism(info)).name("ValueSource")
+                               .map(new SerializerMap()).setParallelism(getParallelism(info)).name("ValueSourcePostStep"));
        }
 
        private void createSequenceSource(PythonOperationInfo info) throws IOException {
-               sets.put(info.setID, env.generateSequence(info.from, info.to).name("SequenceSource").map(new SerializerMap()).name("SequenceSourcePostStep"));
+               sets.put(info.setID, env.generateSequence(info.from, info.to).setParallelism(getParallelism(info)).name("SequenceSource")
+                               .map(new SerializerMap()).setParallelism(getParallelism(info)).name("SequenceSourcePostStep"));
        }
 
        private void createCsvSink(PythonOperationInfo info) throws IOException {
                DataSet parent = (DataSet) sets.get(info.parentID);
-               parent.map(new StringTupleDeserializerMap()).name("CsvSinkPreStep")
-                               .writeAsCsv(info.path, info.lineDelimiter, info.fieldDelimiter, info.writeMode).name("CsvSink");
+               parent.map(new StringTupleDeserializerMap()).setParallelism(getParallelism(info)).name("CsvSinkPreStep")
+                               .writeAsCsv(info.path, info.lineDelimiter, info.fieldDelimiter, info.writeMode).setParallelism(getParallelism(info)).name("CsvSink");
        }
 
        private void createTextSink(PythonOperationInfo info) throws IOException {
                DataSet parent = (DataSet) sets.get(info.parentID);
-               parent.map(new StringDeserializerMap()).writeAsText(info.path, info.writeMode).name("TextSink");
+               parent.map(new StringDeserializerMap()).setParallelism(getParallelism(info))
+                       .writeAsText(info.path, info.writeMode).setParallelism(getParallelism(info)).name("TextSink");
        }
 
        private void createPrintSink(PythonOperationInfo info) throws IOException {
                DataSet parent = (DataSet) sets.get(info.parentID);
-               parent.map(new StringDeserializerMap()).name("PrintSinkPreStep").output(new PrintingOutputFormat(info.toError));
+               parent.map(new StringDeserializerMap()).setParallelism(getParallelism(info)).name("PrintSinkPreStep")
+                       .output(new PrintingOutputFormat(info.toError)).setParallelism(getParallelism(info));
        }
 
        private void createBroadcastVariable(PythonOperationInfo info) throws IOException {
@@ -452,17 +463,18 @@ public class PythonPlanBinder {
                        ao = ao.and(info.aggregates[x].agg, info.aggregates[x].field);
                }
 
-               sets.put(info.setID, ao.name("Aggregation"));
+               sets.put(info.setID, ao.setParallelism(getParallelism(info)).name("Aggregation"));
        }
 
        private void createDistinctOperation(PythonOperationInfo info) throws IOException {
                DataSet op = (DataSet) sets.get(info.parentID);
-               sets.put(info.setID, op.distinct(info.keys).name("Distinct").map(new KeyDiscarder()).name("DistinctPostStep"));
+               sets.put(info.setID, op.distinct(info.keys).setParallelism(getParallelism(info)).name("Distinct")
+                               .map(new KeyDiscarder()).setParallelism(getParallelism(info)).name("DistinctPostStep"));
        }
 
        private void createFirstOperation(PythonOperationInfo info) throws IOException {
                DataSet op = (DataSet) sets.get(info.parentID);
-               sets.put(info.setID, op.first(info.count).name("First"));
+               sets.put(info.setID, op.first(info.count).setParallelism(getParallelism(info)).name("First"));
        }
 
        private void createGroupOperation(PythonOperationInfo info) throws IOException {
@@ -472,12 +484,13 @@ public class PythonPlanBinder {
 
        private void createHashPartitionOperation(PythonOperationInfo info) throws IOException {
                DataSet op1 = (DataSet) sets.get(info.parentID);
-               sets.put(info.setID, op1.partitionByHash(info.keys).map(new KeyDiscarder()).name("HashPartitionPostStep"));
+               sets.put(info.setID, op1.partitionByHash(info.keys).setParallelism(getParallelism(info))
+                               .map(new KeyDiscarder()).setParallelism(getParallelism(info)).name("HashPartitionPostStep"));
        }
 
        private void createRebalanceOperation(PythonOperationInfo info) throws IOException {
                DataSet op = (DataSet) sets.get(info.parentID);
-               sets.put(info.setID, op.rebalance().name("Rebalance"));
+               sets.put(info.setID, op.rebalance().setParallelism(getParallelism(info)).name("Rebalance"));
        }
 
        private void createSortOperation(PythonOperationInfo info) throws IOException {
@@ -494,19 +507,16 @@ public class PythonPlanBinder {
        private void createUnionOperation(PythonOperationInfo info) throws IOException {
                DataSet op1 = (DataSet) sets.get(info.parentID);
                DataSet op2 = (DataSet) sets.get(info.otherID);
-               sets.put(info.setID, op1.union(op2).name("Union"));
+               sets.put(info.setID, op1.union(op2).setParallelism(getParallelism(info)).name("Union"));
        }
 
        private void createCoGroupOperation(PythonOperationInfo info) {
                DataSet op1 = (DataSet) sets.get(info.parentID);
                DataSet op2 = (DataSet) sets.get(info.otherID);
-               sets.put(info.setID, new CoGroupRawOperator(
-                               op1,
-                               op2,
-                               new Keys.ExpressionKeys(info.keys1, op1.getType()),
-                               new Keys.ExpressionKeys(info.keys2, op2.getType()),
-                               new PythonCoGroup(info.setID, info.types),
-                               info.types, info.name));
+               Keys.ExpressionKeys<?> key1 = new Keys.ExpressionKeys(info.keys1, op1.getType());
+               Keys.ExpressionKeys<?> key2 = new Keys.ExpressionKeys(info.keys2, op2.getType());
+               PythonCoGroup pcg = new PythonCoGroup(info.setID, info.types);
+               sets.put(info.setID, new CoGroupRawOperator(op1, op2, key1, key2, pcg, info.types, info.name).setParallelism(getParallelism(info)));
        }
 
        private void createCrossOperation(DatasizeHint mode, PythonOperationInfo info) {
@@ -527,8 +537,10 @@ public class PythonPlanBinder {
                        default:
                                throw new IllegalArgumentException("Invalid Cross mode specified: " + mode);
                }
+
+               defaultResult.setParallelism(getParallelism(info));
                if (info.usesUDF) {
-                       sets.put(info.setID, defaultResult.mapPartition(new PythonMapPartition(info.setID, info.types)).name(info.name));
+                       sets.put(info.setID, defaultResult.mapPartition(new PythonMapPartition(info.setID, info.types)).setParallelism(getParallelism(info)).name(info.name));
                } else {
                        sets.put(info.setID, defaultResult.name("DefaultCross"));
                }
@@ -536,12 +548,12 @@ public class PythonPlanBinder {
 
        private void createFilterOperation(PythonOperationInfo info) {
                DataSet op1 = (DataSet) sets.get(info.parentID);
-               sets.put(info.setID, op1.mapPartition(new PythonMapPartition(info.setID, info.types)).name(info.name));
+               sets.put(info.setID, op1.mapPartition(new PythonMapPartition(info.setID, info.types)).setParallelism(getParallelism(info)).name(info.name));
        }
 
        private void createFlatMapOperation(PythonOperationInfo info) {
                DataSet op1 = (DataSet) sets.get(info.parentID);
-               sets.put(info.setID, op1.mapPartition(new PythonMapPartition(info.setID, info.types)).name(info.name));
+               sets.put(info.setID, op1.mapPartition(new PythonMapPartition(info.setID, info.types)).setParallelism(getParallelism(info)).name(info.name));
        }
 
        private void createGroupReduceOperation(PythonOperationInfo info) {
@@ -560,24 +572,18 @@ public class PythonPlanBinder {
        }
 
        private DataSet applyGroupReduceOperation(DataSet op1, PythonOperationInfo info) {
-               return op1.reduceGroup(new IdentityGroupReduce())
-                               .setCombinable(false).name("PythonGroupReducePreStep")
-                               .mapPartition(new PythonMapPartition(info.setID, info.types))
-                               .name(info.name);
+               return op1.reduceGroup(new IdentityGroupReduce()).setCombinable(false).name("PythonGroupReducePreStep").setParallelism(getParallelism(info))
+                               .mapPartition(new PythonMapPartition(info.setID, info.types)).setParallelism(getParallelism(info)).name(info.name);
        }
 
        private DataSet applyGroupReduceOperation(UnsortedGrouping op1, PythonOperationInfo info) {
-               return op1.reduceGroup(new IdentityGroupReduce())
-                               .setCombinable(false).name("PythonGroupReducePreStep")
-                               .mapPartition(new PythonMapPartition(info.setID, info.types))
-                               .name(info.name);
+               return op1.reduceGroup(new IdentityGroupReduce()).setCombinable(false).setParallelism(getParallelism(info)).name("PythonGroupReducePreStep")
+                               .mapPartition(new PythonMapPartition(info.setID, info.types)).setParallelism(getParallelism(info)).name(info.name);
        }
 
        private DataSet applyGroupReduceOperation(SortedGrouping op1, PythonOperationInfo info) {
-               return op1.reduceGroup(new IdentityGroupReduce())
-                               .setCombinable(false).name("PythonGroupReducePreStep")
-                               .mapPartition(new PythonMapPartition(info.setID, info.types))
-                               .name(info.name);
+               return op1.reduceGroup(new IdentityGroupReduce()).setCombinable(false).setParallelism(getParallelism(info)).name("PythonGroupReducePreStep")
+                               .mapPartition(new PythonMapPartition(info.setID, info.types)).setParallelism(getParallelism(info)).name(info.name);
        }
 
        private void createJoinOperation(DatasizeHint mode, PythonOperationInfo info) {
@@ -585,21 +591,24 @@ public class PythonPlanBinder {
                DataSet op2 = (DataSet) sets.get(info.otherID);
 
                if (info.usesUDF) {
-                       sets.put(info.setID, createDefaultJoin(op1, op2, info.keys1, info.keys2, mode)
-                                       .mapPartition(new PythonMapPartition(info.setID, info.types)).name(info.name));
+                       sets.put(info.setID, createDefaultJoin(op1, op2, info.keys1, info.keys2, mode, getParallelism(info))
+                                       .mapPartition(new PythonMapPartition(info.setID, info.types)).setParallelism(getParallelism(info)).name(info.name));
                } else {
-                       sets.put(info.setID, createDefaultJoin(op1, op2, info.keys1, info.keys2, mode));
+                       sets.put(info.setID, createDefaultJoin(op1, op2, info.keys1, info.keys2, mode, getParallelism(info)));
                }
        }
 
-       private DataSet createDefaultJoin(DataSet op1, DataSet op2, String[] firstKeys, String[] secondKeys, DatasizeHint mode) {
+       private DataSet createDefaultJoin(DataSet op1, DataSet op2, String[] firstKeys, String[] secondKeys, DatasizeHint mode, int parallelism) {
                switch (mode) {
                        case NONE:
-                               return op1.join(op2).where(firstKeys).equalTo(secondKeys).map(new NestedKeyDiscarder()).name("DefaultJoinPostStep");
+                               return op1.join(op2).where(firstKeys).equalTo(secondKeys).setParallelism(parallelism)
+                                       .map(new NestedKeyDiscarder()).setParallelism(parallelism).name("DefaultJoinPostStep");
                        case HUGE:
-                               return op1.joinWithHuge(op2).where(firstKeys).equalTo(secondKeys).map(new NestedKeyDiscarder()).name("DefaultJoinPostStep");
+                               return op1.joinWithHuge(op2).where(firstKeys).equalTo(secondKeys).setParallelism(parallelism)
+                                       .map(new NestedKeyDiscarder()).setParallelism(parallelism).name("DefaultJoinPostStep");
                        case TINY:
-                               return op1.joinWithTiny(op2).where(firstKeys).equalTo(secondKeys).map(new NestedKeyDiscarder()).name("DefaultJoinPostStep");
+                               return op1.joinWithTiny(op2).where(firstKeys).equalTo(secondKeys).setParallelism(parallelism)
+                                       .map(new NestedKeyDiscarder()).setParallelism(parallelism).name("DefaultJoinPostStep");
                        default:
                                throw new IllegalArgumentException("Invalid join mode specified.");
                }
@@ -607,12 +616,12 @@ public class PythonPlanBinder {
 
        private void createMapOperation(PythonOperationInfo info) {
                DataSet op1 = (DataSet) sets.get(info.parentID);
-               sets.put(info.setID, op1.mapPartition(new PythonMapPartition(info.setID, info.types)).name(info.name));
+               sets.put(info.setID, op1.mapPartition(new PythonMapPartition(info.setID, info.types)).setParallelism(getParallelism(info)).name(info.name));
        }
 
        private void createMapPartitionOperation(PythonOperationInfo info) {
                DataSet op1 = (DataSet) sets.get(info.parentID);
-               sets.put(info.setID, op1.mapPartition(new PythonMapPartition(info.setID, info.types)).name(info.name));
+               sets.put(info.setID, op1.mapPartition(new PythonMapPartition(info.setID, info.types)).setParallelism(getParallelism(info)).name(info.name));
        }
 
        private void createReduceOperation(PythonOperationInfo info) {
@@ -627,16 +636,12 @@ public class PythonPlanBinder {
        }
 
        private DataSet applyReduceOperation(DataSet op1, PythonOperationInfo info) {
-               return op1.reduceGroup(new IdentityGroupReduce())
-                               .setCombinable(false).name("PythonReducePreStep")
-                               .mapPartition(new PythonMapPartition(info.setID, info.types))
-                               .name(info.name);
+               return op1.reduceGroup(new IdentityGroupReduce()).setCombinable(false).setParallelism(getParallelism(info)).name("PythonReducePreStep")
+                               .mapPartition(new PythonMapPartition(info.setID, info.types)).setParallelism(getParallelism(info)).name(info.name);
        }
 
        private DataSet applyReduceOperation(UnsortedGrouping op1, PythonOperationInfo info) {
-               return op1.reduceGroup(new IdentityGroupReduce())
-                               .setCombinable(false).name("PythonReducePreStep")
-                               .mapPartition(new PythonMapPartition(info.setID, info.types))
-                               .name(info.name);
+               return op1.reduceGroup(new IdentityGroupReduce()).setCombinable(false).setParallelism(getParallelism(info)).name("PythonReducePreStep")
+                               .mapPartition(new PythonMapPartition(info.setID, info.types)).setParallelism(getParallelism(info)).name(info.name);
        }
 }
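
Note the sentinel convention above: the Python side sends -1 when the user never called set_parallelism(), and getParallelism() maps that to the environment default. A standalone sketch of the same fallback rule (hypothetical helper, not code from this commit):

    def resolve_parallelism(requested, env_default):
        # -1 means no explicit parallelism was set on the Python side
        return env_default if requested == -1 else requested

    assert resolve_parallelism(-1, 4) == 4  # falls back to env.getParallelism()
    assert resolve_parallelism(2, 4) == 2   # an explicit user value wins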

http://git-wip-us.apache.org/repos/asf/flink/blob/40422d50/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/DataSet.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/DataSet.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/DataSet.py
index 5bb34e5..e024a38 100644
--- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/DataSet.py
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/DataSet.py
@@ -55,6 +55,22 @@ class CsvStringify(MapFunction):
             return str(value)
 
 
+class DataSink(object):
+    def __init__(self, env, info):
+        self._env = env
+        self._info = info
+        info.id = env._counter
+        env._counter += 1
+
+    def name(self, name):
+        self._info.name = name
+        return self
+
+    def set_parallelism(self, parallelism):
+        self._info.parallelism.value = parallelism
+        return self
+
+
 class DataSet(object):
     def __init__(self, env, info):
         self._env = env
@@ -66,15 +82,18 @@ class DataSet(object):
         """
         Writes a DataSet to the standard output stream (stdout).
         """
-        self.map(Stringify())._output(to_error)
+        return self.map(Stringify())._output(to_error)
 
     def _output(self, to_error):
         child = OperationInfo()
+        child_set = DataSink(self._env, child)
         child.identifier = _Identifier.SINK_PRINT
         child.parent = self._info
         child.to_err = to_error
+        self._info.parallelism = child.parallelism
         self._info.sinks.append(child)
         self._env._sinks.append(child)
+        return child_set
 
     def write_text(self, path, write_mode=WriteMode.NO_OVERWRITE):
         """
@@ -87,12 +106,15 @@ class DataSet(object):
 
     def _write_text(self, path, write_mode):
         child = OperationInfo()
+        child_set = DataSink(self._env, child)
         child.identifier = _Identifier.SINK_TEXT
         child.parent = self._info
         child.path = path
         child.write_mode = write_mode
+        self._info.parallelism = child.parallelism
         self._info.sinks.append(child)
         self._env._sinks.append(child)
+        return child_set
 
     def write_csv(self, path, line_delimiter="\n", field_delimiter=',', write_mode=WriteMode.NO_OVERWRITE):
         """
@@ -106,14 +128,17 @@ class DataSet(object):
 
     def _write_csv(self, path, line_delimiter, field_delimiter, write_mode):
         child = OperationInfo()
+        child_set = DataSink(self._env, child)
         child.identifier = _Identifier.SINK_CSV
         child.path = path
         child.parent = self._info
         child.delimiter_field = field_delimiter
         child.delimiter_line = line_delimiter
         child.write_mode = write_mode
+        self._info.parallelism = child.parallelism
         self._info.sinks.append(child)
         self._env._sinks.append(child)
+        return child_set
 
     def reduce_group(self, operator, combinable=False):
         """
@@ -303,6 +328,7 @@ class DataSet(object):
         child.identifier = _Identifier.DISTINCT
         child.parent = self._info
         child.keys = fields
+        self._info.parallelism = child.parallelism
         self._info.children.append(child)
         self._env._sets.append(child)
         return child_set
@@ -498,6 +524,7 @@ class DataSet(object):
         child.identifier = _Identifier.PARTITION_HASH
         child.parent = self._info
         child.keys = fields
+        self._info.parallelism = child.parallelism
         self._info.children.append(child)
         self._env._sets.append(child)
         return child_set
@@ -541,6 +568,10 @@ class DataSet(object):
         self._info.name = name
         return self
 
+    def set_parallelism(self, parallelism):
+        self._info.parallelism.value = parallelism
+        return self
+
 
 class OperatorSet(DataSet):
     def __init__(self, env, info):
@@ -611,6 +642,7 @@ class Grouping(object):
         child.types = _createArrayTypeInfo()
         child.name = "PythonGroupReduce"
         child.key1 = self._child_chain[0].keys
+        self._info.parallelism = child.parallelism
         self._info.children.append(child)
         self._env._sets.append(child)
 
@@ -666,6 +698,7 @@ class UnsortedGrouping(Grouping):
         child.name = "PythonReduce"
         child.types = _createArrayTypeInfo()
         child.key1 = self._child_chain[0].keys
+        self._info.parallelism = child.parallelism
         self._info.children.append(child)
         self._env._sets.append(child)
 
@@ -830,6 +863,8 @@ class CoGroupOperatorUsing(object):
         self._info.key2 = tuple([x for x in range(len(self._info.key2))])
         operator._keys1 = self._info.key1
         operator._keys2 = self._info.key2
+        self._info.parent.parallelism = self._info.parallelism
+        self._info.other.parallelism = self._info.parallelism
         self._info.operator = operator
         self._info.types = _createArrayTypeInfo()
         self._info.name = "PythonCoGroup"
@@ -864,6 +899,7 @@ class JoinOperatorWhere(object):
         new_parent_set = self._info.parent_set.map(lambda x: (f(x), x))
         new_parent_set._info.types = _createKeyValueTypeInfo(len(fields))
         self._info.parent = new_parent_set._info
+        self._info.parent.parallelism = self._info.parallelism
         self._info.parent.children.append(self._info)
         self._info.key1 = tuple([x for x in range(len(fields))])
         return JoinOperatorTo(self._env, self._info)
@@ -895,6 +931,7 @@ class JoinOperatorTo(object):
         new_other_set = self._info.other_set.map(lambda x: (f(x), x))
         new_other_set._info.types = _createKeyValueTypeInfo(len(fields))
         self._info.other = new_other_set._info
+        self._info.other.parallelism = self._info.parallelism
         self._info.other.children.append(self._info)
         self._info.key2 = tuple([x for x in range(len(fields))])
         self._env._sets.append(self._info)
@@ -977,6 +1014,7 @@ class Projectable:
         child.parent = info
         child.types = _createArrayTypeInfo()
         child.name = "Projector"
+        child.parallelism = info.parallelism
         info.children.append(child)
         env._sets.append(child)
         return child_set

http://git-wip-us.apache.org/repos/asf/flink/blob/40422d50/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
index b410ead..a9f7f14 100644
--- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
@@ -281,6 +281,7 @@ class Environment(object):
         collect(len(set.values))
         for value in set.values:
             collect(value)
+        collect(set.parallelism.value)
 
     def _receive_result(self):
         jer = JobExecutionResult()

http://git-wip-us.apache.org/repos/asf/flink/blob/40422d50/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/OperationInfo.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/OperationInfo.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/OperationInfo.py
index bd7d2b5..5d83e33 100644
--- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/OperationInfo.py
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/OperationInfo.py
@@ -18,6 +18,11 @@
 from flink.plan.Constants import WriteMode
 
 
+class Value():
+    def __init__(self, value):
+        self.value = value
+
+
 class OperationInfo():
     def __init__(self, info=None):
         if info is None:
@@ -42,6 +47,7 @@ class OperationInfo():
             self.projections = []
             self.id = -1
             self.to_err = False
+            self.parallelism = Value(-1)
             #internally used
             self.parent_set = None
             self.other_set = None
