Repository: flink
Updated Branches:
  refs/heads/master 440137cc3 -> 40422d505
[FLINK-3275] [py] Support for DataSet.setParallelism()

- parallelism is stored as a Value object within the OperationInfo, so it can be passed by reference to multiple operations (for cases where a set is internally executed as multiple operations)
- setParallelism is called for every DataSet, with either a user-set value or env.getParallelism()
- added a DataSink class, providing access to name() and setParallelism() for sinks


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/40422d50
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/40422d50
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/40422d50

Branch: refs/heads/master
Commit: 40422d5057e5c1d7b75aec48bacbd7518cd7c9e1
Parents: 440137c
Author: zentol <s.mo...@web.de>
Authored: Thu Jan 28 10:00:25 2016 +0100
Committer: zentol <s.mo...@web.de>
Committed: Thu Jan 28 11:50:51 2016 +0100

----------------------------------------------------------------------
 .../flink/python/api/PythonOperationInfo.java  |   2 +
 .../flink/python/api/PythonPlanBinder.java     | 117 ++++++++++---------
 .../flink/python/api/flink/plan/DataSet.py     |  40 ++++++-
 .../flink/python/api/flink/plan/Environment.py |   1 +
 .../python/api/flink/plan/OperationInfo.py     |   6 +
 5 files changed, 109 insertions(+), 57 deletions(-)
----------------------------------------------------------------------
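
For context, a rough usage sketch of the new API surface from a Python plan (illustrative only, not part of the commit; get_environment/from_elements/execute are assumed to be the pre-existing Python API, while set_parallelism() on DataSets and the DataSink handle returned by sinks are what this change adds):

    from flink.plan.Environment import get_environment
    from flink.functions.MapFunction import MapFunction


    class Doubler(MapFunction):
        def map(self, value):
            return value * 2


    env = get_environment()
    data = env.from_elements(1, 2, 3, 4)

    # new: any DataSet can now be given an explicit parallelism; the default of -1
    # falls back to the environment parallelism on the Java side
    doubled = data.map(Doubler()).set_parallelism(2)

    # new: sinks hand back a DataSink object that supports name() and set_parallelism()
    doubled.output().name("PrintSink").set_parallelism(1)

    env.execute(local=True)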

http://git-wip-us.apache.org/repos/asf/flink/blob/40422d50/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java
index 1e3005d..7f7a993 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java
@@ -45,6 +45,7 @@ public class PythonOperationInfo {
 	public boolean toError;
 	public String name;
 	public boolean usesUDF;
+	public int parallelism;
 
 	public PythonOperationInfo(PythonPlanStreamer streamer) throws IOException {
 		identifier = (String) streamer.getRecord();
@@ -90,6 +91,7 @@ public class PythonOperationInfo {
 		for (int x = 0; x < valueCount; x++) {
 			values[x] = streamer.getRecord();
 		}
+		parallelism = (Integer) streamer.getRecord(true);
 
 		/*
 		aggregates = new AggregationEntry[count];

http://git-wip-us.apache.org/repos/asf/flink/blob/40422d50/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
index 3877ef1..1534ebf 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
@@ -388,42 +388,53 @@ public class PythonPlanBinder {
 		}
 	}
 
+	private int getParallelism(PythonOperationInfo info) {
+		return info.parallelism == -1 ? env.getParallelism() : info.parallelism;
+	}
+
 	private void createCsvSource(PythonOperationInfo info) throws IOException {
 		if (!(info.types instanceof TupleTypeInfo)) {
 			throw new RuntimeException("The output type of a csv source has to be a tuple. The derived type is " + info);
 		}
-
-		sets.put(info.setID, env.createInput(new TupleCsvInputFormat(new Path(info.path),
-				info.lineDelimiter, info.fieldDelimiter, (TupleTypeInfo) info.types), info.types)
-				.name("CsvSource").map(new SerializerMap()).name("CsvSourcePostStep"));
+		Path path = new Path(info.path);
+		String lineD = info.lineDelimiter;
+		String fieldD = info.fieldDelimiter;
+		TupleTypeInfo<?> types = (TupleTypeInfo) info.types;
+		sets.put(info.setID, env.createInput(new TupleCsvInputFormat(path, lineD, fieldD, types), info.types).setParallelism(getParallelism(info)).name("CsvSource")
+			.map(new SerializerMap()).setParallelism(getParallelism(info)).name("CsvSourcePostStep"));
 	}
 
 	private void createTextSource(PythonOperationInfo info) throws IOException {
-		sets.put(info.setID, env.readTextFile(info.path).name("TextSource").map(new SerializerMap()).name("TextSourcePostStep"));
+		sets.put(info.setID, env.readTextFile(info.path).setParallelism(getParallelism(info)).name("TextSource")
+			.map(new SerializerMap()).setParallelism(getParallelism(info)).name("TextSourcePostStep"));
 	}
 
 	private void createValueSource(PythonOperationInfo info) throws IOException {
-		sets.put(info.setID, env.fromElements(info.values).name("ValueSource").map(new SerializerMap()).name("ValueSourcePostStep"));
+		sets.put(info.setID, env.fromElements(info.values).setParallelism(getParallelism(info)).name("ValueSource")
+			.map(new SerializerMap()).setParallelism(getParallelism(info)).name("ValueSourcePostStep"));
 	}
 
 	private void createSequenceSource(PythonOperationInfo info) throws IOException {
-		sets.put(info.setID, env.generateSequence(info.from, info.to).name("SequenceSource").map(new SerializerMap()).name("SequenceSourcePostStep"));
+		sets.put(info.setID, env.generateSequence(info.from, info.to).setParallelism(getParallelism(info)).name("SequenceSource")
+			.map(new SerializerMap()).setParallelism(getParallelism(info)).name("SequenceSourcePostStep"));
 	}
 
 	private void createCsvSink(PythonOperationInfo info) throws IOException {
 		DataSet parent = (DataSet) sets.get(info.parentID);
-		parent.map(new StringTupleDeserializerMap()).name("CsvSinkPreStep")
-				.writeAsCsv(info.path, info.lineDelimiter, info.fieldDelimiter, info.writeMode).name("CsvSink");
+		parent.map(new StringTupleDeserializerMap()).setParallelism(getParallelism(info)).name("CsvSinkPreStep")
+			.writeAsCsv(info.path, info.lineDelimiter, info.fieldDelimiter, info.writeMode).setParallelism(getParallelism(info)).name("CsvSink");
 	}
 
 	private void createTextSink(PythonOperationInfo info) throws IOException {
 		DataSet parent = (DataSet) sets.get(info.parentID);
-		parent.map(new StringDeserializerMap()).writeAsText(info.path, info.writeMode).name("TextSink");
+		parent.map(new StringDeserializerMap()).setParallelism(getParallelism(info))
+			.writeAsText(info.path, info.writeMode).setParallelism(getParallelism(info)).name("TextSink");
 	}
 
 	private void createPrintSink(PythonOperationInfo info) throws IOException {
 		DataSet parent = (DataSet) sets.get(info.parentID);
-		parent.map(new StringDeserializerMap()).name("PrintSinkPreStep").output(new PrintingOutputFormat(info.toError));
+		parent.map(new StringDeserializerMap()).setParallelism(getParallelism(info)).name("PrintSinkPreStep")
+			.output(new PrintingOutputFormat(info.toError)).setParallelism(getParallelism(info));
 	}
 
 	private void createBroadcastVariable(PythonOperationInfo info) throws IOException {
@@ -452,17 +463,18 @@ public class PythonPlanBinder {
 			ao = ao.and(info.aggregates[x].agg, info.aggregates[x].field);
 		}
 
-		sets.put(info.setID, ao.name("Aggregation"));
+		sets.put(info.setID, ao.setParallelism(getParallelism(info)).name("Aggregation"));
 	}
 
 	private void createDistinctOperation(PythonOperationInfo info) throws IOException {
 		DataSet op = (DataSet) sets.get(info.parentID);
-		sets.put(info.setID, op.distinct(info.keys).name("Distinct").map(new KeyDiscarder()).name("DistinctPostStep"));
+		sets.put(info.setID, op.distinct(info.keys).setParallelism(getParallelism(info)).name("Distinct")
+			.map(new KeyDiscarder()).setParallelism(getParallelism(info)).name("DistinctPostStep"));
 	}
 
 	private void createFirstOperation(PythonOperationInfo info) throws IOException {
 		DataSet op = (DataSet) sets.get(info.parentID);
-		sets.put(info.setID, op.first(info.count).name("First"));
+		sets.put(info.setID, op.first(info.count).setParallelism(getParallelism(info)).name("First"));
 	}
 
 	private void createGroupOperation(PythonOperationInfo info) throws IOException {
@@ -472,12 +484,13 @@ public class PythonPlanBinder {
 
 	private void createHashPartitionOperation(PythonOperationInfo info) throws IOException {
 		DataSet op1 = (DataSet) sets.get(info.parentID);
-		sets.put(info.setID, op1.partitionByHash(info.keys).map(new KeyDiscarder()).name("HashPartitionPostStep"));
+		sets.put(info.setID, op1.partitionByHash(info.keys).setParallelism(getParallelism(info))
+			.map(new KeyDiscarder()).setParallelism(getParallelism(info)).name("HashPartitionPostStep"));
 	}
 
 	private void createRebalanceOperation(PythonOperationInfo info) throws IOException {
 		DataSet op = (DataSet) sets.get(info.parentID);
-		sets.put(info.setID, op.rebalance().name("Rebalance"));
+		sets.put(info.setID, op.rebalance().setParallelism(getParallelism(info)).name("Rebalance"));
 	}
 
 	private void createSortOperation(PythonOperationInfo info) throws IOException {
@@ -494,19 +507,16 @@ public class PythonPlanBinder {
 	private void createUnionOperation(PythonOperationInfo info) throws IOException {
 		DataSet op1 = (DataSet) sets.get(info.parentID);
 		DataSet op2 = (DataSet) sets.get(info.otherID);
-		sets.put(info.setID, op1.union(op2).name("Union"));
+		sets.put(info.setID, op1.union(op2).setParallelism(getParallelism(info)).name("Union"));
 	}
 
 	private void createCoGroupOperation(PythonOperationInfo info) {
 		DataSet op1 = (DataSet) sets.get(info.parentID);
 		DataSet op2 = (DataSet) sets.get(info.otherID);
-		sets.put(info.setID, new CoGroupRawOperator(
-				op1,
-				op2,
-				new Keys.ExpressionKeys(info.keys1, op1.getType()),
-				new Keys.ExpressionKeys(info.keys2, op2.getType()),
-				new PythonCoGroup(info.setID, info.types),
-				info.types, info.name));
+		Keys.ExpressionKeys<?> key1 = new Keys.ExpressionKeys(info.keys1, op1.getType());
+		Keys.ExpressionKeys<?> key2 = new Keys.ExpressionKeys(info.keys2, op2.getType());
+		PythonCoGroup pcg = new PythonCoGroup(info.setID, info.types);
+		sets.put(info.setID, new CoGroupRawOperator(op1, op2, key1, key2, pcg, info.types, info.name).setParallelism(getParallelism(info)));
 	}
 
 	private void createCrossOperation(DatasizeHint mode, PythonOperationInfo info) {
@@ -527,8 +537,10 @@ public class PythonPlanBinder {
 			default:
 				throw new IllegalArgumentException("Invalid Cross mode specified: " + mode);
 		}
+
+		defaultResult.setParallelism(getParallelism(info));
 		if (info.usesUDF) {
-			sets.put(info.setID, defaultResult.mapPartition(new PythonMapPartition(info.setID, info.types)).name(info.name));
+			sets.put(info.setID, defaultResult.mapPartition(new PythonMapPartition(info.setID, info.types)).setParallelism(getParallelism(info)).name(info.name));
 		} else {
 			sets.put(info.setID, defaultResult.name("DefaultCross"));
 		}
@@ -536,12 +548,12 @@
 
 	private void createFilterOperation(PythonOperationInfo info) {
 		DataSet op1 = (DataSet) sets.get(info.parentID);
-		sets.put(info.setID, op1.mapPartition(new PythonMapPartition(info.setID, info.types)).name(info.name));
+		sets.put(info.setID, op1.mapPartition(new PythonMapPartition(info.setID, info.types)).setParallelism(getParallelism(info)).name(info.name));
 	}
 
 	private void createFlatMapOperation(PythonOperationInfo info) {
 		DataSet op1 = (DataSet) sets.get(info.parentID);
-		sets.put(info.setID, op1.mapPartition(new PythonMapPartition(info.setID, info.types)).name(info.name));
+		sets.put(info.setID, op1.mapPartition(new PythonMapPartition(info.setID, info.types)).setParallelism(getParallelism(info)).name(info.name));
 	}
 
 	private void createGroupReduceOperation(PythonOperationInfo info) {
@@ -560,24 +572,18 @@ public class PythonPlanBinder {
 	}
 
 	private DataSet applyGroupReduceOperation(DataSet op1, PythonOperationInfo info) {
-		return op1.reduceGroup(new IdentityGroupReduce())
-				.setCombinable(false).name("PythonGroupReducePreStep")
-				.mapPartition(new PythonMapPartition(info.setID, info.types))
-				.name(info.name);
+		return op1.reduceGroup(new IdentityGroupReduce()).setCombinable(false).name("PythonGroupReducePreStep").setParallelism(getParallelism(info))
+			.mapPartition(new PythonMapPartition(info.setID, info.types)).setParallelism(getParallelism(info)).name(info.name);
 	}
 
 	private DataSet applyGroupReduceOperation(UnsortedGrouping op1, PythonOperationInfo info) {
-		return op1.reduceGroup(new IdentityGroupReduce())
-				.setCombinable(false).name("PythonGroupReducePreStep")
-				.mapPartition(new PythonMapPartition(info.setID, info.types))
-				.name(info.name);
+		return op1.reduceGroup(new IdentityGroupReduce()).setCombinable(false).setParallelism(getParallelism(info)).name("PythonGroupReducePreStep")
+			.mapPartition(new PythonMapPartition(info.setID, info.types)).setParallelism(getParallelism(info)).name(info.name);
 	}
 
 	private DataSet applyGroupReduceOperation(SortedGrouping op1, PythonOperationInfo info) {
-		return op1.reduceGroup(new IdentityGroupReduce())
-				.setCombinable(false).name("PythonGroupReducePreStep")
-				.mapPartition(new PythonMapPartition(info.setID, info.types))
-				.name(info.name);
+		return op1.reduceGroup(new IdentityGroupReduce()).setCombinable(false).setParallelism(getParallelism(info)).name("PythonGroupReducePreStep")
+			.mapPartition(new PythonMapPartition(info.setID, info.types)).setParallelism(getParallelism(info)).name(info.name);
 	}
 
 	private void createJoinOperation(DatasizeHint mode, PythonOperationInfo info) {
@@ -585,21 +591,24 @@
 		DataSet op2 = (DataSet) sets.get(info.otherID);
 
 		if (info.usesUDF) {
-			sets.put(info.setID, createDefaultJoin(op1, op2, info.keys1, info.keys2, mode)
-					.mapPartition(new PythonMapPartition(info.setID, info.types)).name(info.name));
+			sets.put(info.setID, createDefaultJoin(op1, op2, info.keys1, info.keys2, mode, getParallelism(info))
+				.mapPartition(new PythonMapPartition(info.setID, info.types)).setParallelism(getParallelism(info)).name(info.name));
 		} else {
-			sets.put(info.setID, createDefaultJoin(op1, op2, info.keys1, info.keys2, mode));
+			sets.put(info.setID, createDefaultJoin(op1, op2, info.keys1, info.keys2, mode, getParallelism(info)));
 		}
 	}
 
-	private DataSet createDefaultJoin(DataSet op1, DataSet op2, String[] firstKeys, String[] secondKeys, DatasizeHint mode) {
+	private DataSet createDefaultJoin(DataSet op1, DataSet op2, String[] firstKeys, String[] secondKeys, DatasizeHint mode, int parallelism) {
 		switch (mode) {
 			case NONE:
-				return op1.join(op2).where(firstKeys).equalTo(secondKeys).map(new NestedKeyDiscarder()).name("DefaultJoinPostStep");
+				return op1.join(op2).where(firstKeys).equalTo(secondKeys).setParallelism(parallelism)
+					.map(new NestedKeyDiscarder()).setParallelism(parallelism).name("DefaultJoinPostStep");
 			case HUGE:
-				return op1.joinWithHuge(op2).where(firstKeys).equalTo(secondKeys).map(new NestedKeyDiscarder()).name("DefaultJoinPostStep");
+				return op1.joinWithHuge(op2).where(firstKeys).equalTo(secondKeys).setParallelism(parallelism)
+					.map(new NestedKeyDiscarder()).setParallelism(parallelism).name("DefaultJoinPostStep");
 			case TINY:
-				return op1.joinWithTiny(op2).where(firstKeys).equalTo(secondKeys).map(new NestedKeyDiscarder()).name("DefaultJoinPostStep");
+				return op1.joinWithTiny(op2).where(firstKeys).equalTo(secondKeys).setParallelism(parallelism)
+					.map(new NestedKeyDiscarder()).setParallelism(parallelism).name("DefaultJoinPostStep");
 			default:
 				throw new IllegalArgumentException("Invalid join mode specified.");
 		}
@@ -607,12 +616,12 @@
 
 	private void createMapOperation(PythonOperationInfo info) {
 		DataSet op1 = (DataSet) sets.get(info.parentID);
-		sets.put(info.setID, op1.mapPartition(new PythonMapPartition(info.setID, info.types)).name(info.name));
+		sets.put(info.setID, op1.mapPartition(new PythonMapPartition(info.setID, info.types)).setParallelism(getParallelism(info)).name(info.name));
 	}
 
 	private void createMapPartitionOperation(PythonOperationInfo info) {
 		DataSet op1 = (DataSet) sets.get(info.parentID);
-		sets.put(info.setID, op1.mapPartition(new PythonMapPartition(info.setID, info.types)).name(info.name));
+		sets.put(info.setID, op1.mapPartition(new PythonMapPartition(info.setID, info.types)).setParallelism(getParallelism(info)).name(info.name));
 	}
 
 	private void createReduceOperation(PythonOperationInfo info) {
@@ -627,16 +636,12 @@
 	}
 
 	private DataSet applyReduceOperation(DataSet op1, PythonOperationInfo info) {
-		return op1.reduceGroup(new IdentityGroupReduce())
-				.setCombinable(false).name("PythonReducePreStep")
-				.mapPartition(new PythonMapPartition(info.setID, info.types))
-				.name(info.name);
+		return op1.reduceGroup(new IdentityGroupReduce()).setCombinable(false).setParallelism(getParallelism(info)).name("PythonReducePreStep")
+			.mapPartition(new PythonMapPartition(info.setID, info.types)).setParallelism(getParallelism(info)).name(info.name);
 	}
 
 	private DataSet applyReduceOperation(UnsortedGrouping op1, PythonOperationInfo info) {
-		return op1.reduceGroup(new IdentityGroupReduce())
-				.setCombinable(false).name("PythonReducePreStep")
-				.mapPartition(new PythonMapPartition(info.setID, info.types))
-				.name(info.name);
+		return op1.reduceGroup(new IdentityGroupReduce()).setCombinable(false).setParallelism(getParallelism(info)).name("PythonReducePreStep")
+			.mapPartition(new PythonMapPartition(info.setID, info.types)).setParallelism(getParallelism(info)).name(info.name);
 	}
 }
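
To make the single set_parallelism() call cover operations that the Python plan expands into several OperationInfos, the Python side shares one mutable holder between them (see the Value class added to OperationInfo.py below). A minimal illustrative sketch of that idea, not actual Flink code:

    class Value(object):
        def __init__(self, value):
            self.value = value

    class OperationInfo(object):
        def __init__(self):
            self.parallelism = Value(-1)  # -1 means "use the environment default"

    # one user-facing call that is executed as two internal operations
    pre_step = OperationInfo()
    main_op = OperationInfo()
    pre_step.parallelism = main_op.parallelism   # share the holder by reference

    main_op.parallelism.value = 4                # user calls set_parallelism(4)
    assert pre_step.parallelism.value == 4       # the pre-step sees it as well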

http://git-wip-us.apache.org/repos/asf/flink/blob/40422d50/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/DataSet.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/DataSet.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/DataSet.py
index 5bb34e5..e024a38 100644
--- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/DataSet.py
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/DataSet.py
@@ -55,6 +55,22 @@ class CsvStringify(MapFunction):
         return str(value)
 
 
+class DataSink(object):
+    def __init__(self, env, info):
+        self._env = env
+        self._info = info
+        info.id = env._counter
+        env._counter += 1
+
+    def name(self, name):
+        self._info.name = name
+        return self
+
+    def set_parallelism(self, parallelism):
+        self._info.parallelism.value = parallelism
+        return self
+
+
 class DataSet(object):
     def __init__(self, env, info):
         self._env = env
@@ -66,15 +82,18 @@ class DataSet(object):
         """
         Writes a DataSet to the standard output stream (stdout).
         """
-        self.map(Stringify())._output(to_error)
+        return self.map(Stringify())._output(to_error)
 
     def _output(self, to_error):
         child = OperationInfo()
+        child_set = DataSink(self._env, child)
         child.identifier = _Identifier.SINK_PRINT
         child.parent = self._info
         child.to_err = to_error
+        self._info.parallelism = child.parallelism
         self._info.sinks.append(child)
         self._env._sinks.append(child)
+        return child_set
 
     def write_text(self, path, write_mode=WriteMode.NO_OVERWRITE):
         """
@@ -87,12 +106,15 @@ class DataSet(object):
 
     def _write_text(self, path, write_mode):
         child = OperationInfo()
+        child_set = DataSink(self._env, child)
         child.identifier = _Identifier.SINK_TEXT
         child.parent = self._info
         child.path = path
         child.write_mode = write_mode
+        self._info.parallelism = child.parallelism
         self._info.sinks.append(child)
         self._env._sinks.append(child)
+        return child_set
 
     def write_csv(self, path, line_delimiter="\n", field_delimiter=',', write_mode=WriteMode.NO_OVERWRITE):
         """
@@ -106,14 +128,17 @@ class DataSet(object):
 
     def _write_csv(self, path, line_delimiter, field_delimiter, write_mode):
         child = OperationInfo()
+        child_set = DataSink(self._env, child)
         child.identifier = _Identifier.SINK_CSV
         child.path = path
         child.parent = self._info
         child.delimiter_field = field_delimiter
         child.delimiter_line = line_delimiter
         child.write_mode = write_mode
+        self._info.parallelism = child.parallelism
         self._info.sinks.append(child)
         self._env._sinks.append(child)
+        return child_set
 
     def reduce_group(self, operator, combinable=False):
         """
@@ -303,6 +328,7 @@ class DataSet(object):
         child.identifier = _Identifier.DISTINCT
         child.parent = self._info
         child.keys = fields
+        self._info.parallelism = child.parallelism
         self._info.children.append(child)
         self._env._sets.append(child)
         return child_set
@@ -498,6 +524,7 @@ class DataSet(object):
         child.identifier = _Identifier.PARTITION_HASH
         child.parent = self._info
         child.keys = fields
+        self._info.parallelism = child.parallelism
         self._info.children.append(child)
         self._env._sets.append(child)
         return child_set
@@ -541,6 +568,10 @@ class DataSet(object):
         self._info.name = name
         return self
 
+    def set_parallelism(self, parallelism):
+        self._info.parallelism.value = parallelism
+        return self
+
 
 class OperatorSet(DataSet):
     def __init__(self, env, info):
@@ -611,6 +642,7 @@ class Grouping(object):
         child.types = _createArrayTypeInfo()
         child.name = "PythonGroupReduce"
         child.key1 = self._child_chain[0].keys
+        self._info.parallelism = child.parallelism
         self._info.children.append(child)
         self._env._sets.append(child)
 
@@ -666,6 +698,7 @@ class UnsortedGrouping(Grouping):
         child.name = "PythonReduce"
         child.types = _createArrayTypeInfo()
         child.key1 = self._child_chain[0].keys
+        self._info.parallelism = child.parallelism
         self._info.children.append(child)
         self._env._sets.append(child)
 
@@ -830,6 +863,8 @@ class CoGroupOperatorUsing(object):
         self._info.key2 = tuple([x for x in range(len(self._info.key2))])
         operator._keys1 = self._info.key1
         operator._keys2 = self._info.key2
+        self._info.parent.parallelism = self._info.parallelism
+        self._info.other.parallelism = self._info.parallelism
         self._info.operator = operator
         self._info.types = _createArrayTypeInfo()
         self._info.name = "PythonCoGroup"
@@ -864,6 +899,7 @@ class JoinOperatorWhere(object):
         new_parent_set = self._info.parent_set.map(lambda x: (f(x), x))
         new_parent_set._info.types = _createKeyValueTypeInfo(len(fields))
         self._info.parent = new_parent_set._info
+        self._info.parent.parallelism = self._info.parallelism
         self._info.parent.children.append(self._info)
         self._info.key1 = tuple([x for x in range(len(fields))])
         return JoinOperatorTo(self._env, self._info)
@@ -895,6 +931,7 @@ class JoinOperatorTo(object):
         new_other_set = self._info.other_set.map(lambda x: (f(x), x))
         new_other_set._info.types = _createKeyValueTypeInfo(len(fields))
         self._info.other = new_other_set._info
+        self._info.other.parallelism = self._info.parallelism
         self._info.other.children.append(self._info)
         self._info.key2 = tuple([x for x in range(len(fields))])
         self._env._sets.append(self._info)
@@ -977,6 +1014,7 @@ class Projectable:
         child.parent = info
         child.types = _createArrayTypeInfo()
         child.name = "Projector"
+        child.parallelism = info.parallelism
         info.children.append(child)
         env._sets.append(child)
         return child_set

http://git-wip-us.apache.org/repos/asf/flink/blob/40422d50/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
index b410ead..a9f7f14 100644
--- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
@@ -281,6 +281,7 @@ class Environment(object):
             collect(len(set.values))
             for value in set.values:
                 collect(value)
+            collect(set.parallelism.value)
 
     def _receive_result(self):
         jer = JobExecutionResult()

http://git-wip-us.apache.org/repos/asf/flink/blob/40422d50/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/OperationInfo.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/OperationInfo.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/OperationInfo.py
index bd7d2b5..5d83e33 100644
--- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/OperationInfo.py
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/OperationInfo.py
@@ -18,6 +18,11 @@
 from flink.plan.Constants import WriteMode
 
 
+class Value():
+    def __init__(self, value):
+        self.value = value
+
+
 class OperationInfo():
     def __init__(self, info=None):
         if info is None:
@@ -42,6 +47,7 @@ class OperationInfo():
             self.projections = []
             self.id = -1
             self.to_err = False
+            self.parallelism = Value(-1)
             #internally used
             self.parent_set = None
             self.other_set = None
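
A note on the plan transfer: both ends have to agree on the per-operation record order. The Python Environment now emits the parallelism as the last record of every operation, and PythonOperationInfo.java reads it last via streamer.getRecord(true). A rough sketch of the emitting side (send_operation is a hypothetical stand-in for the actual sending loop in Environment.py):

    def send_operation(collect, op):
        # identifier, parent/other ids, keys, values, etc. are collected first,
        # exactly as before ...
        collect(len(op.values))
        for value in op.values:
            collect(value)
        # new: the parallelism is always the last record, matching
        # 'parallelism = (Integer) streamer.getRecord(true)' on the Java side
        collect(op.parallelism.value)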