Repository: flink
Updated Branches:
  refs/heads/master [created] 499b60fed


[FLINK-3290] [py] Generalize OperationInfo transfer

- identifier saved in Java OpInfo
- changed default values to prevent null exceptions
- all operations use the same routine to transfer parameters (sketched below)
- PythonPlanReceiver can handle Tuple0
- labeled Python OpInfo fields as transferred/internal
- fixed broadcast OpInfo not having the correct identifier
- removed unused projection code
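
The unifying idea: instead of a per-operation switch on both sides of the plan
channel, every OperationInfo is now transferred as one fixed, ordered sequence
of fields, and fields an operation does not use carry harmless defaults (-1,
empty tuple, "") instead of None/null. A minimal sketch of the pattern
(hypothetical send helper, not the actual Flink classes):

    # Sender: every operation, from sources to broadcast variables,
    # writes the exact same field sequence in the same order.
    def send_operation(collect, info):
        collect(info.identifier)
        collect(info.parent.id if info.parent is not None else -1)
        collect(info.other.id if info.other is not None else -1)
        collect(info.field)
        # ... all remaining fields, always in this fixed order ...

    # The Java side reads the fields back in exactly the same order
    # (identifier, parentID, otherID, field, ...), so transferring a
    # plan needs no per-operation branching.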


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/499b60fe
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/499b60fe
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/499b60fe

Branch: refs/heads/master
Commit: 499b60fedd6db1cd0c1a4e1cc8c59a94b89c5c84
Parents: f681d9b
Author: zentol <s.mo...@web.de>
Authored: Wed Jan 27 14:52:27 2016 +0100
Committer: zentol <s.mo...@web.de>
Committed: Wed Jan 27 14:55:34 2016 +0100

----------------------------------------------------------------------
 .../flink/python/api/PythonOperationInfo.java   | 213 +++++--------------
 .../flink/python/api/PythonPlanBinder.java      | 207 ++++++++----------
 .../api/streaming/plan/PythonPlanReceiver.java  |   2 +-
 .../api/flink/functions/GroupReduceFunction.py  |   4 +-
 .../api/flink/functions/ReduceFunction.py       |   4 +-
 .../flink/python/api/flink/plan/DataSet.py      |   1 +
 .../flink/python/api/flink/plan/Environment.py  | 141 +++---------
 .../python/api/flink/plan/OperationInfo.py      |  30 +--
 8 files changed, 196 insertions(+), 406 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/499b60fe/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java
index 30a7133..1e3005d 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java
@@ -20,10 +20,10 @@ import org.apache.flink.api.java.aggregation.Aggregations;
 import org.apache.flink.api.java.tuple.Tuple;
 import static org.apache.flink.api.java.typeutils.TypeExtractor.getForObject;
 import org.apache.flink.core.fs.FileSystem.WriteMode;
-import org.apache.flink.python.api.PythonPlanBinder.Operation;
 import org.apache.flink.python.api.streaming.plan.PythonPlanStreamer;
 
 public class PythonOperationInfo {
+       public String identifier;
        public int parentID; //DataSet that an operation is applied on
        public int otherID; //secondary DataSet
        public int setID; //ID for new DataSet
@@ -35,7 +35,6 @@ public class PythonOperationInfo {
        public Object[] values;
        public int count;
        public String field;
-       public int[] fields;
        public Order order;
        public String path;
        public String fieldDelimiter;
@@ -47,154 +46,59 @@ public class PythonOperationInfo {
        public String name;
        public boolean usesUDF;
 
-       public PythonOperationInfo(PythonPlanStreamer streamer, Operation identifier) throws IOException {
-               Object tmpType;
-               switch (identifier) {
-                       case SOURCE_CSV:
-                               setID = (Integer) streamer.getRecord(true);
-                               path = (String) streamer.getRecord();
-                               fieldDelimiter = (String) streamer.getRecord();
-                               lineDelimiter = (String) streamer.getRecord();
-                               tmpType = (Tuple) streamer.getRecord();
-                               types = tmpType == null ? null : getForObject(tmpType);
-                               return;
-                       case SOURCE_TEXT:
-                               setID = (Integer) streamer.getRecord(true);
-                               path = (String) streamer.getRecord();
-                               return;
-                       case SOURCE_VALUE:
-                               setID = (Integer) streamer.getRecord(true);
-                               int valueCount = (Integer) streamer.getRecord(true);
-                               values = new Object[valueCount];
-                               for (int x = 0; x < valueCount; x++) {
-                                       values[x] = streamer.getRecord();
-                               }
-                               return;
-                       case SOURCE_SEQ:
-                               setID = (Integer) streamer.getRecord(true);
-                               from = (Long) streamer.getRecord();
-                               to = (Long) streamer.getRecord();
-                               return;
-                       case SINK_CSV:
-                               parentID = (Integer) streamer.getRecord(true);
-                               path = (String) streamer.getRecord();
-                               fieldDelimiter = (String) streamer.getRecord();
-                               lineDelimiter = (String) streamer.getRecord();
-                               writeMode = ((Integer) streamer.getRecord(true)) == 1
-                                               ? WriteMode.OVERWRITE
-                                               : WriteMode.NO_OVERWRITE;
-                               return;
-                       case SINK_TEXT:
-                               parentID = (Integer) streamer.getRecord(true);
-                               path = (String) streamer.getRecord();
-                               writeMode = ((Integer) streamer.getRecord(true)) == 1
-                                               ? WriteMode.OVERWRITE
-                                               : WriteMode.NO_OVERWRITE;
-                               return;
-                       case SINK_PRINT:
-                               parentID = (Integer) streamer.getRecord(true);
-                               toError = (Boolean) streamer.getRecord();
-                               return;
-                       case BROADCAST:
-                               parentID = (Integer) streamer.getRecord(true);
-                               otherID = (Integer) streamer.getRecord(true);
-                               name = (String) streamer.getRecord();
-                               return;
-               }
-               setID = (Integer) streamer.getRecord(true);
+       public PythonOperationInfo(PythonPlanStreamer streamer) throws IOException {
+               identifier = (String) streamer.getRecord();
                parentID = (Integer) streamer.getRecord(true);
-               switch (identifier) {
-                       case AGGREGATE:
-                               count = (Integer) streamer.getRecord(true);
-                               aggregates = new AggregationEntry[count];
-                               for (int x = 0; x < count; x++) {
-                                       int encodedAgg = (Integer) streamer.getRecord(true);
-                                       int field = (Integer) streamer.getRecord(true);
-                                       aggregates[x] = new AggregationEntry(encodedAgg, field);
-                               }
-                               return;
-                       case FIRST:
-                               count = (Integer) streamer.getRecord(true);
-                               return;
-                       case DISTINCT:
-                       case GROUPBY:
-                       case PARTITION_HASH:
-                               keys = normalizeKeys(streamer.getRecord(true));
-                               return;
-                       case PROJECTION:
-                               fields = toIntArray(streamer.getRecord(true));
-                               return;
-                       case REBALANCE:
-                               return;
-                       case SORT:
-                               field = "f0.f" + (Integer) streamer.getRecord(true);
-                               int encodedOrder = (Integer) streamer.getRecord(true);
-                               switch (encodedOrder) {
-                                       case 0:
-                                               order = Order.NONE;
-                                               break;
-                                       case 1:
-                                               order = Order.ASCENDING;
-                                               break;
-                                       case 2:
-                                               order = Order.DESCENDING;
-                                               break;
-                                       case 3:
-                                               order = Order.ANY;
-                                               break;
-                                       default:
-                                               order = Order.NONE;
-                                               break;
-                               }
-                               return;
-                       case UNION:
-                               otherID = (Integer) streamer.getRecord(true);
-                               return;
-                       case COGROUP:
-                               otherID = (Integer) streamer.getRecord(true);
-                               keys1 = normalizeKeys(streamer.getRecord(true));
-                               keys2 = normalizeKeys(streamer.getRecord(true));
-                               tmpType = streamer.getRecord();
-                               types = tmpType == null ? null : getForObject(tmpType);
-                               name = (String) streamer.getRecord();
-                               return;
-                       case CROSS:
-                       case CROSS_H:
-                       case CROSS_T:
-                               otherID = (Integer) streamer.getRecord(true);
-                               usesUDF = (Boolean) streamer.getRecord();
-                               tmpType = streamer.getRecord();
-                               types = tmpType == null ? null : getForObject(tmpType);
-                               name = (String) streamer.getRecord();
-                               return;
-                       case REDUCE:
-                       case GROUPREDUCE:
-                               tmpType = streamer.getRecord();
-                               types = tmpType == null ? null : getForObject(tmpType);
-                               name = (String) streamer.getRecord();
-                               return;
-                       case JOIN:
-                       case JOIN_H:
-                       case JOIN_T:
-                               keys1 = normalizeKeys(streamer.getRecord(true));
-                               keys2 = normalizeKeys(streamer.getRecord(true));
-                               otherID = (Integer) streamer.getRecord(true);
-                               usesUDF = (Boolean) streamer.getRecord();
-                               tmpType = streamer.getRecord();
-                               types = tmpType == null ? null : getForObject(tmpType);
-                               name = (String) streamer.getRecord();
-                               return;
-                       case MAPPARTITION:
-                       case FLATMAP:
-                       case MAP:
-                       case FILTER:
-                               tmpType = streamer.getRecord();
-                               types = tmpType == null ? null : getForObject(tmpType);
-                               name = (String) streamer.getRecord();
-                               return;
+               otherID = (Integer) streamer.getRecord(true);
+               field = "f0.f" + (Integer) streamer.getRecord(true);
+               int encodedOrder = (Integer) streamer.getRecord(true);
+               switch (encodedOrder) {
+                       case 0:
+                               order = Order.NONE;
+                               break;
+                       case 1:
+                               order = Order.ASCENDING;
+                               break;
+                       case 2:
+                               order = Order.DESCENDING;
+                               break;
+                       case 3:
+                               order = Order.ANY;
+                               break;
                        default:
-                               throw new UnsupportedOperationException("This operation is not implemented in the Python API: " + identifier);
+                               order = Order.NONE;
+                               break;
+               }
+               keys = normalizeKeys(streamer.getRecord(true));
+               keys1 = normalizeKeys(streamer.getRecord(true));
+               keys2 = normalizeKeys(streamer.getRecord(true));
+               Object tmpType = streamer.getRecord();
+               types = tmpType == null ? null : getForObject(tmpType);
+               usesUDF = (Boolean) streamer.getRecord();
+               name = (String) streamer.getRecord();
+               lineDelimiter = (String) streamer.getRecord();
+               fieldDelimiter = (String) streamer.getRecord();
+               writeMode = ((Integer) streamer.getRecord(true)) == 1
+                       ? WriteMode.OVERWRITE
+                       : WriteMode.NO_OVERWRITE;
+               path = (String) streamer.getRecord();
+               setID = (Integer) streamer.getRecord(true);
+               toError = (Boolean) streamer.getRecord();
+               count = (Integer) streamer.getRecord(true);
+               int valueCount = (Integer) streamer.getRecord(true);
+               values = new Object[valueCount];
+               for (int x = 0; x < valueCount; x++) {
+                       values[x] = streamer.getRecord();
                }
+
+               /*
+               aggregates = new AggregationEntry[count];
+               for (int x = 0; x < count; x++) {
+                       int encodedAgg = (Integer) streamer.getRecord(true);
+                       int field = (Integer) streamer.getRecord(true);
+                       aggregates[x] = new AggregationEntry(encodedAgg, field);
+               }
+               */
        }
 
        @Override
@@ -283,21 +187,6 @@ public class PythonOperationInfo {
                throw new RuntimeException("Key argument is neither an int[] 
nor a Tuple: " + keys.toString());
        }
 
-       private static int[] toIntArray(Object key) {
-               if (key instanceof Tuple) {
-                       Tuple tuple = (Tuple) key;
-                       int[] keys = new int[tuple.getArity()];
-                       for (int y = 0; y < tuple.getArity(); y++) {
-                               keys[y] = (Integer) tuple.getField(y);
-                       }
-                       return keys;
-               }
-               if (key instanceof int[]) {
-                       return (int[]) key;
-               }
-               throw new RuntimeException("Key argument is neither an int[] nor a Tuple.");
-       }
-
        private static String[] tupleToStringArray(Tuple tuple) {
                String[] keys = new String[tuple.getArity()];
                for (int y = 0; y < tuple.getArity(); y++) {
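
Note on the constructor above: the sort order travels as a small integer that
is decoded by the switch on encodedOrder, with 0 doubling as the fallback for
unknown values. The wire mapping, restated in Python for reference (values
taken directly from that switch):

    # Wire encoding of sort orders read by PythonOperationInfo:
    ORDER_ENCODING = {
        0: "NONE",        # also the default for unrecognized values
        1: "ASCENDING",
        2: "DESCENDING",
        3: "ANY",
    }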

http://git-wip-us.apache.org/repos/asf/flink/blob/499b60fe/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
index 5b0d846..3877ef1 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
@@ -280,7 +280,7 @@ public class PythonPlanBinder {
         */
        protected enum Operation {
                SOURCE_CSV, SOURCE_TEXT, SOURCE_VALUE, SOURCE_SEQ, SINK_CSV, SINK_TEXT, SINK_PRINT,
-               PROJECTION, SORT, UNION, FIRST, DISTINCT, GROUPBY, AGGREGATE,
+               SORT, UNION, FIRST, DISTINCT, GROUPBY, AGGREGATE,
                REBALANCE, PARTITION_HASH,
                BROADCAST,
                COGROUP, CROSS, CROSS_H, CROSS_T, FILTER, FLATMAP, GROUPREDUCE, JOIN, JOIN_H, JOIN_T, MAP, REDUCE, MAPPARTITION
@@ -289,121 +289,105 @@ public class PythonPlanBinder {
        private void receiveOperations() throws IOException {
                Integer operationCount = (Integer) streamer.getRecord(true);
                for (int x = 0; x < operationCount; x++) {
-                       String identifier = (String) streamer.getRecord();
-                       Operation op = null;
+                       PythonOperationInfo info = new PythonOperationInfo(streamer);
+                       Operation op;
                        try {
-                               op = Operation.valueOf(identifier.toUpperCase());
+                               op = Operation.valueOf(info.identifier.toUpperCase());
                        } catch (IllegalArgumentException iae) {
-                               throw new IllegalArgumentException("Invalid operation specified: " + identifier);
+                               throw new IllegalArgumentException("Invalid operation specified: " + info.identifier);
                        }
-                       if (op != null) {
-                               switch (op) {
-                                       case SOURCE_CSV:
-                                               createCsvSource(createOperationInfo(op));
-                                               break;
-                                       case SOURCE_TEXT:
-                                               createTextSource(createOperationInfo(op));
-                                               break;
-                                       case SOURCE_VALUE:
-                                               createValueSource(createOperationInfo(op));
-                                               break;
-                                       case SOURCE_SEQ:
-                                               createSequenceSource(createOperationInfo(op));
-                                               break;
-                                       case SINK_CSV:
-                                               createCsvSink(createOperationInfo(op));
-                                               break;
-                                       case SINK_TEXT:
-                                               createTextSink(createOperationInfo(op));
-                                               break;
-                                       case SINK_PRINT:
-                                               createPrintSink(createOperationInfo(op));
-                                               break;
-                                       case BROADCAST:
-                                               createBroadcastVariable(createOperationInfo(op));
-                                               break;
-                                       case AGGREGATE:
-                                               createAggregationOperation(createOperationInfo(op));
-                                               break;
-                                       case DISTINCT:
-                                               createDistinctOperation(createOperationInfo(op));
-                                               break;
-                                       case FIRST:
-                                               createFirstOperation(createOperationInfo(op));
-                                               break;
-                                       case PARTITION_HASH:
-                                               createHashPartitionOperation(createOperationInfo(op));
-                                               break;
-                                       case PROJECTION:
-                                               createProjectOperation(createOperationInfo(op));
-                                               break;
-                                       case REBALANCE:
-                                               createRebalanceOperation(createOperationInfo(op));
-                                               break;
-                                       case GROUPBY:
-                                               createGroupOperation(createOperationInfo(op));
-                                               break;
-                                       case SORT:
-                                               createSortOperation(createOperationInfo(op));
-                                               break;
-                                       case UNION:
-                                               createUnionOperation(createOperationInfo(op));
-                                               break;
-                                       case COGROUP:
-                                               createCoGroupOperation(createOperationInfo(op));
-                                               break;
-                                       case CROSS:
-                                               createCrossOperation(NONE, createOperationInfo(op));
-                                               break;
-                                       case CROSS_H:
-                                               createCrossOperation(HUGE, createOperationInfo(op));
-                                               break;
-                                       case CROSS_T:
-                                               createCrossOperation(TINY, createOperationInfo(op));
-                                               break;
-                                       case FILTER:
-                                               createFilterOperation(createOperationInfo(op));
-                                               break;
-                                       case FLATMAP:
-                                               createFlatMapOperation(createOperationInfo(op));
-                                               break;
-                                       case GROUPREDUCE:
-                                               createGroupReduceOperation(createOperationInfo(op));
-                                               break;
-                                       case JOIN:
-                                               createJoinOperation(NONE, createOperationInfo(op));
-                                               break;
-                                       case JOIN_H:
-                                               createJoinOperation(HUGE, createOperationInfo(op));
-                                               break;
-                                       case JOIN_T:
-                                               createJoinOperation(TINY, createOperationInfo(op));
-                                               break;
-                                       case MAP:
-                                               createMapOperation(createOperationInfo(op));
-                                               break;
-                                       case MAPPARTITION:
-                                               createMapPartitionOperation(createOperationInfo(op));
-                                               break;
-                                       case REDUCE:
-                                               createReduceOperation(createOperationInfo(op));
-                                               break;
-                               }
+                       switch (op) {
+                               case SOURCE_CSV:
+                                       createCsvSource(info);
+                                       break;
+                               case SOURCE_TEXT:
+                                       createTextSource(info);
+                                       break;
+                               case SOURCE_VALUE:
+                                       createValueSource(info);
+                                       break;
+                               case SOURCE_SEQ:
+                                       createSequenceSource(info);
+                                       break;
+                               case SINK_CSV:
+                                       createCsvSink(info);
+                                       break;
+                               case SINK_TEXT:
+                                       createTextSink(info);
+                                       break;
+                               case SINK_PRINT:
+                                       createPrintSink(info);
+                                       break;
+                               case BROADCAST:
+                                       createBroadcastVariable(info);
+                                       break;
+                               case AGGREGATE:
+                                       createAggregationOperation(info);
+                                       break;
+                               case DISTINCT:
+                                       createDistinctOperation(info);
+                                       break;
+                               case FIRST:
+                                       createFirstOperation(info);
+                                       break;
+                               case PARTITION_HASH:
+                                       createHashPartitionOperation(info);
+                                       break;
+                               case REBALANCE:
+                                       createRebalanceOperation(info);
+                                       break;
+                               case GROUPBY:
+                                       createGroupOperation(info);
+                                       break;
+                               case SORT:
+                                       createSortOperation(info);
+                                       break;
+                               case UNION:
+                                       createUnionOperation(info);
+                                       break;
+                               case COGROUP:
+                                       createCoGroupOperation(info);
+                                       break;
+                               case CROSS:
+                                       createCrossOperation(NONE, info);
+                                       break;
+                               case CROSS_H:
+                                       createCrossOperation(HUGE, info);
+                                       break;
+                               case CROSS_T:
+                                       createCrossOperation(TINY, info);
+                                       break;
+                               case FILTER:
+                                       createFilterOperation(info);
+                                       break;
+                               case FLATMAP:
+                                       createFlatMapOperation(info);
+                                       break;
+                               case GROUPREDUCE:
+                                       createGroupReduceOperation(info);
+                                       break;
+                               case JOIN:
+                                       createJoinOperation(NONE, info);
+                                       break;
+                               case JOIN_H:
+                                       createJoinOperation(HUGE, info);
+                                       break;
+                               case JOIN_T:
+                                       createJoinOperation(TINY, info);
+                                       break;
+                               case MAP:
+                                       createMapOperation(info);
+                                       break;
+                               case MAPPARTITION:
+                                       createMapPartitionOperation(info);
+                                       break;
+                               case REDUCE:
+                                       createReduceOperation(info);
+                                       break;
                        }
                }
        }
 
-       /**
-        * This method creates an OperationInfo object based on the operation-identifier passed.
-        *
-        * @param operationIdentifier
-        * @return
-        * @throws IOException
-        */
-       private PythonOperationInfo createOperationInfo(Operation operationIdentifier) throws IOException {
-               return new PythonOperationInfo(streamer, operationIdentifier);
-       }
-
        private void createCsvSource(PythonOperationInfo info) throws IOException {
                if (!(info.types instanceof TupleTypeInfo)) {
                        throw new RuntimeException("The output type of a csv 
source has to be a tuple. The derived type is " + info);
@@ -491,11 +475,6 @@ public class PythonPlanBinder {
                sets.put(info.setID, op1.partitionByHash(info.keys).map(new KeyDiscarder()).name("HashPartitionPostStep"));
        }
 
-       private void createProjectOperation(PythonOperationInfo info) throws IOException {
-               DataSet op1 = (DataSet) sets.get(info.parentID);
-               sets.put(info.setID, op1.project(info.fields).name("Projection"));
-       }
-
        private void createRebalanceOperation(PythonOperationInfo info) throws IOException {
                DataSet op = (DataSet) sets.get(info.parentID);
                sets.put(info.setID, op.rebalance().name("Rebalance"));
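
With the uniform transfer in place, the receive loop above reduces to: read
the operation count, construct one PythonOperationInfo per operation, and
dispatch on its identifier string. The same loop shape sketched in Python
(schematic only; the receive_* helpers and HANDLERS table are hypothetical):

    count = receive_int()                  # number of operations in the plan
    for _ in range(count):
        info = receive_operation_info()    # reads the fixed field sequence
        handler = HANDLERS[info.identifier.upper()]  # fails on unknown ids
        handler(info)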

http://git-wip-us.apache.org/repos/asf/flink/blob/499b60fe/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanReceiver.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanReceiver.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanReceiver.java
index 6d2dcd1..a54b8dd 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanReceiver.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanReceiver.java
@@ -49,7 +49,7 @@ public class PythonPlanReceiver implements Serializable {
 
        private Deserializer getDeserializer() throws IOException {
                byte type = (byte) input.readByte();
-               if (type > 0 && type < 26) {
+               if (type >= 0 && type < 26) {
                                Deserializer[] d = new Deserializer[type];
                                for (int x = 0; x < d.length; x++) {
                                        d[x] = getDeserializer();
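
The one-character change above is what makes Tuple0 usable: the leading byte
of a serialized tuple encodes its arity, and Tuple0 has arity 0, which the old
"type > 0" check rejected. A sketch of the framing this implies (hypothetical
writer, mirroring the deserializer above, not the actual serializer code):

    # The plan channel prefixes each tuple with its arity as one byte;
    # an arity of 0 is valid and denotes Tuple0.
    def write_tuple(out, values):
        out.write(bytes([len(values)]))    # arity byte, may now be 0
        for v in values:
            write_value(out, v)            # hypothetical per-type writer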

http://git-wip-us.apache.org/repos/asf/flink/blob/499b60fe/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/GroupReduceFunction.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/GroupReduceFunction.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/GroupReduceFunction.py
index 340497d..8d1934c 100644
--- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/GroupReduceFunction.py
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/GroupReduceFunction.py
@@ -27,7 +27,7 @@ class GroupReduceFunction(Function.Function):
 
     def _configure(self, input_file, output_file, port, env, info):
         super(GroupReduceFunction, self)._configure(input_file, output_file, port, env, info)
-        if info.key1 is None:
+        if len(info.key1) == 0:
             self._run = self._run_all_group_reduce
         else:
             self._run = self._run_grouped_group_reduce
@@ -63,4 +63,4 @@ class GroupReduceFunction(Function.Function):
         pass
 
     def combine(self, iterator, collector):
-        self.reduce(iterator, collector)
\ No newline at end of file
+        self.reduce(iterator, collector)

http://git-wip-us.apache.org/repos/asf/flink/blob/499b60fe/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/ReduceFunction.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/ReduceFunction.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/ReduceFunction.py
index 95e8b8a..b1d2201 100644
--- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/ReduceFunction.py
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/ReduceFunction.py
@@ -26,7 +26,7 @@ class ReduceFunction(Function.Function):
 
     def _configure(self, input_file, output_file, port, env, info):
         super(ReduceFunction, self)._configure(input_file, output_file, port, env, info)
-        if info.key1 is None:
+        if len(info.key1) == 0:
             self._run = self._run_all_reduce
         else:
             self._run = self._run_grouped_reduce
@@ -64,4 +64,4 @@ class ReduceFunction(Function.Function):
         pass
 
     def combine(self, value1, value2):
-        return self.reduce(value1, value2)
\ No newline at end of file
+        return self.reduce(value1, value2)

http://git-wip-us.apache.org/repos/asf/flink/blob/499b60fe/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/DataSet.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/DataSet.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/DataSet.py
index 3132651..5bb34e5 100644
--- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/DataSet.py
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/DataSet.py
@@ -548,6 +548,7 @@ class OperatorSet(DataSet):
 
     def with_broadcast_set(self, name, set):
         child = OperationInfo()
+        child.identifier = _Identifier.BROADCAST
         child.parent = self._info
         child.other = set._info
         child.name = name

http://git-wip-us.apache.org/repos/asf/flink/blob/499b60fe/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
index 777f30b..b410ead 100644
--- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
@@ -238,11 +238,7 @@ class Environment(object):
 
     def _send_plan(self):
         self._send_parameters()
-        self._collector.collect(len(self._sources) + len(self._sets) + len(self._sinks) + len(self._broadcast))
-        self._send_sources()
         self._send_operations()
-        self._send_sinks()
-        self._send_broadcast()
 
     def _send_parameters(self):
         collect = self._collector.collect
@@ -251,117 +247,40 @@ class Environment(object):
         collect(("mode", self._local_mode))
         collect(("retry", self._retry))
 
-    def _send_sources(self):
-        for source in self._sources:
-            identifier = source.identifier
-            collect = self._collector.collect
-            collect(identifier)
-            collect(source.id)
-            for case in Switch(identifier):
-                if case(_Identifier.SOURCE_CSV):
-                    collect(source.path)
-                    collect(source.delimiter_field)
-                    collect(source.delimiter_line)
-                    collect(source.types)
-                    break
-                if case(_Identifier.SOURCE_TEXT):
-                    collect(source.path)
-                    break
-                if case(_Identifier.SOURCE_VALUE):
-                    collect(len(source.values))
-                    for value in source.values:
-                        collect(value)
-                    break
-
     def _send_operations(self):
-        collect = self._collector.collect
+        self._collector.collect(len(self._sources) + len(self._sets) + len(self._sinks) + len(self._broadcast))
+        for source in self._sources:
+            self._send_operation(source)
         for set in self._sets:
-            identifier = set.identifier
-            collect(set.identifier)
-            collect(set.id)
-            collect(set.parent.id)
-            for case in Switch(identifier):
-                if case(_Identifier.REBALANCE):
-                    break
-                if case(_Identifier.DISTINCT, _Identifier.PARTITION_HASH):
-                    collect(set.keys)
-                    break
-                if case(_Identifier.FIRST):
-                    collect(set.count)
-                    break
-                if case(_Identifier.SORT):
-                    collect(set.field)
-                    collect(set.order)
-                    break
-                if case(_Identifier.GROUP):
-                    collect(set.keys)
-                    break
-                if case(_Identifier.COGROUP):
-                    collect(set.other.id)
-                    collect(set.key1)
-                    collect(set.key2)
-                    collect(set.types)
-                    collect(set.name)
-                    break
-                if case(_Identifier.CROSS, _Identifier.CROSSH, _Identifier.CROSST):
-                    collect(set.other.id)
-                    collect(set.uses_udf)
-                    collect(set.types)
-                    collect(set.name)
-                    break
-                if case(_Identifier.REDUCE, _Identifier.GROUPREDUCE):
-                    collect(set.types)
-                    collect(set.name)
-                    break
-                if case(_Identifier.JOIN, _Identifier.JOINH, _Identifier.JOINT):
-                    collect(set.key1)
-                    collect(set.key2)
-                    collect(set.other.id)
-                    collect(set.uses_udf)
-                    collect(set.types)
-                    collect(set.name)
-                    break
-                if case(_Identifier.MAP, _Identifier.MAPPARTITION, _Identifier.FLATMAP, _Identifier.FILTER):
-                    collect(set.types)
-                    collect(set.name)
-                    break
-                if case(_Identifier.UNION):
-                    collect(set.other.id)
-                    break
-                if case(_Identifier.PROJECTION):
-                    collect(set.keys)
-                    break
-                if case():
-                    raise KeyError("Environment._send_child_sets(): Invalid operation identifier: " + str(identifier))
-
-    def _send_sinks(self):
+            self._send_operation(set)
         for sink in self._sinks:
-            identifier = sink.identifier
-            collect = self._collector.collect
-            collect(identifier)
-            collect(sink.parent.id)
-            for case in Switch(identifier):
-                if case(_Identifier.SINK_CSV):
-                    collect(sink.path)
-                    collect(sink.delimiter_field)
-                    collect(sink.delimiter_line)
-                    collect(sink.write_mode)
-                    break;
-                if case(_Identifier.SINK_TEXT):
-                    collect(sink.path)
-                    collect(sink.write_mode)
-                    break
-                if case(_Identifier.SINK_PRINT):
-                    collect(sink.to_err)
-                    break
-
-    def _send_broadcast(self):
+            self._send_operation(sink)
+        for bcv in self._broadcast:
+            self._send_operation(bcv)
+
+    def _send_operation(self, set):
         collect = self._collector.collect
-        for entry in self._broadcast:
-            collect(_Identifier.BROADCAST)
-            collect(entry.parent.id)
-            collect(entry.other.id)
-            collect(entry.name)
+        collect(set.identifier)
+        collect(set.parent.id if set.parent is not None else -1)
+        collect(set.other.id if set.other is not None else -1)
+        collect(set.field)
+        collect(set.order)
+        collect(set.keys)
+        collect(set.key1)
+        collect(set.key2)
+        collect(set.types)
+        collect(set.uses_udf)
+        collect(set.name)
+        collect(set.delimiter_line)
+        collect(set.delimiter_field)
+        collect(set.write_mode)
+        collect(set.path)
+        collect(set.id)
+        collect(set.to_err)
+        collect(set.count)
+        collect(len(set.values))
+        for value in set.values:
+            collect(value)
 
     def _receive_result(self):
         jer = JobExecutionResult()
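
Because the Java constructor now reads parentID and otherID unconditionally,
operations without a parent or secondary DataSet (sources, for instance) send
-1 as a sentinel rather than omitting the field. What a text source ends up
transferring under this scheme (illustrative values; env stands for a plan
Environment):

    info = OperationInfo()
    info.identifier = _Identifier.SOURCE_TEXT  # set by the plan API
    info.path = "/tmp/input"                   # assumed example path
    # parent/other remain None, so _send_operation emits -1 for both
    # ids; keys/key1/key2 remain (), arriving as empty arrays in Java.
    env._send_operation(info)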

http://git-wip-us.apache.org/repos/asf/flink/blob/499b60fe/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/OperationInfo.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/OperationInfo.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/OperationInfo.py
index 6eb228c..bd7d2b5 100644
--- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/OperationInfo.py
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/OperationInfo.py
@@ -21,33 +21,35 @@ from flink.plan.Constants import WriteMode
 class OperationInfo():
     def __init__(self, info=None):
         if info is None:
+            #fields being transferred to the java side
+            self.identifier = -1
             self.parent = None
             self.other = None
-            self.parent_set = None
-            self.other_set = None
-            self.identifier = None
-            self.field = None
-            self.order = None
-            self.keys = None
-            self.key1 = None
-            self.key2 = None
+            self.field = -1
+            self.order = 0
+            self.keys = ()
+            self.key1 = ()
+            self.key2 = ()
             self.types = None
-            self.operator = None
             self.uses_udf = False
             self.name = None
             self.delimiter_line = "\n"
             self.delimiter_field = ","
             self.write_mode = WriteMode.NO_OVERWRITE
-            self.sinks = []
-            self.children = []
-            self.path = None
+            self.path = ""
             self.count = 0
             self.values = []
             self.projections = []
-            self.bcvars = []
-            self.id = None
+            self.id = -1
             self.to_err = False
+            #internally used
+            self.parent_set = None
+            self.other_set = None
             self.chained_info = None
+            self.bcvars = []
+            self.sinks = []
+            self.children = []
+            self.operator = None
         else:
             self.__dict__.update(info.__dict__)
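
The new defaults are what remove null handling elsewhere in this commit: keys
default to empty tuples rather than None, so the reduce functions can test for
grouping by length. The pattern as now used in ReduceFunction and
GroupReduceFunction (variable names illustrative):

    # With key1 defaulting to () instead of None, "no grouping" is
    # detected by emptiness rather than a None check:
    if len(info.key1) == 0:
        run = run_all_reduce         # ungrouped variant
    else:
        run = run_grouped_reduce     # grouped variant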
 
