[FLINK-377] Generic Interface

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/af9248c3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/af9248c3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/af9248c3

Branch: refs/heads/master
Commit: af9248c35a5a138d311073b54f6abd4260ab7fd9
Parents: 877043f
Author: zentol <s.mo...@web.de>
Authored: Tue Mar 3 20:50:41 2015 +0100
Committer: zentol <s.mo...@web.de>
Committed: Tue Apr 21 13:37:29 2015 +0200

----------------------------------------------------------------------
 .../operators/base/CoGroupRawOperatorBase.java  | 270 ++++++++
 flink-dist/pom.xml                              |   5 +
 .../api/java/operators/CoGroupRawOperator.java  | 118 ++++
 .../org/apache/flink/api/java/tuple/Tuple0.java |  39 ++
 .../flink/optimizer/costs/CostEstimator.java    |   1 +
 .../flink/optimizer/dag/CoGroupRawNode.java     |  82 +++
 .../operators/CoGroupRawDescriptor.java         | 171 +++++
 .../traversals/GraphCreatingVisitor.java        |   5 +
 .../runtime/operators/CoGroupRawDriver.java     | 150 +++++
 .../flink/runtime/operators/DriverStrategy.java |   3 +
 .../flink-language-binding-generic/pom.xml      |  61 ++
 .../api/java/common/OperationInfo.java          |  57 ++
 .../api/java/common/PlanBinder.java             | 656 +++++++++++++++++++
 .../api/java/common/streaming/Receiver.java     | 410 ++++++++++++
 .../api/java/common/streaming/Sender.java       | 415 ++++++++++++
 .../java/common/streaming/StreamPrinter.java    |  55 ++
 .../api/java/common/streaming/Streamer.java     | 276 ++++++++
 flink-staging/flink-language-binding/pom.xml    |  39 ++
 flink-staging/pom.xml                           |   1 +
 19 files changed, 2814 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/af9248c3/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CoGroupRawOperatorBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CoGroupRawOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CoGroupRawOperatorBase.java
new file mode 100644
index 0000000..2c81e02
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CoGroupRawOperatorBase.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.operators.base;
+
+import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.functions.util.CopyingListCollector;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.api.common.operators.BinaryOperatorInformation;
+import org.apache.flink.api.common.operators.DualInputOperator;
+import org.apache.flink.api.common.operators.Ordering;
+import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
+import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
+import org.apache.flink.api.common.operators.util.UserCodeWrapper;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.util.Collector;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.flink.api.common.ExecutionConfig;
+
+/**
+ * A CoGroup variant that passes each input to the user function as a single sorted stream, without per-key grouping.
+ *
+ * @see org.apache.flink.api.common.functions.CoGroupFunction
+ */
+public class CoGroupRawOperatorBase<IN1, IN2, OUT, FT extends CoGroupFunction<IN1, IN2, OUT>> extends DualInputOperator<IN1, IN2, OUT, FT> {
+
+       /**
+        * The ordering for the order inside a group from input one.
+        */
+       private Ordering groupOrder1;
+
+       /**
+        * The ordering for the order inside a group from input two.
+        */
+       private Ordering groupOrder2;
+
+       // --------------------------------------------------------------------------------------------
+       private boolean combinableFirst;
+
+       private boolean combinableSecond;
+
+       public CoGroupRawOperatorBase(UserCodeWrapper<FT> udf, BinaryOperatorInformation<IN1, IN2, OUT> operatorInfo, int[] keyPositions1, int[] keyPositions2, String name) {
+               super(udf, operatorInfo, keyPositions1, keyPositions2, name);
+               this.combinableFirst = false;
+               this.combinableSecond = false;
+       }
+
+       public CoGroupRawOperatorBase(FT udf, BinaryOperatorInformation<IN1, IN2, OUT> operatorInfo, int[] keyPositions1, int[] keyPositions2, String name) {
+               this(new UserCodeObjectWrapper<FT>(udf), operatorInfo, keyPositions1, keyPositions2, name);
+       }
+
+       public CoGroupRawOperatorBase(Class<? extends FT> udf, BinaryOperatorInformation<IN1, IN2, OUT> operatorInfo, int[] keyPositions1, int[] keyPositions2, String name) {
+               this(new UserCodeClassWrapper<FT>(udf), operatorInfo, keyPositions1, keyPositions2, name);
+       }
+
+       // --------------------------------------------------------------------------------------------
+       /**
+        * Sets the order of the elements within a group for the given input.
+        *
+        * @param inputNum The number of the input (here either <i>0</i> or <i>1</i>).
+        * @param order    The order for the elements in a group.
+        */
+       public void setGroupOrder(int inputNum, Ordering order) {
+               if (inputNum == 0) {
+                       this.groupOrder1 = order;
+               } else if (inputNum == 1) {
+                       this.groupOrder2 = order;
+               } else {
+                       throw new IndexOutOfBoundsException();
+               }
+       }
+
+       /**
+        * Sets the order of the elements within a group for the first input.
+        *
+        * @param order The order for the elements in a group.
+        */
+       public void setGroupOrderForInputOne(Ordering order) {
+               setGroupOrder(0, order);
+       }
+
+       /**
+        * Sets the order of the elements within a group for the second input.
+        *
+        * @param order The order for the elements in a group.
+        */
+       public void setGroupOrderForInputTwo(Ordering order) {
+               setGroupOrder(1, order);
+       }
+
+       /**
+        * Gets the value order for an input, i.e. the order of elements within a group.
+        * If no such order has been set, this method returns null.
+        *
+        * @param inputNum The number of the input (here either <i>0</i> or <i>1</i>).
+        * @return The group order.
+        */
+       public Ordering getGroupOrder(int inputNum) {
+               if (inputNum == 0) {
+                       return this.groupOrder1;
+               } else if (inputNum == 1) {
+                       return this.groupOrder2;
+               } else {
+                       throw new IndexOutOfBoundsException();
+               }
+       }
+
+       /**
+        * Gets the order of elements within a group for the first input.
+        * If no such order has been set, this method returns null.
+        *
+        * @return The group order for the first input.
+        */
+       public Ordering getGroupOrderForInputOne() {
+               return getGroupOrder(0);
+       }
+
+       /**
+        * Gets the order of elements within a group for the second input.
+        * If no such order has been set, this method returns null.
+        *
+        * @return The group order for the second input.
+        */
+       public Ordering getGroupOrderForInputTwo() {
+               return getGroupOrder(1);
+       }
+
+       // --------------------------------------------------------------------------------------------
+       public boolean isCombinableFirst() {
+               return this.combinableFirst;
+       }
+
+       public void setCombinableFirst(boolean combinableFirst) {
+               this.combinableFirst = combinableFirst;
+       }
+
+       public boolean isCombinableSecond() {
+               return this.combinableSecond;
+       }
+
+       public void setCombinableSecond(boolean combinableSecond) {
+               this.combinableSecond = combinableSecond;
+       }
+
+       // ------------------------------------------------------------------------
+       @Override
+       protected List<OUT> executeOnCollections(List<IN1> input1, List<IN2> input2, RuntimeContext ctx, ExecutionConfig executionConfig) throws Exception {
+               // --------------------------------------------------------------------
+               // Setup
+               // --------------------------------------------------------------------
+               TypeInformation<IN1> inputType1 = getOperatorInfo().getFirstInputType();
+               TypeInformation<IN2> inputType2 = getOperatorInfo().getSecondInputType();
+
+               int[] inputKeys1 = getKeyColumns(0);
+               int[] inputKeys2 = getKeyColumns(1);
+
+               boolean[] inputSortDirections1 = new boolean[inputKeys1.length];
+               boolean[] inputSortDirections2 = new boolean[inputKeys2.length];
+
+               Arrays.fill(inputSortDirections1, true);
+               Arrays.fill(inputSortDirections2, true);
+
+               final TypeSerializer<IN1> inputSerializer1 = inputType1.createSerializer(executionConfig);
+               final TypeSerializer<IN2> inputSerializer2 = inputType2.createSerializer(executionConfig);
+
+               final TypeComparator<IN1> inputComparator1 = getTypeComparator(executionConfig, inputType1, inputKeys1, inputSortDirections1);
+               final TypeComparator<IN2> inputComparator2 = getTypeComparator(executionConfig, inputType2, inputKeys2, inputSortDirections2);
+
+               SimpleListIterable<IN1> iterator1 = new SimpleListIterable<IN1>(input1, inputComparator1, inputSerializer1);
+               SimpleListIterable<IN2> iterator2 = new SimpleListIterable<IN2>(input2, inputComparator2, inputSerializer2);
+
+               // --------------------------------------------------------------------
+               // Run UDF
+               // --------------------------------------------------------------------
+               CoGroupFunction<IN1, IN2, OUT> function = userFunction.getUserCodeObject();
+
+               FunctionUtils.setFunctionRuntimeContext(function, ctx);
+               FunctionUtils.openFunction(function, parameters);
+
+               List<OUT> result = new ArrayList<OUT>();
+               Collector<OUT> resultCollector = new CopyingListCollector<OUT>(result, getOperatorInfo().getOutputType().createSerializer(executionConfig));
+
+               function.coGroup(iterator1, iterator2, resultCollector);
+
+               FunctionUtils.closeFunction(function);
+
+               return result;
+       }
+
+       private <T> TypeComparator<T> getTypeComparator(ExecutionConfig executionConfig, TypeInformation<T> inputType, int[] inputKeys, boolean[] inputSortDirections) {
+               if (!(inputType instanceof CompositeType)) {
+                       throw new InvalidProgramException("Input types of coGroup must be composite types.");
+               }
+
+               return ((CompositeType<T>) inputType).createComparator(inputKeys, inputSortDirections, 0, executionConfig);
+       }
+
+       public static class SimpleListIterable<IN> implements Iterable<IN> {
+               private List<IN> values;
+               private TypeSerializer<IN> serializer;
+               private boolean copy;
+
+               public SimpleListIterable(List<IN> values, final TypeComparator<IN> comparator, TypeSerializer<IN> serializer) throws IOException {
+                       this.values = values;
+                       this.serializer = serializer;
+
+                       Collections.sort(values, new Comparator<IN>() {
+                               @Override
+                               public int compare(IN o1, IN o2) {
+                                       return comparator.compare(o1, o2);
+                               }
+                       });
+               }
+
+               @Override
+               public Iterator<IN> iterator() {
+                       return new SimpleListIterator<IN>(values, serializer);
+               }
+
+               protected class SimpleListIterator<IN> implements Iterator<IN> {
+                       private final List<IN> values;
+                       private final TypeSerializer<IN> serializer;
+                       private int pos = 0;
+
+                       public SimpleListIterator(List<IN> values, TypeSerializer<IN> serializer) {
+                               this.values = values;
+                               this.serializer = serializer;
+                       }
+
+                       @Override
+                       public boolean hasNext() {
+                               return pos < values.size();
+                       }
+
+                       @Override
+                       public IN next() {
+                               IN current = values.get(pos++);
+                               return serializer.copy(current);
+                       }
+
+                       @Override
+                       public void remove() { //unused
+                       }
+               }
+       }
+}

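Note on the collection-execution path above: executeOnCollections sorts each
input with its key comparator, and SimpleListIterable hands out serializer
copies on every next(), so the user function cannot mutate the backing lists.
A minimal, self-contained sketch of that sort-then-copy pattern in plain Java
(all names illustrative; UnaryOperator stands in for TypeSerializer#copy):

    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.Iterator;
    import java.util.List;
    import java.util.function.UnaryOperator;

    class SortedCopyingIterable<T> implements Iterable<T> {
        private final List<T> values;
        private final UnaryOperator<T> copier; // stand-in for TypeSerializer#copy

        SortedCopyingIterable(List<T> values, Comparator<T> comparator, UnaryOperator<T> copier) {
            this.values = new ArrayList<>(values); // defensive copy of the backing list
            this.copier = copier;
            this.values.sort(comparator);          // sort once, up front
        }

        @Override
        public Iterator<T> iterator() {
            final Iterator<T> it = values.iterator();
            return new Iterator<T>() {
                @Override public boolean hasNext() { return it.hasNext(); }
                @Override public T next() { return copier.apply(it.next()); } // copy on hand-out
                @Override public void remove() { throw new UnsupportedOperationException(); }
            };
        }
    }
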
http://git-wip-us.apache.org/repos/asf/flink/blob/af9248c3/flink-dist/pom.xml
----------------------------------------------------------------------
diff --git a/flink-dist/pom.xml b/flink-dist/pom.xml
index b826c45..8f20648 100644
--- a/flink-dist/pom.xml
+++ b/flink-dist/pom.xml
@@ -107,6 +107,11 @@ under the License.
                        <version>${project.version}</version>
                </dependency>
 
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-language-binding-generic</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
        </dependencies>
 
        <!-- See main pom.xml for explanation of profiles -->

http://git-wip-us.apache.org/repos/asf/flink/blob/af9248c3/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupRawOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupRawOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupRawOperator.java
new file mode 100644
index 0000000..38326bd
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupRawOperator.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.operators;
+
+import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.operators.BinaryOperatorInformation;
+import org.apache.flink.api.common.operators.Operator;
+import org.apache.flink.api.common.operators.base.CoGroupRawOperatorBase;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.operators.Keys.IncompatibleKeysException;
+
+/**
+ * A {@link DataSet} that is the result of a CoGroup transformation. 
+ * 
+ * @param <I1> The type of the first input DataSet of the CoGroup transformation.
+ * @param <I2> The type of the second input DataSet of the CoGroup transformation.
+ * @param <OUT> The type of the result of the CoGroup transformation.
+ * 
+ * @see DataSet
+ */
+public class CoGroupRawOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OUT, CoGroupRawOperator<I1, I2, OUT>> {
+
+       private final CoGroupFunction<I1, I2, OUT> function;
+
+       private final Keys<I1> keys1;
+       private final Keys<I2> keys2;
+
+       private final String defaultName;
+
+       public CoGroupRawOperator(DataSet<I1> input1, DataSet<I2> input2,
+                       Keys<I1> keys1, Keys<I2> keys2,
+                       CoGroupFunction<I1, I2, OUT> function,
+                       TypeInformation<OUT> returnType,
+                       String defaultName) {
+               super(input1, input2, returnType);
+               this.function = function;
+               this.defaultName = defaultName;
+               this.name = defaultName;
+
+               if (keys1 == null || keys2 == null) {
+                       throw new NullPointerException();
+               }
+
+               this.keys1 = keys1;
+               this.keys2 = keys2;
+
+               extractSemanticAnnotationsFromUdf(function.getClass());
+       }
+
+       protected Keys<I1> getKeys1() {
+               return this.keys1;
+       }
+
+       protected Keys<I2> getKeys2() {
+               return this.keys2;
+       }
+
+       @Override
+       protected org.apache.flink.api.common.operators.base.CoGroupRawOperatorBase<?, ?, OUT, ?> translateToDataFlow(Operator<I1> input1, Operator<I2> input2) {
+               String name = getName() != null ? getName() : "CoGroup at " + defaultName;
+               try {
+                       keys1.areCompatible(keys2);
+               } catch (IncompatibleKeysException e) {
+                       throw new InvalidProgramException("The types of the key fields do not match.", e);
+               }
+
+               if (keys1 instanceof Keys.ExpressionKeys && keys2 instanceof Keys.ExpressionKeys) {
+                       try {
+                               keys1.areCompatible(keys2);
+                       } catch (IncompatibleKeysException e) {
+                               throw new InvalidProgramException("The types of the key fields do not match.", e);
+                       }
+
+                       int[] logicalKeyPositions1 = keys1.computeLogicalKeyPositions();
+                       int[] logicalKeyPositions2 = keys2.computeLogicalKeyPositions();
+
+                       CoGroupRawOperatorBase<I1, I2, OUT, CoGroupFunction<I1, I2, OUT>> po
+                                       = new CoGroupRawOperatorBase<I1, I2, OUT, CoGroupFunction<I1, I2, OUT>>(
+                                                       function, new BinaryOperatorInformation<I1, I2, OUT>(getInput1Type(), getInput2Type(),
+                                                                       getResultType()), logicalKeyPositions1, logicalKeyPositions2, name);
+
+                       // set inputs
+                       po.setFirstInput(input1);
+                       po.setSecondInput(input2);
+
+                       // set dop
+                       po.setDegreeOfParallelism(this.getParallelism());
+
+                       return po;
+
+               } else {
+                       throw new UnsupportedOperationException("Unrecognized or incompatible key types.");
+               }
+       }
+
+       @Override
+       protected Function getFunction() {
+               return function;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/af9248c3/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple0.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple0.java b/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple0.java
new file mode 100644
index 0000000..82209bf
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple0.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.tuple;
+
+public class Tuple0 extends Tuple {
+
+       private static final long serialVersionUID = 1L;
+
+       public Tuple0() {}
+
+       @Override
+       public <T> T getField(int pos) {
+               return null;
+       }
+
+       @Override
+       public <T> void setField(T value, int pos) {
+       }
+
+       @Override
+       public int getArity() {
+               return 0;
+       }
+}

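Tuple0 is an arity-0 placeholder: it carries no fields, so reads return null
and writes are discarded. A quick usage sketch (the demo class below is
hypothetical, not part of this commit):

    import org.apache.flink.api.java.tuple.Tuple0;

    public class Tuple0Demo {
        public static void main(String[] args) {
            Tuple0 t = new Tuple0();
            System.out.println(t.getArity());           // prints 0
            System.out.println((Object) t.getField(0)); // prints null: no field exists
            t.setField("ignored", 0);                   // silently discarded
        }
    }
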
http://git-wip-us.apache.org/repos/asf/flink/blob/af9248c3/flink-optimizer/src/main/java/org/apache/flink/optimizer/costs/CostEstimator.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/costs/CostEstimator.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/costs/CostEstimator.java
index 7880734..3a02735 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/costs/CostEstimator.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/costs/CostEstimator.java
@@ -180,6 +180,7 @@ public abstract class CostEstimator {
                        // this operations does not do any actual grouping, since every element is in the same single group
                        
                case CO_GROUP:
+               case CO_GROUP_RAW:
                case SORTED_GROUP_REDUCE:
                case SORTED_REDUCE:
                        // grouping or co-grouping over sorted streams for free

http://git-wip-us.apache.org/repos/asf/flink/blob/af9248c3/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/CoGroupRawNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/CoGroupRawNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/CoGroupRawNode.java
new file mode 100644
index 0000000..971d244
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/CoGroupRawNode.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.optimizer.dag;
+
+import java.util.Collections;
+import java.util.List;
+import org.apache.flink.api.common.operators.Ordering;
+import org.apache.flink.api.common.operators.base.CoGroupRawOperatorBase;
+import org.apache.flink.optimizer.DataStatistics;
+import org.apache.flink.optimizer.operators.CoGroupRawDescriptor;
+import org.apache.flink.optimizer.operators.OperatorDescriptorDual;
+
+/**
+ * The Optimizer representation of a <i>CoGroupRaw</i> operator.
+ */
+public class CoGroupRawNode extends TwoInputNode {
+       private List<OperatorDescriptorDual> dataProperties;
+
+       public CoGroupRawNode(CoGroupRawOperatorBase<?, ?, ?, ?> pactContract) {
+               super(pactContract);
+               this.dataProperties = initializeDataProperties();
+       }
+
+       // --------------------------------------------------------------------------------------------
+       /**
+        * Gets the operator for this CoGroup node.
+        * 
+        * @return The CoGroup operator.
+        */
+       @Override
+       public CoGroupRawOperatorBase<?, ?, ?, ?> getOperator() {
+               return (CoGroupRawOperatorBase<?, ?, ?, ?>) super.getOperator();
+       }
+
+       @Override
+       public String getName() {
+               return "CoGroup";
+       }
+
+       @Override
+       protected List<OperatorDescriptorDual> getPossibleProperties() {
+               return this.dataProperties;
+       }
+
+       @Override
+       protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) {
+               // for CoGroup, we currently make no reasonable default estimates
+       }
+
+       private List<OperatorDescriptorDual> initializeDataProperties() {
+               Ordering groupOrder1 = null;
+               Ordering groupOrder2 = null;
+
+               CoGroupRawOperatorBase<?, ?, ?, ?> cgc = getOperator();
+               groupOrder1 = cgc.getGroupOrderForInputOne();
+               groupOrder2 = cgc.getGroupOrderForInputTwo();
+
+               if (groupOrder1 != null && groupOrder1.getNumberOfFields() == 0) {
+                       groupOrder1 = null;
+               }
+               if (groupOrder2 != null && groupOrder2.getNumberOfFields() == 0) {
+                       groupOrder2 = null;
+               }
+
+               return Collections.<OperatorDescriptorDual>singletonList(new CoGroupRawDescriptor(this.keys1, this.keys2, groupOrder1, groupOrder2));
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/af9248c3/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/CoGroupRawDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/CoGroupRawDescriptor.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/CoGroupRawDescriptor.java
new file mode 100644
index 0000000..61561a4
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/CoGroupRawDescriptor.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.optimizer.operators;
+
+import java.util.Collections;
+import java.util.List;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.common.operators.Ordering;
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.optimizer.CompilerException;
+import org.apache.flink.optimizer.dag.TwoInputNode;
+import org.apache.flink.optimizer.dataproperties.GlobalProperties;
+import org.apache.flink.optimizer.dataproperties.LocalProperties;
+import org.apache.flink.optimizer.dataproperties.PartitioningProperty;
+import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties;
+import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
+import org.apache.flink.optimizer.plan.Channel;
+import org.apache.flink.optimizer.plan.DualInputPlanNode;
+import org.apache.flink.optimizer.util.Utils;
+import org.apache.flink.runtime.operators.DriverStrategy;
+
+/**
+ * The optimizer descriptor for the {@link DriverStrategy#CO_GROUP_RAW} strategy.
+ */
+public class CoGroupRawDescriptor extends OperatorDescriptorDual {
+
+       private final Ordering ordering1;               // ordering on the first input
+       private final Ordering ordering2;               // ordering on the second input
+
+       public CoGroupRawDescriptor(FieldList keys1, FieldList keys2) {
+               this(keys1, keys2, null, null);
+       }
+
+       public CoGroupRawDescriptor(FieldList keys1, FieldList keys2, Ordering additionalOrdering1, Ordering additionalOrdering2) {
+               super(keys1, keys2);
+
+               // if we have an additional ordering, construct the ordering to have primarily the grouping fields
+               if (additionalOrdering1 != null) {
+                       this.ordering1 = new Ordering();
+                       for (Integer key : this.keys1) {
+                               this.ordering1.appendOrdering(key, null, Order.ANY);
+                       }
+
+                       // and next the additional order fields
+                       for (int i = 0; i < additionalOrdering1.getNumberOfFields(); i++) {
+                               Integer field = additionalOrdering1.getFieldNumber(i);
+                               Order order = additionalOrdering1.getOrder(i);
+                               this.ordering1.appendOrdering(field, additionalOrdering1.getType(i), order);
+                       }
+               } else {
+                       this.ordering1 = Utils.createOrdering(this.keys1);
+               }
+
+               // if we have an additional ordering, construct the ordering to have primarily the grouping fields
+               if (additionalOrdering2 != null) {
+                       this.ordering2 = new Ordering();
+                       for (Integer key : this.keys2) {
+                               this.ordering2.appendOrdering(key, null, Order.ANY);
+                       }
+
+                       // and next the additional order fields
+                       for (int i = 0; i < additionalOrdering2.getNumberOfFields(); i++) {
+                               Integer field = additionalOrdering2.getFieldNumber(i);
+                               Order order = additionalOrdering2.getOrder(i);
+                               this.ordering2.appendOrdering(field, additionalOrdering2.getType(i), order);
+                       }
+               } else {
+                       this.ordering2 = Utils.createOrdering(this.keys2);
+               }
+       }
+
+       @Override
+       public DriverStrategy getStrategy() {
+               return DriverStrategy.CO_GROUP_RAW;
+       }
+
+       @Override
+       protected List<OperatorDescriptorDual.GlobalPropertiesPair> createPossibleGlobalProperties() {
+               RequestedGlobalProperties partitioned1 = new RequestedGlobalProperties();
+               partitioned1.setHashPartitioned(this.keys1);
+               RequestedGlobalProperties partitioned2 = new RequestedGlobalProperties();
+               partitioned2.setHashPartitioned(this.keys2);
+               return Collections.singletonList(new OperatorDescriptorDual.GlobalPropertiesPair(partitioned1, partitioned2));
+       }
+
+       @Override
+       protected List<OperatorDescriptorDual.LocalPropertiesPair> createPossibleLocalProperties() {
+               RequestedLocalProperties sort1 = new RequestedLocalProperties(this.ordering1);
+               RequestedLocalProperties sort2 = new RequestedLocalProperties(this.ordering2);
+               return Collections.singletonList(new OperatorDescriptorDual.LocalPropertiesPair(sort1, sort2));
+       }
+
+       @Override
+       public boolean areCoFulfilled(RequestedLocalProperties requested1, RequestedLocalProperties requested2,
+                       LocalProperties produced1, LocalProperties produced2) {
+               int numRelevantFields = this.keys1.size();
+
+               Ordering prod1 = produced1.getOrdering();
+               Ordering prod2 = produced2.getOrdering();
+
+               if (prod1 == null || prod2 == null || prod1.getNumberOfFields() < numRelevantFields
+                               || prod2.getNumberOfFields() < numRelevantFields) {
+                       throw new CompilerException("The given properties do not meet this operator's requirements.");
+               }
+
+               for (int i = 0; i < numRelevantFields; i++) {
+                       if (prod1.getOrder(i) != prod2.getOrder(i)) {
+                               return false;
+                       }
+               }
+               return true;
+       }
+
+       @Override
+       public DualInputPlanNode instantiate(Channel in1, Channel in2, TwoInputNode node) {
+               boolean[] inputOrders = in1.getLocalProperties().getOrdering() == null ? null : in1.getLocalProperties().getOrdering().getFieldSortDirections();
+
+               if (inputOrders == null || inputOrders.length < this.keys1.size()) {
+                       throw new CompilerException("BUG: The input strategy does not sufficiently describe the sort orders for a CoGroup operator.");
+               } else if (inputOrders.length > this.keys1.size()) {
+                       boolean[] tmp = new boolean[this.keys1.size()];
+                       System.arraycopy(inputOrders, 0, tmp, 0, tmp.length);
+                       inputOrders = tmp;
+               }
+
+               return new DualInputPlanNode(node, "CoGroup (" + node.getOperator().getName() + ")", in1, in2,
+                               DriverStrategy.CO_GROUP_RAW, this.keys1, this.keys2, inputOrders);
+       }
+
+       @Override
+       public GlobalProperties computeGlobalProperties(GlobalProperties in1, GlobalProperties in2) {
+               GlobalProperties gp = GlobalProperties.combine(in1, in2);
+               if (gp.getUniqueFieldCombination() != null && gp.getUniqueFieldCombination().size() > 0
+                               && gp.getPartitioning() == PartitioningProperty.RANDOM_PARTITIONED) {
+                       gp.setAnyPartitioning(gp.getUniqueFieldCombination().iterator().next().toFieldList());
+               }
+               gp.clearUniqueFieldCombinations();
+               return gp;
+       }
+
+       @Override
+       public LocalProperties computeLocalProperties(LocalProperties in1, LocalProperties in2) {
+               LocalProperties comb = LocalProperties.combine(in1, in2);
+               return comb.clearUniqueFieldSets();
+       }
+       
+       @Override
+       public boolean areCompatible(RequestedGlobalProperties requested1, RequestedGlobalProperties requested2,
+                       GlobalProperties produced1, GlobalProperties produced2)
+       {
+               return produced1.getPartitioning() == produced2.getPartitioning() &&
+                               (produced1.getCustomPartitioner() == null ?
+                                       produced2.getCustomPartitioner() == null :
+                                       produced1.getCustomPartitioner().equals(produced2.getCustomPartitioner()));
+       }
+}

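The constructor above always requests the grouping keys first (with direction
Order.ANY) and only then appends the user's secondary order fields. A small
stand-alone sketch of that composition, using a field-to-direction map in
place of Flink's Ordering (all names illustrative):

    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    class OrderingSketch {
        // Grouping keys may sort in any direction; secondary fields keep the
        // direction the user asked for. Fields already present are skipped,
        // mirroring Ordering#appendOrdering.
        static Map<Integer, String> requestedOrdering(List<Integer> groupKeys,
                                                      Map<Integer, String> secondaryOrder) {
            Map<Integer, String> ordering = new LinkedHashMap<>();
            for (Integer key : groupKeys) {
                ordering.put(key, "ANY");
            }
            secondaryOrder.forEach(ordering::putIfAbsent);
            return ordering; // e.g. keys=[0], secondary={2=ASCENDING} -> {0=ANY, 2=ASCENDING}
        }
    }
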
http://git-wip-us.apache.org/repos/asf/flink/blob/af9248c3/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/GraphCreatingVisitor.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/GraphCreatingVisitor.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/GraphCreatingVisitor.java
index 37cffce..7fbdf81 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/GraphCreatingVisitor.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/GraphCreatingVisitor.java
@@ -69,6 +69,8 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import org.apache.flink.api.common.operators.base.CoGroupRawOperatorBase;
+import org.apache.flink.optimizer.dag.CoGroupRawNode;
 
 /**
  * This traversal creates the optimizer DAG from a program.
@@ -164,6 +166,9 @@ public class GraphCreatingVisitor implements Visitor<Operator<?>> {
                else if (c instanceof CoGroupOperatorBase) {
                        n = new CoGroupNode((CoGroupOperatorBase<?, ?, ?, ?>) c);
                }
+               else if (c instanceof CoGroupRawOperatorBase) {
+                       n = new CoGroupRawNode((CoGroupRawOperatorBase<?, ?, ?, ?>) c);
+               }
                else if (c instanceof CrossOperatorBase) {
                        n = new CrossNode((CrossOperatorBase<?, ?, ?, ?>) c);
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/af9248c3/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupRawDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupRawDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupRawDriver.java
new file mode 100644
index 0000000..7abad5d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupRawDriver.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.operators;
+
+import java.io.IOException;
+import java.util.Iterator;
+import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.runtime.operators.util.TaskConfig;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CoGroupRawDriver<IT1, IT2, OT> implements PactDriver<CoGroupFunction<IT1, IT2, OT>, OT> {
+
+       private static final Logger LOG = LoggerFactory.getLogger(CoGroupRawDriver.class);
+
+       private PactTaskContext<CoGroupFunction<IT1, IT2, OT>, OT> taskContext;
+
+       private SimpleIterable<IT1> coGroupIterator1;
+       private SimpleIterable<IT2> coGroupIterator2;
+
+       @Override
+       public void setup(PactTaskContext<CoGroupFunction<IT1, IT2, OT>, OT> context) {
+               this.taskContext = context;
+       }
+
+       @Override
+       public int getNumberOfInputs() {
+               return 2;
+       }
+
+       @Override
+       public int getNumberOfDriverComparators() {
+               return 0;
+       }
+
+       @Override
+       public Class<CoGroupFunction<IT1, IT2, OT>> getStubType() {
+               @SuppressWarnings("unchecked")
+               final Class<CoGroupFunction<IT1, IT2, OT>> clazz = (Class<CoGroupFunction<IT1, IT2, OT>>) (Class<?>) CoGroupFunction.class;
+               return clazz;
+       }
+
+       @Override
+       public void prepare() throws Exception {
+               final TaskConfig config = this.taskContext.getTaskConfig();
+               if (config.getDriverStrategy() != DriverStrategy.CO_GROUP_RAW) {
+                       throw new Exception("Unrecognized driver strategy for CoGroup Python driver: " + config.getDriverStrategy().name());
+               }
+
+               final MutableObjectIterator<IT1> in1 = this.taskContext.getInput(0);
+               final MutableObjectIterator<IT2> in2 = this.taskContext.getInput(1);
+
+               IT1 reuse1 = this.taskContext.<IT1>getInputSerializer(0).getSerializer().createInstance();
+               IT2 reuse2 = this.taskContext.<IT2>getInputSerializer(1).getSerializer().createInstance();
+
+               this.coGroupIterator1 = new SimpleIterable<IT1>(reuse1, in1);
+               this.coGroupIterator2 = new SimpleIterable<IT2>(reuse2, in2);
+
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug(this.taskContext.formatLogString("CoGroup task iterator ready."));
+               }
+       }
+
+       @Override
+       public void run() throws Exception {
+               final CoGroupFunction<IT1, IT2, OT> coGroupStub = this.taskContext.getStub();
+               final Collector<OT> collector = this.taskContext.getOutputCollector();
+               final SimpleIterable<IT1> i1 = this.coGroupIterator1;
+               final SimpleIterable<IT2> i2 = this.coGroupIterator2;
+
+               coGroupStub.coGroup(i1, i2, collector);
+       }
+
+       @Override
+       public void cleanup() throws Exception {
+       }
+
+       @Override
+       public void cancel() throws Exception {
+               cleanup();
+       }
+
+       public static class SimpleIterable<IN> implements Iterable<IN> {
+               private IN reuse;
+               private final MutableObjectIterator<IN> iterator;
+
+               public SimpleIterable(IN reuse, MutableObjectIterator<IN> iterator) throws IOException {
+                       this.iterator = iterator;
+                       this.reuse = reuse;
+               }
+
+               @Override
+               public Iterator<IN> iterator() {
+                       return new SimpleIterator<IN>(reuse, iterator);
+               }
+
+               protected class SimpleIterator<IN> implements Iterator<IN> {
+                       private IN reuse;
+                       private final MutableObjectIterator<IN> iterator;
+                       private boolean consumed = true;
+
+                       public SimpleIterator(IN reuse, MutableObjectIterator<IN> iterator) {
+                               this.iterator = iterator;
+                               this.reuse = reuse;
+                       }
+
+                       @Override
+                       public boolean hasNext() {
+                               try {
+                                       if (!consumed) {
+                                               return true;
+                                       }
+                                       IN result = iterator.next(reuse);
+                                       consumed = result == null;
+                                       if (!consumed) {
+                                               reuse = result; // keep the returned instance; it need not be the reuse object
+                                       }
+                                       return !consumed;
+                               } catch (IOException ioex) {
+                                       throw new RuntimeException("An error occurred while reading the next record: "
+                                                       + ioex.getMessage(), ioex);
+                               }
+                       }
+
+                       @Override
+                       public IN next() {
+                               consumed = true;
+                               return reuse;
+                       }
+
+                       @Override
+                       public void remove() { //unused
+                       }
+               }
+       }
+}

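SimpleIterator above adapts Flink's pull-style MutableObjectIterator (which
returns null once exhausted) to java.util.Iterator via a one-record look-ahead
flag: hasNext() pre-fetches, next() hands the record out exactly once. A
self-contained sketch of the same adapter (PullIterator is an illustrative
stand-in for MutableObjectIterator; like the hasNext() above, it keeps the
instance the source returns, since that is not required to be the reuse object):

    import java.util.Iterator;

    interface PullIterator<T> {
        T next(T reuse); // returns the next record, or null when exhausted
    }

    class PullIteratorAdapter<T> implements Iterator<T> {
        private final PullIterator<T> source;
        private T current;
        private boolean consumed = true;

        PullIteratorAdapter(PullIterator<T> source, T reuse) {
            this.source = source;
            this.current = reuse;
        }

        @Override
        public boolean hasNext() {
            if (!consumed) {
                return true; // a pre-fetched record is still waiting
            }
            T result = source.next(current);
            consumed = (result == null);
            if (!consumed) {
                current = result; // keep whatever instance the source returned
            }
            return !consumed;
        }

        @Override
        public T next() {
            consumed = true; // mark the pre-fetched record as handed out
            return current;
        }
    }
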
http://git-wip-us.apache.org/repos/asf/flink/blob/af9248c3/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java
index d5b131e..7942b3b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java
@@ -75,6 +75,9 @@ public enum DriverStrategy {
 
        // co-grouping inputs
        CO_GROUP(CoGroupDriver.class, null, PIPELINED, PIPELINED, 2),
+       // python-cogroup
+       CO_GROUP_RAW(CoGroupRawDriver.class, null, PIPELINED, PIPELINED, 0),
+       
        
        // the first input is build side, the second side is probe side of a hybrid hash table
        HYBRIDHASH_BUILD_FIRST(MatchDriver.class, null, FULL_DAM, MATERIALIZING, 2),

http://git-wip-us.apache.org/repos/asf/flink/blob/af9248c3/flink-staging/flink-language-binding/flink-language-binding-generic/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-language-binding-generic/pom.xml b/flink-staging/flink-language-binding/flink-language-binding-generic/pom.xml
new file mode 100644
index 0000000..a37f82a
--- /dev/null
+++ b/flink-staging/flink-language-binding/flink-language-binding-generic/pom.xml
@@ -0,0 +1,61 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <artifactId>flink-language-binding-parent</artifactId>
+        <groupId>org.apache.flink</groupId>
+        <version>0.9-SNAPSHOT</version>
+        <relativePath>..</relativePath>
+    </parent>
+       
+    <artifactId>flink-language-binding-generic</artifactId>
+    <name>flink-language-binding-generic</name>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-core</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-java</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-compiler</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-runtime</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-clients</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+    </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/af9248c3/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/OperationInfo.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/OperationInfo.java b/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/OperationInfo.java
new file mode 100644
index 0000000..bd059b3
--- /dev/null
+++ b/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/OperationInfo.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.languagebinding.api.java.common;
+
+import java.util.Arrays;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+/**
+ * Container for all generic information related to operations. This class contains the absolute minimum fields that are
+ * required for all operations. This class should be extended to contain any additional fields required on a
+ * per-language basis.
+ */
+public abstract class OperationInfo {
+       public int parentID; //DataSet that an operation is applied on
+       public int otherID; //secondary DataSet
+       public int setID; //ID for new DataSet
+       public int[] keys1; //join/cogroup keys
+       public int[] keys2; //join/cogroup keys
+       public TypeInformation<?> types; //typeinformation about output type
+       public ProjectionEntry[] projections; //projectFirst/projectSecond
+
+       public class ProjectionEntry {
+               public ProjectionSide side;
+               public int[] keys;
+
+               public ProjectionEntry(ProjectionSide side, int[] keys) {
+                       this.side = side;
+                       this.keys = keys;
+               }
+
+               @Override
+               public String toString() {
+                       return side + " - " + Arrays.toString(keys);
+               }
+       }
+
+       public enum ProjectionSide {
+               FIRST,
+               SECOND
+       }
+
+       public enum DatasizeHint {
+               NONE,
+               TINY,
+               HUGE
+       }
+}

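OperationInfo is the extension point for language-specific bindings: the
generic fields above are shared, and each binding subclasses it with whatever
else its runtime needs. A hypothetical sketch of such an extension (the class
and fields below are illustrative only, not part of this commit):

    package org.apache.flink.languagebinding.api.java.common;

    public class DemoLanguageOperationInfo extends OperationInfo {
        public byte[] serializedFunction; // the serialized UDF shipped from the guest language
        public String functionName;       // illustrative extra metadata
    }
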
http://git-wip-us.apache.org/repos/asf/flink/blob/af9248c3/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/PlanBinder.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/PlanBinder.java b/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/PlanBinder.java
new file mode 100644
index 0000000..f701ab7
--- /dev/null
+++ b/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/PlanBinder.java
@@ -0,0 +1,656 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.languagebinding.api.java.common;
+
+import java.io.IOException;
+import java.util.HashMap;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.aggregation.Aggregations;
+import org.apache.flink.api.java.io.CsvInputFormat;
+import org.apache.flink.api.java.operators.AggregateOperator;
+import org.apache.flink.api.java.operators.CrossOperator.DefaultCross;
+import org.apache.flink.api.java.operators.CrossOperator.ProjectCross;
+import org.apache.flink.api.java.operators.Grouping;
+import org.apache.flink.api.java.operators.JoinOperator.DefaultJoin;
+import org.apache.flink.api.java.operators.JoinOperator.ProjectJoin;
+import org.apache.flink.api.java.operators.SortedGrouping;
+import org.apache.flink.api.java.operators.UdfOperator;
+import org.apache.flink.api.java.operators.UnsortedGrouping;
+import org.apache.flink.api.java.tuple.Tuple;
+import static org.apache.flink.api.java.typeutils.TypeExtractor.getForObject;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem.WriteMode;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.languagebinding.api.java.common.OperationInfo.DatasizeHint;
+import static org.apache.flink.languagebinding.api.java.common.OperationInfo.DatasizeHint.HUGE;
+import static org.apache.flink.languagebinding.api.java.common.OperationInfo.DatasizeHint.NONE;
+import static org.apache.flink.languagebinding.api.java.common.OperationInfo.DatasizeHint.TINY;
+import org.apache.flink.languagebinding.api.java.common.OperationInfo.ProjectionEntry;
+import org.apache.flink.languagebinding.api.java.common.streaming.Receiver;
+
+/**
+ * Generic class to construct a Flink plan based on external data.
+ *
+ * @param <INFO> The language-specific OperationInfo subclass describing received operations.
+ */
+public abstract class PlanBinder<INFO extends OperationInfo> {
+       public static final String PLANBINDER_CONFIG_BCVAR_COUNT = "PLANBINDER_BCVAR_COUNT";
+       public static final String PLANBINDER_CONFIG_BCVAR_NAME_PREFIX = "PLANBINDER_BCVAR_";
+
+       protected static String FLINK_HDFS_PATH = "hdfs:/tmp";
+       public static final String FLINK_TMP_DATA_DIR = System.getProperty("java.io.tmpdir") + "/flink_data";
+
+       public static boolean DEBUG = false;
+
+       public static void setLocalMode() {
+               FLINK_HDFS_PATH = System.getProperty("java.io.tmpdir") + 
"/flink";
+       }
+
+       protected HashMap<Integer, Object> sets;
+       public static ExecutionEnvironment env;
+       protected Receiver receiver;
+
+       public static final int MAPPED_FILE_SIZE = 1024 * 1024 * 64;
+
+       //====Plan==========================================================================================================
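+       /**
+        * Receives the plan (environment parameters followed by operations) from the external process.
+        *
+        * @throws IOException
+        */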
+       protected void receivePlan() throws IOException {
+               receiveParameters();
+               receiveOperations();
+       }
+
+       //====Environment===================================================================================================
+       /**
+        * This enum contains the identifiers for all supported environment parameters.
+        */
+       private enum Parameters {
+               DOP,
+               MODE,
+               RETRY,
+               DEBUG
+       }
+
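+       /**
+        * Receives the environment parameters (degree of parallelism, mode, retry count, debug flag) from the
+        * external process and applies them to the execution environment.
+        *
+        * @throws IOException
+        */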
+       private void receiveParameters() throws IOException {
+               Integer parameterCount = (Integer) receiver.getRecord(true);
+
+               for (int x = 0; x < parameterCount; x++) {
+                       Tuple value = (Tuple) receiver.getRecord(true);
+                       switch (Parameters.valueOf(((String) value.getField(0)).toUpperCase())) {
+                               case DOP:
+                                       Integer dop = (Integer) value.getField(1);
+                                       env.setDegreeOfParallelism(dop);
+                                       break;
+                               case MODE:
+                                       FLINK_HDFS_PATH = (Boolean) value.getField(1) ? "file:/tmp/flink" : "hdfs:/tmp/flink";
+                                       break;
+                               case RETRY:
+                                       int retry = (Integer) value.getField(1);
+                                       env.setNumberOfExecutionRetries(retry);
+                                       break;
+                               case DEBUG:
+                                       DEBUG = (Boolean) value.getField(1);
+                                       break;
+                       }
+               }
+               if (env.getDegreeOfParallelism() < 0) {
+                       env.setDegreeOfParallelism(1);
+               }
+       }
+
+       //====Operations====================================================================================================
+       /**
+        * This enum contains the identifiers for all supported non-UDF DataSet operations.
+        */
+       private enum Operation {
+               SOURCE_CSV, SOURCE_TEXT, SOURCE_VALUE, SOURCE_SEQ, SINK_CSV, SINK_TEXT, SINK_PRINT,
+               PROJECTION, SORT, UNION, FIRST, DISTINCT, GROUPBY, AGGREGATE,
+               REBALANCE, PARTITION_HASH,
+               BROADCAST
+       }
+
+       /**
+        * This enum contains the identifiers for all supported UDF DataSet operations.
+        */
+       protected enum AbstractOperation {
+               COGROUP, CROSS, CROSS_H, CROSS_T, FILTER, FLATMAP, GROUPREDUCE, JOIN, JOIN_H, JOIN_T, MAP, REDUCE, MAPPARTITION
+       }
+
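+       /**
+        * Receives all operations from the external process and dispatches each identifier to the matching
+        * create* method.
+        *
+        * @throws IOException
+        */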
+       protected void receiveOperations() throws IOException {
+               Integer operationCount = (Integer) receiver.getRecord(true);
+               for (int x = 0; x < operationCount; x++) {
+                       String identifier = (String) receiver.getRecord();
+                       Operation op = null;
+                       AbstractOperation aop = null;
+                       try {
+                               op = Operation.valueOf(identifier.toUpperCase());
+                       } catch (IllegalArgumentException iae) {
+                               try {
+                                       aop = AbstractOperation.valueOf(identifier.toUpperCase());
+                               } catch (IllegalArgumentException iae2) {
+                                       throw new IllegalArgumentException("Invalid operation specified: " + identifier);
+                               }
+                       }
+                       if (op != null) {
+                               switch (op) {
+                                       case SOURCE_CSV:
+                                               createCsvSource();
+                                               break;
+                                       case SOURCE_TEXT:
+                                               createTextSource();
+                                               break;
+                                       case SOURCE_VALUE:
+                                               createValueSource();
+                                               break;
+                                       case SOURCE_SEQ:
+                                               createSequenceSource();
+                                               break;
+                                       case SINK_CSV:
+                                               createCsvSink();
+                                               break;
+                                       case SINK_TEXT:
+                                               createTextSink();
+                                               break;
+                                       case SINK_PRINT:
+                                               createPrintSink();
+                                               break;
+                                       case BROADCAST:
+                                               createBroadcastVariable();
+                                               break;
+                                       case AGGREGATE:
+                                               createAggregationOperation();
+                                               break;
+                                       case DISTINCT:
+                                               createDistinctOperation();
+                                               break;
+                                       case FIRST:
+                                               createFirstOperation();
+                                               break;
+                                       case PARTITION_HASH:
+                                               createHashPartitionOperation();
+                                               break;
+                                       case PROJECTION:
+                                               createProjectOperation();
+                                               break;
+                                       case REBALANCE:
+                                               createRebalanceOperation();
+                                               break;
+                                       case GROUPBY:
+                                               createGroupOperation();
+                                               break;
+                                       case SORT:
+                                               createSortOperation();
+                                               break;
+                                       case UNION:
+                                               createUnionOperation();
+                                               break;
+                               }
+                       }
+                       if (aop != null) {
+                               switch (aop) {
+                                       case COGROUP:
+                                               createCoGroupOperation(createOperationInfo(aop));
+                                               break;
+                                       case CROSS:
+                                               createCrossOperation(NONE, createOperationInfo(aop));
+                                               break;
+                                       case CROSS_H:
+                                               createCrossOperation(HUGE, createOperationInfo(aop));
+                                               break;
+                                       case CROSS_T:
+                                               createCrossOperation(TINY, createOperationInfo(aop));
+                                               break;
+                                       case FILTER:
+                                               createFilterOperation(createOperationInfo(aop));
+                                               break;
+                                       case FLATMAP:
+                                               createFlatMapOperation(createOperationInfo(aop));
+                                               break;
+                                       case GROUPREDUCE:
+                                               createGroupReduceOperation(createOperationInfo(aop));
+                                               break;
+                                       case JOIN:
+                                               createJoinOperation(NONE, createOperationInfo(aop));
+                                               break;
+                                       case JOIN_H:
+                                               createJoinOperation(HUGE, createOperationInfo(aop));
+                                               break;
+                                       case JOIN_T:
+                                               createJoinOperation(TINY, createOperationInfo(aop));
+                                               break;
+                                       case MAP:
+                                               createMapOperation(createOperationInfo(aop));
+                                               break;
+                                       case MAPPARTITION:
+                                               createMapPartitionOperation(createOperationInfo(aop));
+                                               break;
+                                       case REDUCE:
+                                               createReduceOperation(createOperationInfo(aop));
+                                               break;
+                               }
+                       }
+               }
+       }
+
+       private void createCsvSource() throws IOException {
+               int id = (Integer) receiver.getRecord(true);
+               String path = (String) receiver.getRecord();
+               String fieldDelimiter = (String) receiver.getRecord();
+               String lineDelimiter = (String) receiver.getRecord();
+               Tuple types = (Tuple) receiver.getRecord();
+               sets.put(id, env.createInput(new CsvInputFormat(new Path(path), lineDelimiter, fieldDelimiter, getForObject(types)), getForObject(types)).name("CsvSource"));
+       }
+
+       private void createTextSource() throws IOException {
+               int id = (Integer) receiver.getRecord(true);
+               String path = (String) receiver.getRecord();
+               sets.put(id, env.readTextFile(path).name("TextSource"));
+       }
+
+       private void createValueSource() throws IOException {
+               int id = (Integer) receiver.getRecord(true);
+               int valueCount = (Integer) receiver.getRecord(true);
+               Object[] values = new Object[valueCount];
+               for (int x = 0; x < valueCount; x++) {
+                       values[x] = receiver.getRecord();
+               }
+               sets.put(id, env.fromElements(values).name("ValueSource"));
+       }
+
+       private void createSequenceSource() throws IOException {
+               int id = (Integer) receiver.getRecord(true);
+               long from = (Long) receiver.getRecord();
+               long to = (Long) receiver.getRecord();
+               sets.put(id, env.generateSequence(from, to).name("SequenceSource"));
+       }
+
+       private void createCsvSink() throws IOException {
+               int parentID = (Integer) receiver.getRecord(true);
+               String path = (String) receiver.getRecord();
+               String fieldDelimiter = (String) receiver.getRecord();
+               String lineDelimiter = (String) receiver.getRecord();
+               WriteMode writeMode = ((Integer) receiver.getRecord(true)) == 1
+                               ? WriteMode.OVERWRITE
+                               : WriteMode.NO_OVERWRITE;
+               DataSet parent = (DataSet) sets.get(parentID);
+               parent.writeAsCsv(path, lineDelimiter, fieldDelimiter, writeMode).name("CsvSink");
+       }
+
+       private void createTextSink() throws IOException {
+               int parentID = (Integer) receiver.getRecord(true);
+               String path = (String) receiver.getRecord();
+               WriteMode writeMode = ((Integer) receiver.getRecord(true)) == 1
+                               ? WriteMode.OVERWRITE
+                               : WriteMode.NO_OVERWRITE;
+               DataSet parent = (DataSet) sets.get(parentID);
+               parent.writeAsText(path, writeMode).name("TextSink");
+       }
+
+       private void createPrintSink() throws IOException {
+               int parentID = (Integer) receiver.getRecord(true);
+               DataSet parent = (DataSet) sets.get(parentID);
+               boolean toError = (Boolean) receiver.getRecord();
+               (toError ? parent.printToErr() : parent.print()).name("PrintSink");
+       }
+
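+       /**
+        * Registers a broadcast variable on the target operator and records its name in the operator
+        * configuration under the PLANBINDER_CONFIG_BCVAR_* keys, so that the registered names can be
+        * retrieved from the configuration at runtime.
+        */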
+       private void createBroadcastVariable() throws IOException {
+               int parentID = (Integer) receiver.getRecord(true);
+               int otherID = (Integer) receiver.getRecord(true);
+               String name = (String) receiver.getRecord();
+               UdfOperator op1 = (UdfOperator) sets.get(parentID);
+               DataSet op2 = (DataSet) sets.get(otherID);
+
+               op1.withBroadcastSet(op2, name);
+               Configuration c = op1.getParameters();
+
+               if (c == null) {
+                       c = new Configuration();
+               }
+
+               int count = c.getInteger(PLANBINDER_CONFIG_BCVAR_COUNT, 0);
+               c.setInteger(PLANBINDER_CONFIG_BCVAR_COUNT, count + 1);
+               c.setString(PLANBINDER_CONFIG_BCVAR_NAME_PREFIX + count, name);
+
+               op1.withParameters(c);
+       }
+
+       /**
+        * This method creates an OperationInfo object based on the operation-identifier passed.
+        *
+        * @param operationIdentifier identifier of the UDF operation to read
+        * @return OperationInfo describing the operation
+        * @throws IOException
+        */
+       protected abstract INFO createOperationInfo(AbstractOperation operationIdentifier) throws IOException;
+
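+       /**
+        * Creates an aggregation consisting of one or more chained MAX/MIN/SUM operations.
+        *
+        * @throws IOException
+        */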
+       private void createAggregationOperation() throws IOException {
+               int setID = (Integer) receiver.getRecord(true);
+               int parentID = (Integer) receiver.getRecord(true);
+               int count = (Integer) receiver.getRecord(true);
+
+               int encodedAgg = (Integer) receiver.getRecord(true);
+               int field = (Integer) receiver.getRecord(true);
+
+               Aggregations agg = null;
+               switch (encodedAgg) {
+                       case 0:
+                               agg = Aggregations.MAX;
+                               break;
+                       case 1:
+                               agg = Aggregations.MIN;
+                               break;
+                       case 2:
+                               agg = Aggregations.SUM;
+                               break;
+               }
+               DataSet op = (DataSet) sets.get(parentID);
+               AggregateOperator ao = op.aggregate(agg, field);
+
+               for (int x = 1; x < count; x++) {
+                       encodedAgg = (Integer) receiver.getRecord(true);
+                       field = (Integer) receiver.getRecord(true);
+                       switch (encodedAgg) {
+                               case 0:
+                                       ao = ao.andMax(field);
+                                       break;
+                               case 1:
+                                       ao = ao.andMin(field);
+                                       break;
+                               case 2:
+                                       ao = ao.andSum(field);
+                                       break;
+                       }
+               }
+
+               sets.put(setID, ao.name("Aggregation"));
+       }
+
+       private void createCoGroupOperation(INFO info) {
+               DataSet op1 = (DataSet) sets.get(info.parentID);
+               DataSet op2 = (DataSet) sets.get(info.otherID);
+               sets.put(info.setID, applyCoGroupOperation(op1, op2, info.keys1, info.keys2, info));
+       }
+
+       protected abstract DataSet applyCoGroupOperation(DataSet op1, DataSet op2, int[] firstKeys, int[] secondKeys, INFO info);
+
+       private void createCrossOperation(DatasizeHint mode, INFO info) {
+               DataSet op1 = (DataSet) sets.get(info.parentID);
+               DataSet op2 = (DataSet) sets.get(info.otherID);
+
+               if (info.types != null && (info.projections == null || info.projections.length == 0)) {
+                       sets.put(info.setID, applyCrossOperation(op1, op2, mode, info));
+               } else {
+                       DefaultCross defaultResult;
+                       switch (mode) {
+                               case NONE:
+                                       defaultResult = op1.cross(op2);
+                                       break;
+                               case HUGE:
+                                       defaultResult = op1.crossWithHuge(op2);
+                                       break;
+                               case TINY:
+                                       defaultResult = op1.crossWithTiny(op2);
+                                       break;
+                               default:
+                                       throw new IllegalArgumentException("Invalid Cross mode specified: " + mode);
+                       }
+                       // guard against a null projections array; this branch is also reached when info.types is null
+                       if (info.projections == null || info.projections.length == 0) {
+                               sets.put(info.setID, defaultResult.name("DefaultCross"));
+                       } else {
+                               ProjectCross project = null;
+                               for (ProjectionEntry pe : info.projections) {
+                                       switch (pe.side) {
+                                               case FIRST:
+                                                       project = project == null ? defaultResult.projectFirst(pe.keys) : project.projectFirst(pe.keys);
+                                                       break;
+                                               case SECOND:
+                                                       project = project == null ? defaultResult.projectSecond(pe.keys) : project.projectSecond(pe.keys);
+                                                       break;
+                                       }
+                               }
+                               sets.put(info.setID, project.name("ProjectCross"));
+                       }
+               }
+       }
+
+       protected abstract DataSet applyCrossOperation(DataSet op1, DataSet op2, DatasizeHint mode, INFO info);
+
+       private void createDistinctOperation() throws IOException {
+               int setID = (Integer) receiver.getRecord(true);
+               int parentID = (Integer) receiver.getRecord(true);
+               Object keysArrayOrTuple = receiver.getRecord(true);
+               int[] keys;
+               if (keysArrayOrTuple instanceof Tuple) {
+                       keys = tupleToIntArray((Tuple) keysArrayOrTuple);
+               } else {
+                       keys = (int[]) keysArrayOrTuple;
+               }
+               DataSet op = (DataSet) sets.get(parentID);
+               sets.put(setID, (keys.length == 0 ? op.distinct() : op.distinct(keys)).name("Distinct"));
+       }
+
+       private void createFilterOperation(INFO info) {
+               DataSet op1 = (DataSet) sets.get(info.parentID);
+               sets.put(info.setID, applyFilterOperation(op1, info));
+       }
+
+       protected abstract DataSet applyFilterOperation(DataSet op1, INFO info);
+
+       private void createFlatMapOperation(INFO info) {
+               DataSet op1 = (DataSet) sets.get(info.parentID);
+               sets.put(info.setID, applyFlatMapOperation(op1, info));
+       }
+
+       protected abstract DataSet applyFlatMapOperation(DataSet op1, INFO info);
+
+       private void createFirstOperation() throws IOException {
+               int setID = (Integer) receiver.getRecord(true);
+               int parentID = (Integer) receiver.getRecord(true);
+               int count = (Integer) receiver.getRecord(true);
+               DataSet op = (DataSet) sets.get(parentID);
+               sets.put(setID, op.first(count).name("First"));
+       }
+
+       private void createGroupOperation() throws IOException {
+               int setID = (Integer) receiver.getRecord(true);
+               int parentID = (Integer) receiver.getRecord(true);
+               Object keysArrayOrTuple = receiver.getRecord(true);
+               int[] keys;
+               if (keysArrayOrTuple instanceof Tuple) {
+                       keys = tupleToIntArray((Tuple) keysArrayOrTuple);
+               } else {
+                       keys = (int[]) keysArrayOrTuple;
+               }
+               DataSet op1 = (DataSet) sets.get(parentID);
+               sets.put(setID, op1.groupBy(keys));
+       }
+
+       private void createGroupReduceOperation(INFO info) {
+               Object op1 = sets.get(info.parentID);
+               if (op1 instanceof DataSet) {
+                       sets.put(info.setID, applyGroupReduceOperation((DataSet) op1, info));
+                       return;
+               }
+               if (op1 instanceof UnsortedGrouping) {
+                       sets.put(info.setID, applyGroupReduceOperation((UnsortedGrouping) op1, info));
+                       return;
+               }
+               if (op1 instanceof SortedGrouping) {
+                       sets.put(info.setID, applyGroupReduceOperation((SortedGrouping) op1, info));
+               }
+       }
+
+       protected abstract DataSet applyGroupReduceOperation(DataSet op1, INFO info);
+
+       protected abstract DataSet applyGroupReduceOperation(UnsortedGrouping op1, INFO info);
+
+       protected abstract DataSet applyGroupReduceOperation(SortedGrouping op1, INFO info);
+
+       private void createHashPartitionOperation() throws IOException {
+               int setID = (Integer) receiver.getRecord(true);
+               int parentID = (Integer) receiver.getRecord(true);
+               Object keysArrayOrTuple = receiver.getRecord(true);
+               int[] keys;
+               if (keysArrayOrTuple instanceof Tuple) {
+                       keys = tupleToIntArray((Tuple) keysArrayOrTuple);
+               } else {
+                       keys = (int[]) keysArrayOrTuple;
+               }
+               DataSet op1 = (DataSet) sets.get(parentID);
+               sets.put(setID, op1.partitionByHash(keys));
+       }
+
+       private void createJoinOperation(DatasizeHint mode, INFO info) {
+               DataSet op1 = (DataSet) sets.get(info.parentID);
+               DataSet op2 = (DataSet) sets.get(info.otherID);
+
+               if (info.types != null && (info.projections == null || info.projections.length == 0)) {
+                       sets.put(info.setID, applyJoinOperation(op1, op2, info.keys1, info.keys2, mode, info));
+               } else {
+                       DefaultJoin defaultResult;
+                       switch (mode) {
+                               case NONE:
+                                       defaultResult = op1.join(op2).where(info.keys1).equalTo(info.keys2);
+                                       break;
+                               case HUGE:
+                                       defaultResult = op1.joinWithHuge(op2).where(info.keys1).equalTo(info.keys2);
+                                       break;
+                               case TINY:
+                                       defaultResult = op1.joinWithTiny(op2).where(info.keys1).equalTo(info.keys2);
+                                       break;
+                               default:
+                                       throw new IllegalArgumentException("Invalid join mode specified: " + mode);
+                       }
+                       // guard against a null projections array; this branch is also reached when info.types is null
+                       if (info.projections == null || info.projections.length == 0) {
+                               sets.put(info.setID, defaultResult.name("DefaultJoin"));
+                       } else {
+                               ProjectJoin project = null;
+                               for (ProjectionEntry pe : info.projections) {
+                                       switch (pe.side) {
+                                               case FIRST:
+                                                       project = project == null ? defaultResult.projectFirst(pe.keys) : project.projectFirst(pe.keys);
+                                                       break;
+                                               case SECOND:
+                                                       project = project == null ? defaultResult.projectSecond(pe.keys) : project.projectSecond(pe.keys);
+                                                       break;
+                                       }
+                               }
+                               sets.put(info.setID, project.name("ProjectJoin"));
+                       }
+               }
+       }
+
+       protected abstract DataSet applyJoinOperation(DataSet op1, DataSet op2, int[] firstKeys, int[] secondKeys, DatasizeHint mode, INFO info);
+
+       private void createMapOperation(INFO info) {
+               DataSet op1 = (DataSet) sets.get(info.parentID);
+               sets.put(info.setID, applyMapOperation(op1, info));
+       }
+
+       protected abstract DataSet applyMapOperation(DataSet op1, INFO info);
+
+       private void createMapPartitionOperation(INFO info) {
+               DataSet op1 = (DataSet) sets.get(info.parentID);
+               sets.put(info.setID, applyMapPartitionOperation(op1, info));
+       }
+
+       protected abstract DataSet applyMapPartitionOperation(DataSet op1, INFO info);
+
+       protected void createProjectOperation() throws IOException {
+               int setID = (Integer) receiver.getRecord(true);
+               int parentID = (Integer) receiver.getRecord(true);
+               Object keysArrayOrTuple = receiver.getRecord(true);
+               int[] keys;
+               if (keysArrayOrTuple instanceof Tuple) {
+                       keys = tupleToIntArray((Tuple) keysArrayOrTuple);
+               } else {
+                       keys = (int[]) keysArrayOrTuple;
+               }
+               DataSet op1 = (DataSet) sets.get(parentID);
+               sets.put(setID, op1.project(keys).name("Projection"));
+       }
+
+       private void createRebalanceOperation() throws IOException {
+               int setID = (Integer) receiver.getRecord(true);
+               int parentID = (Integer) receiver.getRecord(true);
+               DataSet op = (DataSet) sets.get(parentID);
+               sets.put(setID, op.rebalance().name("Rebalance"));
+       }
+
+       private void createReduceOperation(INFO info) {
+               Object op1 = sets.get(info.parentID);
+               if (op1 instanceof DataSet) {
+                       sets.put(info.setID, applyReduceOperation((DataSet) op1, info));
+                       return;
+               }
+               if (op1 instanceof UnsortedGrouping) {
+                       sets.put(info.setID, applyReduceOperation((UnsortedGrouping) op1, info));
+               }
+       }
+
+       protected abstract DataSet applyReduceOperation(DataSet op1, INFO info);
+
+       protected abstract DataSet applyReduceOperation(UnsortedGrouping op1, INFO info);
+
+       protected void createSortOperation() throws IOException {
+               int setID = (Integer) receiver.getRecord(true);
+               int parentID = (Integer) receiver.getRecord(true);
+               int field = (Integer) receiver.getRecord(true);
+               int encodedOrder = (Integer) receiver.getRecord(true);
+               Order order;
+               switch (encodedOrder) {
+                       case 0:
+                               order = Order.NONE;
+                               break;
+                       case 1:
+                               order = Order.ASCENDING;
+                               break;
+                       case 2:
+                               order = Order.DESCENDING;
+                               break;
+                       case 3:
+                               order = Order.ANY;
+                               break;
+                       default:
+                               order = Order.NONE;
+                               break;
+               }
+               Grouping op1 = (Grouping) sets.get(parentID);
+               if (op1 instanceof UnsortedGrouping) {
+                       sets.put(setID, ((UnsortedGrouping) op1).sortGroup(field, order));
+                       return;
+               }
+               if (op1 instanceof SortedGrouping) {
+                       sets.put(setID, ((SortedGrouping) op1).sortGroup(field, order));
+               }
+       }
+
+       protected void createUnionOperation() throws IOException {
+               int setID = (Integer) receiver.getRecord(true);
+               int parentID = (Integer) receiver.getRecord(true);
+               int otherID = (Integer) receiver.getRecord(true);
+               DataSet op1 = (DataSet) sets.get(parentID);
+               DataSet op2 = (DataSet) sets.get(otherID);
+               sets.put(setID, op1.union(op2).name("Union"));
+       }
+
+       //====Utility=======================================================================================================
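+       /**
+        * Converts a tuple of integers into an int array.
+        *
+        * @param tuple tuple to convert
+        * @return int array containing the tuple fields
+        */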
+       protected int[] tupleToIntArray(Tuple tuple) {
+               int[] keys = new int[tuple.getArity()];
+               for (int y = 0; y < tuple.getArity(); y++) {
+                       keys[y] = (Integer) tuple.getField(y);
+               }
+               return keys;
+       }
+}
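
For orientation, PlanBinder leaves every UDF operation abstract: a concrete language binding subclasses it, reads the operation details in createOperationInfo(), and supplies the apply* hooks. The sketch below illustrates that contract only; MyLanguageBinder and MyOperationInfo are hypothetical names, the identity map stands in for the machinery that would ship records to the external process, and the remaining hooks are elided, which is why the class stays abstract.

    import java.io.IOException;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.languagebinding.api.java.common.PlanBinder;

    // Hypothetical sketch, not part of this commit. MyOperationInfo is assumed
    // to be a language-specific subclass of OperationInfo.
    public abstract class MyLanguageBinder extends PlanBinder<MyOperationInfo> {

        @Override
        protected MyOperationInfo createOperationInfo(AbstractOperation operationIdentifier) throws IOException {
            // A real binding reads the operation-specific fields
            // (setID, parentID, keys, ...) from this.receiver here.
            return new MyOperationInfo();
        }

        @Override
        protected DataSet applyMapOperation(DataSet op1, MyOperationInfo info) {
            // A real binding would install a rich function that streams records
            // to the external process via Sender/Receiver; an identity map
            // stands in for that machinery in this sketch.
            return op1.map(new MapFunction<Object, Object>() {
                @Override
                public Object map(Object value) {
                    return value;
                }
            }).name("ExternalMap");
        }

        // The remaining apply* hooks (filter, flatMap, groupReduce, join, cross,
        // reduce, mapPartition, coGroup) follow the same pattern and are omitted,
        // which is why this class remains abstract.
    }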

http://git-wip-us.apache.org/repos/asf/flink/blob/af9248c3/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/Receiver.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/Receiver.java b/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/Receiver.java
new file mode 100644
index 0000000..2741714
--- /dev/null
+++ b/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/Receiver.java
@@ -0,0 +1,410 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.languagebinding.api.java.common.streaming;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.io.Serializable;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+//CHECKSTYLE.OFF: AvoidStarImport - tuple imports
+import org.apache.flink.api.java.tuple.*;
+import static org.apache.flink.languagebinding.api.java.common.streaming.Sender.*;
+//CHECKSTYLE.ON: AvoidStarImport
+import static org.apache.flink.languagebinding.api.java.common.PlanBinder.FLINK_TMP_DATA_DIR;
+import static org.apache.flink.languagebinding.api.java.common.PlanBinder.MAPPED_FILE_SIZE;
+import org.apache.flink.util.Collector;
+
+/**
+ * General-purpose class to read data from memory-mapped files.
+ */
+public class Receiver implements Serializable {
+       private final AbstractRichFunction function;
+
+       private File inputFile;
+       private RandomAccessFile inputRAF;
+       private FileChannel inputChannel;
+       private MappedByteBuffer fileBuffer;
+
+       private Deserializer deserializer = null;
+
+       public Receiver(AbstractRichFunction function) {
+               this.function = function;
+       }
+
+       //=====Setup========================================================================================================
+       public void open(String path) throws IOException {
+               setupMappedFile(path);
+       }
+
+       private void setupMappedFile(String path) throws FileNotFoundException, IOException {
+               String inputFilePath = function == null
+                               ? FLINK_TMP_DATA_DIR + "/" + "output"
+                               : path;
+
+               File x = new File(FLINK_TMP_DATA_DIR);
+               x.mkdirs();
+
+               inputFile = new File(inputFilePath);
+               if (inputFile.exists()) {
+                       inputFile.delete();
+               }
+               inputFile.createNewFile();
+               inputRAF = new RandomAccessFile(inputFilePath, "rw");
+               inputRAF.setLength(MAPPED_FILE_SIZE);
+               inputRAF.seek(MAPPED_FILE_SIZE - 1);
+               inputRAF.writeByte(0);
+               inputRAF.seek(0);
+               inputChannel = inputRAF.getChannel();
+               fileBuffer = inputChannel.map(FileChannel.MapMode.READ_WRITE, 0, MAPPED_FILE_SIZE);
+       }
+
+       public void close() throws IOException {
+               closeMappedFile();
+       }
+
+       private void closeMappedFile() throws IOException {
+               inputChannel.close();
+               inputRAF.close();
+       }
+
+       //=====Record-API===================================================================================================
+       /**
+        * Loads a buffer from the memory-mapped file. The records contained within the buffer can be accessed using
+        * getRecord(). These records do not necessarily have to be of the same type. This method requires external
+        * synchronization.
+        *
+        * @throws IOException
+        */
+       private void loadBuffer() throws IOException {
+               int count = 0;
+               while (fileBuffer.get(0) == 0 && count < 10) {
+                       try {
+                               Thread.sleep(1000);
+                       } catch (InterruptedException ie) {
+                       }
+                       fileBuffer.load();
+                       count++;
+               }
+               if (fileBuffer.get(0) == 0) {
+                       throw new RuntimeException("External process not responding.");
+               }
+               fileBuffer.position(1);
+       }
+
+       /**
+        * Returns a record from the buffer. Note: This method cannot be replaced with specific methods like readInt() or
+        * similar. The PlanBinder requires a method that can return any kind of object.
+        *
+        * @return read record
+        * @throws IOException
+        */
+       public Object getRecord() throws IOException {
+               return getRecord(false);
+       }
+
+       /**
+        * Returns a record from the buffer. Note: This method cannot be replaced with specific methods like readInt() or
+        * similar. The PlanBinder requires a method that can return any kind of object.
+        *
+        * @param normalized flag indicating whether certain types should be normalized
+        * @return read record
+        * @throws IOException
+        */
+       public Object getRecord(boolean normalized) throws IOException {
+               if (fileBuffer.position() == 0) {
+                       loadBuffer();
+               }
+               return receiveField(normalized);
+       }
+
+       /**
+        * Reads a single primitive value or tuple from the buffer.
+        *
+        * @param normalized flag indicating whether certain types should be normalized
+        * @return primitive value or tuple
+        * @throws IOException
+        */
+       private Object receiveField(boolean normalized) throws IOException {
+               byte type = fileBuffer.get();
+               switch (type) {
+                       case TYPE_TUPLE:
+                               int tupleSize = fileBuffer.get();
+                               Tuple tuple = createTuple(tupleSize);
+                               for (int x = 0; x < tupleSize; x++) {
+                                       tuple.setField(receiveField(normalized), x);
+                               }
+                               return tuple;
+                       case TYPE_BOOLEAN:
+                               return fileBuffer.get() == 1;
+                       case TYPE_BYTE:
+                               return fileBuffer.get();
+                       case TYPE_SHORT:
+                               if (normalized) {
+                                       return (int) fileBuffer.getShort();
+                               } else {
+                                       return fileBuffer.getShort();
+                               }
+                       case TYPE_INTEGER:
+                               return fileBuffer.getInt();
+                       case TYPE_LONG:
+                               if (normalized) {
+                                       return (int) fileBuffer.getLong();
+                               } else {
+                                       return fileBuffer.getLong();
+                               }
+                       case TYPE_FLOAT:
+                               if (normalized) {
+                                       return (double) fileBuffer.getFloat();
+                               } else {
+                                       return fileBuffer.getFloat();
+                               }
+                       case TYPE_DOUBLE:
+                               return fileBuffer.getDouble();
+                       case TYPE_STRING:
+                               int stringSize = fileBuffer.getInt();
+                               byte[] buffer = new byte[stringSize];
+                               fileBuffer.get(buffer);
+                               return new String(buffer);
+                       case TYPE_BYTES:
+                               int bytessize = fileBuffer.getInt();
+                               byte[] bytebuffer = new byte[bytessize];
+                               fileBuffer.get(bytebuffer);
+                               return bytebuffer;
+                       case TYPE_NULL:
+                               return null;
+                       default:
+                               throw new IllegalArgumentException("Unknown TypeID encountered: " + type);
+               }
+       }
+
+       //=====Buffered-API=================================================================================================
+       /**
+        * Reads a buffer of the given size from the memory-mapped file, and collects all records contained. This method
+        * assumes that all values in the buffer are of the same type. This method does NOT take care of synchronization.
+        * The user must guarantee that the buffer was completely written before calling this method.
+        *
+        * @param c Collector to collect records
+        * @param bufferSize size of the buffer
+        * @throws IOException
+        */
+       public void collectBuffer(Collector c, int bufferSize) throws IOException {
+               fileBuffer.position(0);
+
+               if (deserializer == null) {
+                       byte type = fileBuffer.get();
+                       deserializer = getDeserializer(type);
+               }
+               while (fileBuffer.position() < bufferSize) {
+                       c.collect(deserializer.deserialize());
+               }
+       }
+
+       //=====Deserializer=================================================================================================
+       private Deserializer getDeserializer(byte type) {
+               switch (type) {
+                       case TYPE_TUPLE:
+                               return new TupleDeserializer();
+                       case TYPE_BOOLEAN:
+                               return new BooleanDeserializer();
+                       case TYPE_BYTE:
+                               return new ByteDeserializer();
+                       case TYPE_BYTES:
+                               return new BytesDeserializer();
+                       case TYPE_SHORT:
+                               return new ShortDeserializer();
+                       case TYPE_INTEGER:
+                               return new IntDeserializer();
+                       case TYPE_LONG:
+                               return new LongDeserializer();
+                       case TYPE_STRING:
+                               return new StringDeserializer();
+                       case TYPE_FLOAT:
+                               return new FloatDeserializer();
+                       case TYPE_DOUBLE:
+                               return new DoubleDeserializer();
+                       case TYPE_NULL:
+                               return new NullDeserializer();
+                       default:
+                               throw new IllegalArgumentException("Unknown TypeID encountered: " + type);
+               }
+       }
+
+       private interface Deserializer<T> {
+               T deserialize();
+       }
+
+       private class BooleanDeserializer implements Deserializer<Boolean> {
+               @Override
+               public Boolean deserialize() {
+                       return fileBuffer.get() == 1;
+               }
+       }
+
+       private class ByteDeserializer implements Deserializer<Byte> {
+               @Override
+               public Byte deserialize() {
+                       return fileBuffer.get();
+               }
+       }
+
+       private class ShortDeserializer implements Deserializer<Short> {
+               @Override
+               public Short deserialize() {
+                       return fileBuffer.getShort();
+               }
+       }
+
+       private class IntDeserializer implements Deserializer<Integer> {
+               @Override
+               public Integer deserialize() {
+                       return fileBuffer.getInt();
+               }
+       }
+
+       private class LongDeserializer implements Deserializer<Long> {
+               @Override
+               public Long deserialize() {
+                       return fileBuffer.getLong();
+               }
+       }
+
+       private class FloatDeserializer implements Deserializer<Float> {
+               @Override
+               public Float deserialize() {
+                       return fileBuffer.getFloat();
+               }
+       }
+
+       private class DoubleDeserializer implements Deserializer<Double> {
+               @Override
+               public Double deserialize() {
+                       return fileBuffer.getDouble();
+               }
+       }
+
+       private class StringDeserializer implements Deserializer<String> {
+               private int size;
+
+               @Override
+               public String deserialize() {
+                       size = fileBuffer.getInt();
+                       byte[] buffer = new byte[size];
+                       fileBuffer.get(buffer);
+                       return new String(buffer);
+               }
+       }
+
+       private class NullDeserializer implements Deserializer<Object> {
+               @Override
+               public Object deserialize() {
+                       return null;
+               }
+       }
+
+       private class BytesDeserializer implements Deserializer<byte[]> {
+               @Override
+               public byte[] deserialize() {
+                       int length = fileBuffer.getInt();
+                       byte[] result = new byte[length];
+                       fileBuffer.get(result);
+                       return result;
+               }
+       }
+
+       private class TupleDeserializer implements Deserializer<Tuple> {
+               Deserializer[] deserializer = null;
+               Tuple reuse;
+
+               public TupleDeserializer() {
+                       int size = fileBuffer.getInt();
+                       reuse = createTuple(size);
+                       deserializer = new Deserializer[size];
+                       for (int x = 0; x < deserializer.length; x++) {
+                               deserializer[x] = getDeserializer(fileBuffer.get());
+                       }
+               }
+
+               @Override
+               public Tuple deserialize() {
+                       for (int x = 0; x < deserializer.length; x++) {
+                               reuse.setField(deserializer[x].deserialize(), x);
+                       }
+                       return reuse;
+               }
+       }
+
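+       /**
+        * Creates an empty Tuple instance of the given arity (0 to 25).
+        *
+        * @param size desired tuple arity
+        * @return empty Tuple of the given arity
+        */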
+       public static Tuple createTuple(int size) {
+               switch (size) {
+                       case 0:
+                               return new Tuple0();
+                       case 1:
+                               return new Tuple1();
+                       case 2:
+                               return new Tuple2();
+                       case 3:
+                               return new Tuple3();
+                       case 4:
+                               return new Tuple4();
+                       case 5:
+                               return new Tuple5();
+                       case 6:
+                               return new Tuple6();
+                       case 7:
+                               return new Tuple7();
+                       case 8:
+                               return new Tuple8();
+                       case 9:
+                               return new Tuple9();
+                       case 10:
+                               return new Tuple10();
+                       case 11:
+                               return new Tuple11();
+                       case 12:
+                               return new Tuple12();
+                       case 13:
+                               return new Tuple13();
+                       case 14:
+                               return new Tuple14();
+                       case 15:
+                               return new Tuple15();
+                       case 16:
+                               return new Tuple16();
+                       case 17:
+                               return new Tuple17();
+                       case 18:
+                               return new Tuple18();
+                       case 19:
+                               return new Tuple19();
+                       case 20:
+                               return new Tuple20();
+                       case 21:
+                               return new Tuple21();
+                       case 22:
+                               return new Tuple22();
+                       case 23:
+                               return new Tuple23();
+                       case 24:
+                               return new Tuple24();
+                       case 25:
+                               return new Tuple25();
+                       default:
+                               throw new IllegalArgumentException("Tuple size not supported: " + size);
+               }
+       }
+}
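
For orientation, Receiver's side of the exchange works as follows: loadBuffer() polls byte 0 of the mapped file as a ready flag, and typed records (one type byte, then the payload) start at position 1. The sketch below shows the writing side of that handshake and is illustrative only: the file path and payload are made up, and it assumes Sender's TYPE_INTEGER constant is visible from the calling code. A real sender must also respect MAPPED_FILE_SIZE and coordinate when the flag is cleared.

    import java.io.IOException;
    import java.io.RandomAccessFile;
    import java.nio.MappedByteBuffer;
    import java.nio.channels.FileChannel;
    import org.apache.flink.languagebinding.api.java.common.PlanBinder;
    import org.apache.flink.languagebinding.api.java.common.streaming.Sender;

    // Hypothetical sketch, not part of this commit: writes one integer record
    // in the layout that Receiver#getRecord() expects.
    public final class MappedFileWriteSketch {
        public static void main(String[] args) throws IOException {
            try (RandomAccessFile raf = new RandomAccessFile("/tmp/flink_data/input", "rw")) {
                raf.setLength(PlanBinder.MAPPED_FILE_SIZE);
                MappedByteBuffer buffer = raf.getChannel()
                        .map(FileChannel.MapMode.READ_WRITE, 0, PlanBinder.MAPPED_FILE_SIZE);
                buffer.position(1);              // records start after the ready-flag byte
                buffer.put(Sender.TYPE_INTEGER); // type tag, consumed by receiveField()
                buffer.putInt(42);               // payload
                buffer.put(0, (byte) 1);         // set the ready flag last, as loadBuffer() polls it
            }
        }
    }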
