Repository: flink Updated Branches: refs/heads/masteer [created] 499b60fed
[FLINK-3290] [py] Generalize OperationInfo transfer -identifier saved in java OpInfo -changed default values to prevent null exceptions -all operations use the same routine to transfer parameters -PyPlRcv can handle Tuple0 -labeled py OpInfo fields as transferred/internal -fixed broadcast OpInfo not having correct identifier -removed unused projection code Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/499b60fe Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/499b60fe Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/499b60fe Branch: refs/heads/masteer Commit: 499b60fedd6db1cd0c1a4e1cc8c59a94b89c5c84 Parents: f681d9b Author: zentol <s.mo...@web.de> Authored: Wed Jan 27 14:52:27 2016 +0100 Committer: zentol <s.mo...@web.de> Committed: Wed Jan 27 14:55:34 2016 +0100 ---------------------------------------------------------------------- .../flink/python/api/PythonOperationInfo.java | 213 +++++-------------- .../flink/python/api/PythonPlanBinder.java | 207 ++++++++---------- .../api/streaming/plan/PythonPlanReceiver.java | 2 +- .../api/flink/functions/GroupReduceFunction.py | 4 +- .../api/flink/functions/ReduceFunction.py | 4 +- .../flink/python/api/flink/plan/DataSet.py | 1 + .../flink/python/api/flink/plan/Environment.py | 141 +++--------- .../python/api/flink/plan/OperationInfo.py | 30 +-- 8 files changed, 196 insertions(+), 406 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/499b60fe/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java index 30a7133..1e3005d 100644 --- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java +++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java @@ -20,10 +20,10 @@ import org.apache.flink.api.java.aggregation.Aggregations; import org.apache.flink.api.java.tuple.Tuple; import static org.apache.flink.api.java.typeutils.TypeExtractor.getForObject; import org.apache.flink.core.fs.FileSystem.WriteMode; -import org.apache.flink.python.api.PythonPlanBinder.Operation; import org.apache.flink.python.api.streaming.plan.PythonPlanStreamer; public class PythonOperationInfo { + public String identifier; public int parentID; //DataSet that an operation is applied on public int otherID; //secondary DataSet public int setID; //ID for new DataSet @@ -35,7 +35,6 @@ public class PythonOperationInfo { public Object[] values; public int count; public String field; - public int[] fields; public Order order; public String path; public String fieldDelimiter; @@ -47,154 +46,59 @@ public class PythonOperationInfo { public String name; public boolean usesUDF; - public PythonOperationInfo(PythonPlanStreamer streamer, Operation identifier) throws IOException { - Object tmpType; - switch (identifier) { - case SOURCE_CSV: - setID = (Integer) streamer.getRecord(true); - path = (String) streamer.getRecord(); - fieldDelimiter = (String) streamer.getRecord(); - lineDelimiter = (String) streamer.getRecord(); - tmpType = (Tuple) streamer.getRecord(); - types = tmpType == null ? null : getForObject(tmpType); - return; - case SOURCE_TEXT: - setID = (Integer) streamer.getRecord(true); - path = (String) streamer.getRecord(); - return; - case SOURCE_VALUE: - setID = (Integer) streamer.getRecord(true); - int valueCount = (Integer) streamer.getRecord(true); - values = new Object[valueCount]; - for (int x = 0; x < valueCount; x++) { - values[x] = streamer.getRecord(); - } - return; - case SOURCE_SEQ: - setID = (Integer) streamer.getRecord(true); - from = (Long) streamer.getRecord(); - to = (Long) streamer.getRecord(); - return; - case SINK_CSV: - parentID = (Integer) streamer.getRecord(true); - path = (String) streamer.getRecord(); - fieldDelimiter = (String) streamer.getRecord(); - lineDelimiter = (String) streamer.getRecord(); - writeMode = ((Integer) streamer.getRecord(true)) == 1 - ? WriteMode.OVERWRITE - : WriteMode.NO_OVERWRITE; - return; - case SINK_TEXT: - parentID = (Integer) streamer.getRecord(true); - path = (String) streamer.getRecord(); - writeMode = ((Integer) streamer.getRecord(true)) == 1 - ? WriteMode.OVERWRITE - : WriteMode.NO_OVERWRITE; - return; - case SINK_PRINT: - parentID = (Integer) streamer.getRecord(true); - toError = (Boolean) streamer.getRecord(); - return; - case BROADCAST: - parentID = (Integer) streamer.getRecord(true); - otherID = (Integer) streamer.getRecord(true); - name = (String) streamer.getRecord(); - return; - } - setID = (Integer) streamer.getRecord(true); + public PythonOperationInfo(PythonPlanStreamer streamer) throws IOException { + identifier = (String) streamer.getRecord(); parentID = (Integer) streamer.getRecord(true); - switch (identifier) { - case AGGREGATE: - count = (Integer) streamer.getRecord(true); - aggregates = new AggregationEntry[count]; - for (int x = 0; x < count; x++) { - int encodedAgg = (Integer) streamer.getRecord(true); - int field = (Integer) streamer.getRecord(true); - aggregates[x] = new AggregationEntry(encodedAgg, field); - } - return; - case FIRST: - count = (Integer) streamer.getRecord(true); - return; - case DISTINCT: - case GROUPBY: - case PARTITION_HASH: - keys = normalizeKeys(streamer.getRecord(true)); - return; - case PROJECTION: - fields = toIntArray(streamer.getRecord(true)); - return; - case REBALANCE: - return; - case SORT: - field = "f0.f" + (Integer) streamer.getRecord(true); - int encodedOrder = (Integer) streamer.getRecord(true); - switch (encodedOrder) { - case 0: - order = Order.NONE; - break; - case 1: - order = Order.ASCENDING; - break; - case 2: - order = Order.DESCENDING; - break; - case 3: - order = Order.ANY; - break; - default: - order = Order.NONE; - break; - } - return; - case UNION: - otherID = (Integer) streamer.getRecord(true); - return; - case COGROUP: - otherID = (Integer) streamer.getRecord(true); - keys1 = normalizeKeys(streamer.getRecord(true)); - keys2 = normalizeKeys(streamer.getRecord(true)); - tmpType = streamer.getRecord(); - types = tmpType == null ? null : getForObject(tmpType); - name = (String) streamer.getRecord(); - return; - case CROSS: - case CROSS_H: - case CROSS_T: - otherID = (Integer) streamer.getRecord(true); - usesUDF = (Boolean) streamer.getRecord(); - tmpType = streamer.getRecord(); - types = tmpType == null ? null : getForObject(tmpType); - name = (String) streamer.getRecord(); - return; - case REDUCE: - case GROUPREDUCE: - tmpType = streamer.getRecord(); - types = tmpType == null ? null : getForObject(tmpType); - name = (String) streamer.getRecord(); - return; - case JOIN: - case JOIN_H: - case JOIN_T: - keys1 = normalizeKeys(streamer.getRecord(true)); - keys2 = normalizeKeys(streamer.getRecord(true)); - otherID = (Integer) streamer.getRecord(true); - usesUDF = (Boolean) streamer.getRecord(); - tmpType = streamer.getRecord(); - types = tmpType == null ? null : getForObject(tmpType); - name = (String) streamer.getRecord(); - return; - case MAPPARTITION: - case FLATMAP: - case MAP: - case FILTER: - tmpType = streamer.getRecord(); - types = tmpType == null ? null : getForObject(tmpType); - name = (String) streamer.getRecord(); - return; + otherID = (Integer) streamer.getRecord(true); + field = "f0.f" + (Integer) streamer.getRecord(true); + int encodedOrder = (Integer) streamer.getRecord(true); + switch (encodedOrder) { + case 0: + order = Order.NONE; + break; + case 1: + order = Order.ASCENDING; + break; + case 2: + order = Order.DESCENDING; + break; + case 3: + order = Order.ANY; + break; default: - throw new UnsupportedOperationException("This operation is not implemented in the Python API: " + identifier); + order = Order.NONE; + break; + } + keys = normalizeKeys(streamer.getRecord(true)); + keys1 = normalizeKeys(streamer.getRecord(true)); + keys2 = normalizeKeys(streamer.getRecord(true)); + Object tmpType = streamer.getRecord(); + types = tmpType == null ? null : getForObject(tmpType); + usesUDF = (Boolean) streamer.getRecord(); + name = (String) streamer.getRecord(); + lineDelimiter = (String) streamer.getRecord(); + fieldDelimiter = (String) streamer.getRecord(); + writeMode = ((Integer) streamer.getRecord(true)) == 1 + ? WriteMode.OVERWRITE + : WriteMode.NO_OVERWRITE; + path = (String) streamer.getRecord(); + setID = (Integer) streamer.getRecord(true); + toError = (Boolean) streamer.getRecord(); + count = (Integer) streamer.getRecord(true); + int valueCount = (Integer) streamer.getRecord(true); + values = new Object[valueCount]; + for (int x = 0; x < valueCount; x++) { + values[x] = streamer.getRecord(); } + + /* + aggregates = new AggregationEntry[count]; + for (int x = 0; x < count; x++) { + int encodedAgg = (Integer) streamer.getRecord(true); + int field = (Integer) streamer.getRecord(true); + aggregates[x] = new AggregationEntry(encodedAgg, field); + } + */ } @Override @@ -283,21 +187,6 @@ public class PythonOperationInfo { throw new RuntimeException("Key argument is neither an int[] nor a Tuple: " + keys.toString()); } - private static int[] toIntArray(Object key) { - if (key instanceof Tuple) { - Tuple tuple = (Tuple) key; - int[] keys = new int[tuple.getArity()]; - for (int y = 0; y < tuple.getArity(); y++) { - keys[y] = (Integer) tuple.getField(y); - } - return keys; - } - if (key instanceof int[]) { - return (int[]) key; - } - throw new RuntimeException("Key argument is neither an int[] nor a Tuple."); - } - private static String[] tupleToStringArray(Tuple tuple) { String[] keys = new String[tuple.getArity()]; for (int y = 0; y < tuple.getArity(); y++) { http://git-wip-us.apache.org/repos/asf/flink/blob/499b60fe/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java index 5b0d846..3877ef1 100644 --- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java +++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java @@ -280,7 +280,7 @@ public class PythonPlanBinder { */ protected enum Operation { SOURCE_CSV, SOURCE_TEXT, SOURCE_VALUE, SOURCE_SEQ, SINK_CSV, SINK_TEXT, SINK_PRINT, - PROJECTION, SORT, UNION, FIRST, DISTINCT, GROUPBY, AGGREGATE, + SORT, UNION, FIRST, DISTINCT, GROUPBY, AGGREGATE, REBALANCE, PARTITION_HASH, BROADCAST, COGROUP, CROSS, CROSS_H, CROSS_T, FILTER, FLATMAP, GROUPREDUCE, JOIN, JOIN_H, JOIN_T, MAP, REDUCE, MAPPARTITION @@ -289,121 +289,105 @@ public class PythonPlanBinder { private void receiveOperations() throws IOException { Integer operationCount = (Integer) streamer.getRecord(true); for (int x = 0; x < operationCount; x++) { - String identifier = (String) streamer.getRecord(); - Operation op = null; + PythonOperationInfo info = new PythonOperationInfo(streamer); + Operation op; try { - op = Operation.valueOf(identifier.toUpperCase()); + op = Operation.valueOf(info.identifier.toUpperCase()); } catch (IllegalArgumentException iae) { - throw new IllegalArgumentException("Invalid operation specified: " + identifier); + throw new IllegalArgumentException("Invalid operation specified: " + info.identifier); } - if (op != null) { - switch (op) { - case SOURCE_CSV: - createCsvSource(createOperationInfo(op)); - break; - case SOURCE_TEXT: - createTextSource(createOperationInfo(op)); - break; - case SOURCE_VALUE: - createValueSource(createOperationInfo(op)); - break; - case SOURCE_SEQ: - createSequenceSource(createOperationInfo(op)); - break; - case SINK_CSV: - createCsvSink(createOperationInfo(op)); - break; - case SINK_TEXT: - createTextSink(createOperationInfo(op)); - break; - case SINK_PRINT: - createPrintSink(createOperationInfo(op)); - break; - case BROADCAST: - createBroadcastVariable(createOperationInfo(op)); - break; - case AGGREGATE: - createAggregationOperation(createOperationInfo(op)); - break; - case DISTINCT: - createDistinctOperation(createOperationInfo(op)); - break; - case FIRST: - createFirstOperation(createOperationInfo(op)); - break; - case PARTITION_HASH: - createHashPartitionOperation(createOperationInfo(op)); - break; - case PROJECTION: - createProjectOperation(createOperationInfo(op)); - break; - case REBALANCE: - createRebalanceOperation(createOperationInfo(op)); - break; - case GROUPBY: - createGroupOperation(createOperationInfo(op)); - break; - case SORT: - createSortOperation(createOperationInfo(op)); - break; - case UNION: - createUnionOperation(createOperationInfo(op)); - break; - case COGROUP: - createCoGroupOperation(createOperationInfo(op)); - break; - case CROSS: - createCrossOperation(NONE, createOperationInfo(op)); - break; - case CROSS_H: - createCrossOperation(HUGE, createOperationInfo(op)); - break; - case CROSS_T: - createCrossOperation(TINY, createOperationInfo(op)); - break; - case FILTER: - createFilterOperation(createOperationInfo(op)); - break; - case FLATMAP: - createFlatMapOperation(createOperationInfo(op)); - break; - case GROUPREDUCE: - createGroupReduceOperation(createOperationInfo(op)); - break; - case JOIN: - createJoinOperation(NONE, createOperationInfo(op)); - break; - case JOIN_H: - createJoinOperation(HUGE, createOperationInfo(op)); - break; - case JOIN_T: - createJoinOperation(TINY, createOperationInfo(op)); - break; - case MAP: - createMapOperation(createOperationInfo(op)); - break; - case MAPPARTITION: - createMapPartitionOperation(createOperationInfo(op)); - break; - case REDUCE: - createReduceOperation(createOperationInfo(op)); - break; - } + switch (op) { + case SOURCE_CSV: + createCsvSource(info); + break; + case SOURCE_TEXT: + createTextSource(info); + break; + case SOURCE_VALUE: + createValueSource(info); + break; + case SOURCE_SEQ: + createSequenceSource(info); + break; + case SINK_CSV: + createCsvSink(info); + break; + case SINK_TEXT: + createTextSink(info); + break; + case SINK_PRINT: + createPrintSink(info); + break; + case BROADCAST: + createBroadcastVariable(info); + break; + case AGGREGATE: + createAggregationOperation(info); + break; + case DISTINCT: + createDistinctOperation(info); + break; + case FIRST: + createFirstOperation(info); + break; + case PARTITION_HASH: + createHashPartitionOperation(info); + break; + case REBALANCE: + createRebalanceOperation(info); + break; + case GROUPBY: + createGroupOperation(info); + break; + case SORT: + createSortOperation(info); + break; + case UNION: + createUnionOperation(info); + break; + case COGROUP: + createCoGroupOperation(info); + break; + case CROSS: + createCrossOperation(NONE, info); + break; + case CROSS_H: + createCrossOperation(HUGE, info); + break; + case CROSS_T: + createCrossOperation(TINY, info); + break; + case FILTER: + createFilterOperation(info); + break; + case FLATMAP: + createFlatMapOperation(info); + break; + case GROUPREDUCE: + createGroupReduceOperation(info); + break; + case JOIN: + createJoinOperation(NONE, info); + break; + case JOIN_H: + createJoinOperation(HUGE, info); + break; + case JOIN_T: + createJoinOperation(TINY, info); + break; + case MAP: + createMapOperation(info); + break; + case MAPPARTITION: + createMapPartitionOperation(info); + break; + case REDUCE: + createReduceOperation(info); + break; } } } - /** - * This method creates an OperationInfo object based on the operation-identifier passed. - * - * @param operationIdentifier - * @return - * @throws IOException - */ - private PythonOperationInfo createOperationInfo(Operation operationIdentifier) throws IOException { - return new PythonOperationInfo(streamer, operationIdentifier); - } - private void createCsvSource(PythonOperationInfo info) throws IOException { if (!(info.types instanceof TupleTypeInfo)) { throw new RuntimeException("The output type of a csv source has to be a tuple. The derived type is " + info); @@ -491,11 +475,6 @@ public class PythonPlanBinder { sets.put(info.setID, op1.partitionByHash(info.keys).map(new KeyDiscarder()).name("HashPartitionPostStep")); } - private void createProjectOperation(PythonOperationInfo info) throws IOException { - DataSet op1 = (DataSet) sets.get(info.parentID); - sets.put(info.setID, op1.project(info.fields).name("Projection")); - } - private void createRebalanceOperation(PythonOperationInfo info) throws IOException { DataSet op = (DataSet) sets.get(info.parentID); sets.put(info.setID, op.rebalance().name("Rebalance")); http://git-wip-us.apache.org/repos/asf/flink/blob/499b60fe/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanReceiver.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanReceiver.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanReceiver.java index 6d2dcd1..a54b8dd 100644 --- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanReceiver.java +++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanReceiver.java @@ -49,7 +49,7 @@ public class PythonPlanReceiver implements Serializable { private Deserializer getDeserializer() throws IOException { byte type = (byte) input.readByte(); - if (type > 0 && type < 26) { + if (type >= 0 && type < 26) { Deserializer[] d = new Deserializer[type]; for (int x = 0; x < d.length; x++) { d[x] = getDeserializer(); http://git-wip-us.apache.org/repos/asf/flink/blob/499b60fe/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/GroupReduceFunction.py ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/GroupReduceFunction.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/GroupReduceFunction.py index 340497d..8d1934c 100644 --- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/GroupReduceFunction.py +++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/GroupReduceFunction.py @@ -27,7 +27,7 @@ class GroupReduceFunction(Function.Function): def _configure(self, input_file, output_file, port, env, info): super(GroupReduceFunction, self)._configure(input_file, output_file, port, env, info) - if info.key1 is None: + if len(info.key1) == 0: self._run = self._run_all_group_reduce else: self._run = self._run_grouped_group_reduce @@ -63,4 +63,4 @@ class GroupReduceFunction(Function.Function): pass def combine(self, iterator, collector): - self.reduce(iterator, collector) \ No newline at end of file + self.reduce(iterator, collector) http://git-wip-us.apache.org/repos/asf/flink/blob/499b60fe/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/ReduceFunction.py ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/ReduceFunction.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/ReduceFunction.py index 95e8b8a..b1d2201 100644 --- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/ReduceFunction.py +++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/ReduceFunction.py @@ -26,7 +26,7 @@ class ReduceFunction(Function.Function): def _configure(self, input_file, output_file, port, env, info): super(ReduceFunction, self)._configure(input_file, output_file, port, env, info) - if info.key1 is None: + if len(info.key1) == 0: self._run = self._run_all_reduce else: self._run = self._run_grouped_reduce @@ -64,4 +64,4 @@ class ReduceFunction(Function.Function): pass def combine(self, value1, value2): - return self.reduce(value1, value2) \ No newline at end of file + return self.reduce(value1, value2) http://git-wip-us.apache.org/repos/asf/flink/blob/499b60fe/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/DataSet.py ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/DataSet.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/DataSet.py index 3132651..5bb34e5 100644 --- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/DataSet.py +++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/DataSet.py @@ -548,6 +548,7 @@ class OperatorSet(DataSet): def with_broadcast_set(self, name, set): child = OperationInfo() + child.identifier = _Identifier.BROADCAST child.parent = self._info child.other = set._info child.name = name http://git-wip-us.apache.org/repos/asf/flink/blob/499b60fe/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py index 777f30b..b410ead 100644 --- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py +++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py @@ -238,11 +238,7 @@ class Environment(object): def _send_plan(self): self._send_parameters() - self._collector.collect(len(self._sources) + len(self._sets) + len(self._sinks) + len(self._broadcast)) - self._send_sources() self._send_operations() - self._send_sinks() - self._send_broadcast() def _send_parameters(self): collect = self._collector.collect @@ -251,117 +247,40 @@ class Environment(object): collect(("mode", self._local_mode)) collect(("retry", self._retry)) - def _send_sources(self): - for source in self._sources: - identifier = source.identifier - collect = self._collector.collect - collect(identifier) - collect(source.id) - for case in Switch(identifier): - if case(_Identifier.SOURCE_CSV): - collect(source.path) - collect(source.delimiter_field) - collect(source.delimiter_line) - collect(source.types) - break - if case(_Identifier.SOURCE_TEXT): - collect(source.path) - break - if case(_Identifier.SOURCE_VALUE): - collect(len(source.values)) - for value in source.values: - collect(value) - break - def _send_operations(self): - collect = self._collector.collect + self._collector.collect(len(self._sources) + len(self._sets) + len(self._sinks) + len(self._broadcast)) + for source in self._sources: + self._send_operation(source) for set in self._sets: - identifier = set.identifier - collect(set.identifier) - collect(set.id) - collect(set.parent.id) - for case in Switch(identifier): - if case(_Identifier.REBALANCE): - break - if case(_Identifier.DISTINCT, _Identifier.PARTITION_HASH): - collect(set.keys) - break - if case(_Identifier.FIRST): - collect(set.count) - break - if case(_Identifier.SORT): - collect(set.field) - collect(set.order) - break - if case(_Identifier.GROUP): - collect(set.keys) - break - if case(_Identifier.COGROUP): - collect(set.other.id) - collect(set.key1) - collect(set.key2) - collect(set.types) - collect(set.name) - break - if case(_Identifier.CROSS, _Identifier.CROSSH, _Identifier.CROSST): - collect(set.other.id) - collect(set.uses_udf) - collect(set.types) - collect(set.name) - break - if case(_Identifier.REDUCE, _Identifier.GROUPREDUCE): - collect(set.types) - collect(set.name) - break - if case(_Identifier.JOIN, _Identifier.JOINH, _Identifier.JOINT): - collect(set.key1) - collect(set.key2) - collect(set.other.id) - collect(set.uses_udf) - collect(set.types) - collect(set.name) - break - if case(_Identifier.MAP, _Identifier.MAPPARTITION, _Identifier.FLATMAP, _Identifier.FILTER): - collect(set.types) - collect(set.name) - break - if case(_Identifier.UNION): - collect(set.other.id) - break - if case(_Identifier.PROJECTION): - collect(set.keys) - break - if case(): - raise KeyError("Environment._send_child_sets(): Invalid operation identifier: " + str(identifier)) - - def _send_sinks(self): + self._send_operation(set) for sink in self._sinks: - identifier = sink.identifier - collect = self._collector.collect - collect(identifier) - collect(sink.parent.id) - for case in Switch(identifier): - if case(_Identifier.SINK_CSV): - collect(sink.path) - collect(sink.delimiter_field) - collect(sink.delimiter_line) - collect(sink.write_mode) - break; - if case(_Identifier.SINK_TEXT): - collect(sink.path) - collect(sink.write_mode) - break - if case(_Identifier.SINK_PRINT): - collect(sink.to_err) - break - - def _send_broadcast(self): + self._send_operation(sink) + for bcv in self._broadcast: + self._send_operation(bcv) + + def _send_operation(self, set): collect = self._collector.collect - for entry in self._broadcast: - collect(_Identifier.BROADCAST) - collect(entry.parent.id) - collect(entry.other.id) - collect(entry.name) + collect(set.identifier) + collect(set.parent.id if set.parent is not None else -1) + collect(set.other.id if set.other is not None else -1) + collect(set.field) + collect(set.order) + collect(set.keys) + collect(set.key1) + collect(set.key2) + collect(set.types) + collect(set.uses_udf) + collect(set.name) + collect(set.delimiter_line) + collect(set.delimiter_field) + collect(set.write_mode) + collect(set.path) + collect(set.id) + collect(set.to_err) + collect(set.count) + collect(len(set.values)) + for value in set.values: + collect(value) def _receive_result(self): jer = JobExecutionResult() http://git-wip-us.apache.org/repos/asf/flink/blob/499b60fe/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/OperationInfo.py ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/OperationInfo.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/OperationInfo.py index 6eb228c..bd7d2b5 100644 --- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/OperationInfo.py +++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/OperationInfo.py @@ -21,33 +21,35 @@ from flink.plan.Constants import WriteMode class OperationInfo(): def __init__(self, info=None): if info is None: + #fields being transferred to the java side + self.identifier = -1 self.parent = None self.other = None - self.parent_set = None - self.other_set = None - self.identifier = None - self.field = None - self.order = None - self.keys = None - self.key1 = None - self.key2 = None + self.field = -1 + self.order = 0 + self.keys = () + self.key1 = () + self.key2 = () self.types = None - self.operator = None self.uses_udf = False self.name = None self.delimiter_line = "\n" self.delimiter_field = "," self.write_mode = WriteMode.NO_OVERWRITE - self.sinks = [] - self.children = [] - self.path = None + self.path = "" self.count = 0 self.values = [] self.projections = [] - self.bcvars = [] - self.id = None + self.id = -1 self.to_err = False + #internally used + self.parent_set = None + self.other_set = None self.chained_info = None + self.bcvars = [] + self.sinks = [] + self.children = [] + self.operator = None else: self.__dict__.update(info.__dict__)