[FLINK-1622][java-api][scala-api] add a GroupCombine operator

The GroupCombine operator acts like the optional combine step in the
GroupReduceFunction. It is more general because it combines from an
input to an arbitrary output type. Combining is performed on the
partitions with as much data in memory as possible. This may lead to
partial results.

The operator can be used to pre-combine elements into an intermediate
output format before applying a proper groupReduce to produce the final
output format.
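
The greedy, partition-local combine described above can emit several partial results for the same key, which is why a proper groupReduce is still needed afterwards. The following plain-Java model illustrates this behaviour without any Flink dependency. It is a sketch under stated assumptions, not Flink's GroupReduceCombineDriver: `GreedyCombineModel`, `WordCount` and `MAX_BUFFERED` are hypothetical names, and a fixed record count stands in for the memory budget that the real combiner works with.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of GroupCombine semantics; this is NOT Flink code.
final class GreedyCombineModel {

    // Hypothetical memory budget: the combiner buffers at most this many records
    // before it must combine and emit what it has seen so far.
    static final int MAX_BUFFERED = 3;

    // Output type of the combine step; it differs from the String input type.
    static final class WordCount {
        final String word;
        final int count;

        WordCount(String word, int count) {
            this.word = word;
            this.count = count;
        }
    }

    // Combine step, run independently on one partition (no data exchange).
    // Each time the buffer is full, the buffered words are counted and emitted,
    // so the same word may appear in several partial results.
    static List<WordCount> combinePartition(List<String> partition) {
        List<WordCount> out = new ArrayList<>();
        Map<String, Integer> buffer = new HashMap<>();
        int buffered = 0;
        for (String word : partition) {
            buffer.merge(word, 1, Integer::sum);
            if (++buffered == MAX_BUFFERED) {
                emit(buffer, out);
                buffered = 0;
            }
        }
        emit(buffer, out); // flush whatever is left
        return out;
    }

    // Copies the buffered counts into the output and empties the buffer.
    private static void emit(Map<String, Integer> buffer, List<WordCount> out) {
        for (Map.Entry<String, Integer> e : buffer.entrySet()) {
            out.add(new WordCount(e.getKey(), e.getValue()));
        }
        buffer.clear();
    }

    // GroupReduce step: after a full data exchange, sum all partial counts per word.
    static Map<String, Integer> reduceAll(List<List<WordCount>> combinedPartitions) {
        Map<String, Integer> totals = new TreeMap<>();
        for (List<WordCount> partials : combinedPartitions) {
            for (WordCount wc : partials) {
                totals.merge(wc.word, wc.count, Integer::sum);
            }
        }
        return totals;
    }
}
```

For example, with `MAX_BUFFERED = 3` the partition `[a, b, a, a, c]` is combined into four partial records, two of which are for `a`; merging them with the partial results of a second partition `[a, b]` in `reduceAll` yields `{a=4, b=2, c=1}`.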

* make Combine and FlatCombine generic by adding an output type

* add documentation

* Reuse GroupReduceCombineDriver and SynchronousChainedCombineDriver for 
GroupCombine operator
** make them more generic by specifying input and output type
** implement AllCombineDriver

* add Java tests
* add Scala test

This closes #466


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e93e0cb8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e93e0cb8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e93e0cb8

Branch: refs/heads/master
Commit: e93e0cb868087a8ab707c7e96cfccf220b07a4aa
Parents: 4a49a73
Author: Maximilian Michels <m...@apache.org>
Authored: Tue Mar 3 17:27:13 2015 +0100
Committer: Maximilian Michels <m...@apache.org>
Committed: Wed Mar 18 18:16:05 2015 +0100

----------------------------------------------------------------------
 docs/dataset_transformations.md                 | 115 +++-
 .../org/apache/flink/compiler/PactCompiler.java |  48 +-
 .../flink/compiler/costs/CostEstimator.java     |   5 +-
 .../flink/compiler/dag/GroupCombineNode.java    | 106 ++++
 .../operators/AllGroupCombineProperties.java    |  73 +++
 .../AllGroupWithPartialPreGroupProperties.java  |   2 +-
 .../operators/GroupCombineProperties.java       | 117 +++++
 .../operators/OperatorDescriptorSingle.java     |   5 +-
 .../operators/PartialGroupProperties.java       |  91 ----
 .../plandump/PlanJSONDumpGenerator.java         |   2 +-
 .../compiler/postpass/JavaApiPostPass.java      |   2 +-
 .../java/GroupReduceCompilationTest.java        |   2 +-
 .../api/common/functions/CombineFunction.java   |   7 +-
 .../common/functions/FlatCombineFunction.java   |   7 +-
 .../functions/RichFlatCombineFunction.java      |   7 +-
 .../functions/RichGroupReduceFunction.java      |   2 +-
 .../base/GroupCombineOperatorBase.java          | 151 ++++++
 .../java/org/apache/flink/api/java/DataSet.java |  26 +-
 .../flink/api/java/functions/FirstReducer.java  |   2 +-
 .../java/operators/GroupCombineOperator.java    | 227 ++++++++
 .../api/java/operators/SortedGrouping.java      |  22 +
 .../api/java/operators/UnsortedGrouping.java    |  24 +-
 .../PlanUnwrappingGroupCombineOperator.java     |  70 +++
 .../PlanUnwrappingReduceGroupOperator.java      |   2 +-
 ...lanUnwrappingSortedGroupCombineOperator.java |  70 +++
 ...PlanUnwrappingSortedReduceGroupOperator.java |   2 +-
 .../java/record/operators/ReduceOperator.java   |   2 +-
 .../flink/api/java/typeutils/TypeExtractor.java |  11 +
 .../java/record/ReduceWrappingFunctionTest.java |   4 +-
 .../operators/AllGroupCombineDriver.java        | 127 +++++
 .../runtime/operators/AllGroupReduceDriver.java |  46 +-
 .../flink/runtime/operators/DriverStrategy.java |   5 +-
 .../operators/GroupReduceCombineDriver.java     |  70 +--
 .../runtime/operators/PactTaskContext.java      |   2 +-
 .../chaining/GroupCombineChainedDriver.java     | 239 +++++++++
 .../SynchronousChainedCombineDriver.java        |  50 +-
 .../sort/CombiningUnilateralSortMerger.java     |  10 +-
 .../operators/CombineTaskExternalITCase.java    |   4 +-
 .../runtime/operators/CombineTaskTest.java      |   6 +-
 .../org/apache/flink/api/scala/DataSet.scala    |  56 ++
 .../apache/flink/api/scala/GroupedDataSet.scala |  53 +-
 .../CustomRankCombiner.java                     |   2 +-
 .../javaApiOperators/GroupCombineITCase.java    | 522 +++++++++++++++++++
 .../scala/operators/GroupCombineITCase.scala    |  77 +++
 44 files changed, 2248 insertions(+), 225 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e93e0cb8/docs/dataset_transformations.md
----------------------------------------------------------------------
diff --git a/docs/dataset_transformations.md b/docs/dataset_transformations.md
index 13082c1..2bec61b 100644
--- a/docs/dataset_transformations.md
+++ b/docs/dataset_transformations.md
@@ -260,7 +260,7 @@ class WC(val word: String, val count: Int) {
 }
 
 val words: DataSet[WC] = // [...]
-val wordCounts = words.groupBy { _.word } reduce { 
+val wordCounts = words.groupBy { _.word } reduce {
   (w1, w2) => new WC(w1.word, w1.count + w2.count)
 }
 ~~~
@@ -298,7 +298,7 @@ val reducedTuples = tuples.groupBy(0, 1).reduce { ... }
 
 #### Reduce on DataSet grouped by Case Class Fields
 
-When using Case Classes you can also specify the grouping key using the names of the fields: 
+When using Case Classes you can also specify the grouping key using the names of the fields:
 
 ~~~scala
 case class MyClass(val a: String, b: Int, c: Double)
@@ -334,7 +334,7 @@ public class DistinctReduce
 
     Set<String> uniqStrings = new HashSet<String>();
     Integer key = null;
-  
+
     // add all strings of the group to the set
     for (Tuple2<Integer, String> t : in) {
       key = t.f0;
@@ -524,6 +524,99 @@ class MyCombinableGroupReducer
 </div>
 </div>
 
+### GroupCombine on a Grouped DataSet
+
+The GroupCombine transformation is the generalized form of the combine step in
+the combinable GroupReduceFunction. It is generalized in the sense that it
+allows combining from input type `I` to an arbitrary output type `O`. In
+contrast, the combine step of a GroupReduce only allows combining from input
+type `I` to output type `I`, because the reduce step of the GroupReduceFunction
+expects input type `I`.
+
+In some applications, it is desirable to combine a DataSet into an intermediate
+format before performing additional transformations (e.g., to reduce data
+size). This can be achieved with a GroupCombine transformation at very little
+cost.
+
+**Note:** The GroupCombine on a grouped DataSet is performed in memory with a
+  greedy strategy, which may not process all data at once but in multiple
+  steps. It is also performed on the individual partitions, without the data
+  exchange that a GroupReduce transformation performs. This may lead to
+  partial results.
+
+The following example demonstrates the use of a GroupCombine transformation for
+an alternative WordCount implementation.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+~~~java
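+DataSet<String> input = // [...] The words received as input
+
+// group identical words; String is an atomic type, so a KeySelector defines the key
+UnsortedGrouping<String> groupedInput = input.groupBy(new KeySelector<String, String>() {
+    public String getKey(String word) {
+        return word;
+    }
+});
+
+// combine each group locally, changing the type from String to Tuple2<String, Integer>
+DataSet<Tuple2<String, Integer>> combinedWords = groupedInput.combineGroup(
+        new FlatCombineFunction<String, Tuple2<String, Integer>>() {
+
+    public void combine(Iterable<String> words, Collector<Tuple2<String, Integer>> out) {
+        String key = null;
+        int count = 0;
+        for (String word : words) {
+            key = word;
+            count++;
+        }
+        out.collect(new Tuple2<String, Integer>(key, count));
+    }
+});
+
+// group by words again
+UnsortedGrouping<Tuple2<String, Integer>> groupedCombinedWords = combinedWords.groupBy(0);
+
+// group reduce with full data exchange: sum up the partial counts
+DataSet<Tuple2<String, Integer>> output = groupedCombinedWords.reduceGroup(
+        new GroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
+
+    public void reduce(Iterable<Tuple2<String, Integer>> partialCounts,
+                       Collector<Tuple2<String, Integer>> out) {
+        String key = null;
+        int sum = 0;
+        for (Tuple2<String, Integer> partial : partialCounts) {
+            key = partial.f0;
+            sum += partial.f1;
+        }
+        out.collect(new Tuple2<String, Integer>(key, sum));
+    }
+});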
+~~~
+
+</div>
+<div data-lang="scala" markdown="1">
+
+~~~scala
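+val input: DataSet[String] = // [...] The words received as input
+val groupedInput: GroupedDataSet[String] = input.groupBy(word => word)
+
+// combine each group locally, changing the type from String to (String, Int)
+val combinedWords: DataSet[(String, Int)] = groupedInput.combineGroup {
+    (words: Iterator[String], out: Collector[(String, Int)]) =>
+        var key: String = null
+        var count = 0
+        for (word <- words) {
+            key = word
+            count += 1
+        }
+        out.collect((key, count))
+}
+
+val groupedCombinedWords: GroupedDataSet[(String, Int)] = combinedWords.groupBy(0)
+
+// group reduce with full data exchange: sum up the partial counts
+val output: DataSet[(String, Int)] = groupedCombinedWords.reduceGroup {
+    (partialCounts: Iterator[(String, Int)], out: Collector[(String, Int)]) =>
+        var key: String = null
+        var sum = 0
+        for ((word, count) <- partialCounts) {
+            key = word
+            sum += count
+        }
+        out.collect((key, sum))
+}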
+~~~
+
+</div>
+</div>
+
+The above alternative WordCount implementation demonstrates how the GroupCombine
+combines words before performing the GroupReduce transformation. The example is
+just a proof of concept. Note how the combine step changes the type of the
+DataSet, which would normally require an additional Map transformation before
+executing the GroupReduce.
+
 ### Aggregate on Grouped Tuple DataSet
 
 There are some common aggregation operations that are frequently used. The Aggregate transformation provides the following built-in aggregation functions:
@@ -558,7 +651,7 @@ val output = input.groupBy(1).aggregate(SUM, 0).and(MIN, 2)
 </div>
 </div>
 
-To apply multiple aggregations on a DataSet it is necessary to use the `.and()` function after the first aggregate, that means `.aggregate(SUM, 0).and(MIN, 2)` produces the sum of field 0 and the minimum of field 2 of the original DataSet. 
+To apply multiple aggregations on a DataSet it is necessary to use the `.and()` function after the first aggregate, that means `.aggregate(SUM, 0).and(MIN, 2)` produces the sum of field 0 and the minimum of field 2 of the original DataSet.
 In contrast to that `.aggregate(SUM, 0).aggregate(MIN, 2)` will apply an aggregation on an aggregation. In the given example it would produce the minimum of field 2 after calculating the sum of field 0 grouped by field 1.
 
 **Note:** The set of aggregation functions will be extended in the future.
@@ -632,6 +725,12 @@ group-reduce function is not combinable. Therefore, this can be a very compute i
 See the paragraph on "Combineable Group-Reduce Functions" above to learn how to implement a
 combinable group-reduce function.
 
+### GroupCombine on a full DataSet
+
+The GroupCombine on a full DataSet works similarly to the GroupCombine on a
+grouped DataSet. The data is partitioned across all nodes and then combined in
+a greedy fashion (i.e., only data that fits into memory is combined at once).
+
 ### Aggregate on full Tuple DataSet
 
 There are some common aggregation operations that are frequently used. The Aggregate transformation
@@ -898,7 +997,7 @@ to manually pick a strategy, in case you want to enforce a specific way of execu
 DataSet<SomeType> input1 = // [...]
 DataSet<AnotherType> input2 = // [...]
 
-DataSet<Tuple2<SomeType, AnotherType> result = 
+DataSet<Tuple2<SomeType, AnotherType> result =
       input1.join(input2, BROADCAST_HASH_FIRST)
             .where("id").equalTo("key");
 ~~~
@@ -1199,7 +1298,7 @@ val out = in.rebalance().map { ... }
 
 ### Hash-Partition
 
-Hash-partitions a DataSet on a given key. 
+Hash-partitions a DataSet on a given key.
 Keys can be specified as key expressions or field position keys (see [Reduce examples](#reduce-on-grouped-dataset) for how to specify keys).
 
 <div class="codetabs" markdown="1">
@@ -1235,7 +1334,7 @@ Partitions can be sorted on multiple fields by chaining `sortPartition()` calls.
 
 ~~~java
 DataSet<Tuple2<String, Integer>> in = // [...]
-// Locally sort partitions in ascending order on the second String field and 
+// Locally sort partitions in ascending order on the second String field and
 // in descending order on the first String field.
 // Apply a MapPartition transformation on the sorted partitions.
 DataSet<Tuple2<String, String>> out = in.sortPartition(1, Order.ASCENDING)
@@ -1248,7 +1347,7 @@ DataSet<Tuple2<String, String>> out = in.sortPartition(1, Order.ASCENDING)
 
 ~~~scala
 val in: DataSet[(String, Int)] = // [...]
-// Locally sort partitions in ascending order on the second String field and 
+// Locally sort partitions in ascending order on the second String field and
 // in descending order on the first String field.
 // Apply a MapPartition transformation on the sorted partitions.
 val out = in.sortPartition(1, Order.ASCENDING)
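
The documentation added in this commit states that a GroupCombine on a full DataSet combines each partition greedily, without grouping keys. As a rough illustration, the following plain-Java model (not Flink code; `AllCombineModel`, `Partial` and `MAX_BUFFERED` are hypothetical) combines `Integer` inputs into `(count, sum)` partial records, an output type that differs from the input type, and then merges all partial records into a mean in a final step.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model (NOT Flink code) of a GroupCombine on a full, ungrouped DataSet:
// there are no keys, so each chunk of buffered records is combined into one record.
final class AllCombineModel {

    // Hypothetical memory budget, expressed as a number of records.
    static final int MAX_BUFFERED = 4;

    // Output type O = (count, sum), different from the Integer input type I.
    static final class Partial {
        final long count;
        final long sum;

        Partial(long count, long sum) {
            this.count = count;
            this.sum = sum;
        }
    }

    // Combine step on one partition: one Partial per full buffer, plus one for the rest.
    static List<Partial> combinePartition(List<Integer> partition) {
        List<Partial> out = new ArrayList<>();
        long count = 0;
        long sum = 0;
        for (int value : partition) {
            count++;
            sum += value;
            if (count == MAX_BUFFERED) {
                out.add(new Partial(count, sum));
                count = 0;
                sum = 0;
            }
        }
        if (count > 0) {
            out.add(new Partial(count, sum)); // flush the remainder
        }
        return out;
    }

    // Final step over all partial records from all partitions.
    static double mean(List<List<Partial>> combinedPartitions) {
        long count = 0;
        long sum = 0;
        for (List<Partial> partials : combinedPartitions) {
            for (Partial p : partials) {
                count += p.count;
                sum += p.sum;
            }
        }
        return (double) sum / count;
    }
}
```

With `MAX_BUFFERED = 4`, the partition `[1, 2, 3, 4, 5, 6]` yields the partial records `(4, 10)` and `(2, 11)`; together with `[10]` from a second partition, `mean` returns 31/7.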

http://git-wip-us.apache.org/repos/asf/flink/blob/e93e0cb8/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java b/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
index 0ea8724..160b506 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
@@ -18,6 +18,26 @@
 
 package org.apache.flink.compiler;
 
+import org.apache.flink.compiler.costs.CostEstimator;
+import org.apache.flink.compiler.costs.DefaultCostEstimator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.flink.api.common.operators.base.BulkIterationBase;
+import org.apache.flink.api.common.operators.base.CoGroupOperatorBase;
+import org.apache.flink.api.common.operators.base.CrossOperatorBase;
+import org.apache.flink.api.common.operators.base.DeltaIterationBase;
+import org.apache.flink.api.common.operators.base.FilterOperatorBase;
+import org.apache.flink.api.common.operators.base.FlatMapOperatorBase;
+import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
+import org.apache.flink.api.common.operators.base.GroupCombineOperatorBase;
+import org.apache.flink.api.common.operators.base.JoinOperatorBase;
+import org.apache.flink.api.common.operators.base.MapOperatorBase;
+import org.apache.flink.api.common.operators.base.MapPartitionOperatorBase;
+import org.apache.flink.api.common.operators.base.PartitionOperatorBase;
+import org.apache.flink.api.common.operators.base.ReduceOperatorBase;
+import org.apache.flink.api.common.operators.base.SortPartitionOperatorBase;
+
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Deque;
@@ -28,11 +48,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import org.apache.flink.api.common.ExecutionMode;
-import org.apache.flink.api.common.operators.base.SortPartitionOperatorBase;
 import org.apache.flink.compiler.dag.SortPartitionNode;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.Plan;
@@ -40,23 +56,10 @@ import org.apache.flink.api.common.operators.GenericDataSinkBase;
 import org.apache.flink.api.common.operators.GenericDataSourceBase;
 import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.common.operators.Union;
-import org.apache.flink.api.common.operators.base.BulkIterationBase;
-import org.apache.flink.api.common.operators.base.CoGroupOperatorBase;
-import org.apache.flink.api.common.operators.base.CrossOperatorBase;
-import org.apache.flink.api.common.operators.base.DeltaIterationBase;
-import org.apache.flink.api.common.operators.base.FilterOperatorBase;
-import org.apache.flink.api.common.operators.base.FlatMapOperatorBase;
-import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
-import org.apache.flink.api.common.operators.base.JoinOperatorBase;
-import org.apache.flink.api.common.operators.base.MapOperatorBase;
-import org.apache.flink.api.common.operators.base.MapPartitionOperatorBase;
-import org.apache.flink.api.common.operators.base.PartitionOperatorBase;
-import org.apache.flink.api.common.operators.base.ReduceOperatorBase;
 import org.apache.flink.api.common.operators.base.BulkIterationBase.PartialSolutionPlaceHolder;
 import org.apache.flink.api.common.operators.base.DeltaIterationBase.SolutionSetPlaceHolder;
 import org.apache.flink.api.common.operators.base.DeltaIterationBase.WorksetPlaceHolder;
-import org.apache.flink.compiler.costs.CostEstimator;
-import org.apache.flink.compiler.costs.DefaultCostEstimator;
+
 import org.apache.flink.compiler.dag.BinaryUnionNode;
 import org.apache.flink.compiler.dag.BulkIterationNode;
 import org.apache.flink.compiler.dag.BulkPartialSolutionNode;
@@ -68,10 +71,11 @@ import org.apache.flink.compiler.dag.DataSourceNode;
 import org.apache.flink.compiler.dag.FilterNode;
 import org.apache.flink.compiler.dag.FlatMapNode;
 import org.apache.flink.compiler.dag.GroupReduceNode;
+import org.apache.flink.compiler.dag.GroupCombineNode;
 import org.apache.flink.compiler.dag.IterationNode;
+import org.apache.flink.compiler.dag.JoinNode;
 import org.apache.flink.compiler.dag.MapNode;
 import org.apache.flink.compiler.dag.MapPartitionNode;
-import org.apache.flink.compiler.dag.JoinNode;
 import org.apache.flink.compiler.dag.OptimizerNode;
 import org.apache.flink.compiler.dag.PactConnection;
 import org.apache.flink.compiler.dag.PartitionNode;
@@ -97,11 +101,14 @@ import org.apache.flink.compiler.plan.SourcePlanNode;
 import org.apache.flink.compiler.plan.WorksetIterationPlanNode;
 import org.apache.flink.compiler.plan.WorksetPlanNode;
 import org.apache.flink.compiler.postpass.OptimizerPostPass;
+
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.GlobalConfiguration;
+
 import org.apache.flink.runtime.operators.DriverStrategy;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.apache.flink.runtime.operators.util.LocalStrategy;
+
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.Visitor;
 
@@ -686,6 +693,9 @@ public class PactCompiler {
                        else if (c instanceof GroupReduceOperatorBase) {
                                n = new GroupReduceNode((GroupReduceOperatorBase<?, ?, ?>) c);
                        }
+                       else if (c instanceof GroupCombineOperatorBase) {
+                               n = new GroupCombineNode((GroupCombineOperatorBase<?, ?, ?>) c);
+                       }
                        else if (c instanceof JoinOperatorBase) {
                                n = new JoinNode((JoinOperatorBase<?, ?, ?, ?>) c);
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/e93e0cb8/flink-compiler/src/main/java/org/apache/flink/compiler/costs/CostEstimator.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/costs/CostEstimator.java b/flink-compiler/src/main/java/org/apache/flink/compiler/costs/CostEstimator.java
index b13c1be..091fbf6 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/costs/CostEstimator.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/costs/CostEstimator.java
@@ -186,7 +186,10 @@ public abstract class CostEstimator {
                        
                case SORTED_GROUP_COMBINE:
                        // partial grouping is always local and main memory resident. we should add a relative cpu cost at some point
-               
+
+                       // partial grouping is always local and main memory resident. we should add a relative cpu cost at some point
+               case ALL_GROUP_COMBINE:
+                       
                case UNION:
                        // pipelined local union is for free
                        

http://git-wip-us.apache.org/repos/asf/flink/blob/e93e0cb8/flink-compiler/src/main/java/org/apache/flink/compiler/dag/GroupCombineNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/GroupCombineNode.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/GroupCombineNode.java
new file mode 100644
index 0000000..50ae50d
--- /dev/null
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/GroupCombineNode.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.compiler.dag;
+
+import org.apache.flink.api.common.operators.Ordering;
+import org.apache.flink.api.common.operators.base.GroupCombineOperatorBase;
+import org.apache.flink.compiler.DataStatistics;
+import org.apache.flink.compiler.operators.AllGroupCombineProperties;
+import org.apache.flink.compiler.operators.GroupCombineProperties;
+import org.apache.flink.compiler.operators.OperatorDescriptorSingle;
+import org.apache.flink.configuration.Configuration;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * The optimizer representation of a <i>GroupCombineNode</i> operation.
+ */
+public class GroupCombineNode extends SingleInputNode {
+
+       private final List<OperatorDescriptorSingle> possibleProperties;
+
+       /**
+        * Creates a new optimizer node for the given operator.
+        *
+        * @param operator The GroupCombine operation.
+        */
+       public GroupCombineNode(GroupCombineOperatorBase<?, ?, ?> operator) {
+               super(operator);
+
+               if (this.keys == null) {
+                       // case of a key-less reducer. force a parallelism of 1
+                       setDegreeOfParallelism(1);
+               }
+
+               this.possibleProperties = initPossibleProperties();
+       }
+
+       private List<OperatorDescriptorSingle> initPossibleProperties() {
+               // see if an internal hint dictates the strategy to use
+               final Configuration conf = getPactContract().getParameters();
+
+               // check if we can work with a grouping (simple reducer), or if we need ordering because of a group order
+               Ordering groupOrder = null;
+               if (getPactContract() instanceof GroupCombineOperatorBase) {
+                       groupOrder = getPactContract().getGroupOrder();
+                       if (groupOrder != null && groupOrder.getNumberOfFields() == 0) {
+                               groupOrder = null;
+                       }
+               }
+
+               OperatorDescriptorSingle props = (this.keys == null ?
+                               new AllGroupCombineProperties() :
+                               new GroupCombineProperties(this.keys, groupOrder));
+
+               return Collections.singletonList(props);
+       }
+
+       // ------------------------------------------------------------------------
+
+       /**
+        * Gets the operator represented by this optimizer node.
+        *
+        * @return The operator represented by this optimizer node.
+        */
+       @Override
+       public GroupCombineOperatorBase<?, ?, ?> getPactContract() {
+               return (GroupCombineOperatorBase<?, ?, ?>) super.getPactContract();
+       }
+
+       @Override
+       public String getName() {
+               return "GroupCombine";
+       }
+
+       @Override
+       protected List<OperatorDescriptorSingle> getPossibleProperties() {
+               return this.possibleProperties;
+       }
+
+       // --------------------------------------------------------------------------------------------
+       //  Estimates
+       // --------------------------------------------------------------------------------------------
+
+       @Override
+       protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) {
+               // no real estimates possible for a reducer.
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e93e0cb8/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AllGroupCombineProperties.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AllGroupCombineProperties.java b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AllGroupCombineProperties.java
new file mode 100644
index 0000000..7919d28
--- /dev/null
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AllGroupCombineProperties.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.compiler.operators;
+
+import org.apache.flink.compiler.dag.SingleInputNode;
+import org.apache.flink.compiler.dataproperties.GlobalProperties;
+import org.apache.flink.compiler.dataproperties.LocalProperties;
+import org.apache.flink.compiler.dataproperties.PartitioningProperty;
+import org.apache.flink.compiler.dataproperties.RequestedGlobalProperties;
+import org.apache.flink.compiler.dataproperties.RequestedLocalProperties;
+import org.apache.flink.compiler.plan.Channel;
+import org.apache.flink.compiler.plan.SingleInputPlanNode;
+import org.apache.flink.runtime.operators.DriverStrategy;
+
+import java.util.Collections;
+import java.util.List;
+
+public final class AllGroupCombineProperties extends OperatorDescriptorSingle {
+
+       @Override
+       public DriverStrategy getStrategy() {
+               return DriverStrategy.ALL_GROUP_COMBINE;
+       }
+
+       @Override
+       public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
+               return new SingleInputPlanNode(node, "GroupCombine ("+node.getPactContract().getName()+")", in, DriverStrategy.ALL_GROUP_COMBINE);
+       }
+
+       @Override
+       protected List<RequestedGlobalProperties> createPossibleGlobalProperties() {
+               return Collections.singletonList(new RequestedGlobalProperties());
+       }
+
+       @Override
+       protected List<RequestedLocalProperties> createPossibleLocalProperties() {
+               return Collections.singletonList(new RequestedLocalProperties());
+       }
+
+
+       @Override
+       public GlobalProperties computeGlobalProperties(GlobalProperties gProps) {
+               if (gProps.getUniqueFieldCombination() != null && gProps.getUniqueFieldCombination().size() > 0 &&
+                               gProps.getPartitioning() == PartitioningProperty.RANDOM_PARTITIONED)
+               {
+                       gProps.setAnyPartitioning(gProps.getUniqueFieldCombination().iterator().next().toFieldList());
+               }
+               gProps.clearUniqueFieldCombinations();
+               return gProps;
+       }
+
+
+       @Override
+       public LocalProperties computeLocalProperties(LocalProperties lProps) {
+               return lProps.clearUniqueFieldSets();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e93e0cb8/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AllGroupWithPartialPreGroupProperties.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AllGroupWithPartialPreGroupProperties.java b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AllGroupWithPartialPreGroupProperties.java
index ec38b47..d6a3c5f 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AllGroupWithPartialPreGroupProperties.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AllGroupWithPartialPreGroupProperties.java
@@ -58,7 +58,7 @@ public final class AllGroupWithPartialPreGroupProperties extends OperatorDescrip
                        combinerNode.setDegreeOfParallelism(in.getSource().getDegreeOfParallelism());
 
                        SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode,
-                                       "Combine ("+node.getPactContract().getName()+")", toCombiner, DriverStrategy.ALL_GROUP_COMBINE);
+                                       "Combine ("+node.getPactContract().getName()+")", toCombiner, DriverStrategy.ALL_GROUP_REDUCE_COMBINE);
                        combiner.setCosts(new Costs(0, 0));
                        combiner.initProperties(toCombiner.getGlobalProperties(), toCombiner.getLocalProperties());
                        

http://git-wip-us.apache.org/repos/asf/flink/blob/e93e0cb8/flink-compiler/src/main/java/org/apache/flink/compiler/operators/GroupCombineProperties.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/GroupCombineProperties.java b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/GroupCombineProperties.java
new file mode 100644
index 0000000..17f0980
--- /dev/null
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/GroupCombineProperties.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.compiler.operators;
+
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.common.operators.Ordering;
+import org.apache.flink.api.common.operators.util.FieldSet;
+import org.apache.flink.compiler.dag.SingleInputNode;
+import org.apache.flink.compiler.dataproperties.GlobalProperties;
+import org.apache.flink.compiler.dataproperties.LocalProperties;
+import org.apache.flink.compiler.dataproperties.PartitioningProperty;
+import org.apache.flink.compiler.dataproperties.RequestedGlobalProperties;
+import org.apache.flink.compiler.dataproperties.RequestedLocalProperties;
+import org.apache.flink.compiler.plan.Channel;
+import org.apache.flink.compiler.plan.SingleInputPlanNode;
+import org.apache.flink.runtime.operators.DriverStrategy;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * The properties file belonging to the GroupCombineNode. It translates the 
GroupCombine operation
+ * to the driver strategy SORTED_GROUP_COMBINE and sets the relevant grouping 
and sorting keys.
+ * @see org.apache.flink.compiler.dag.GroupCombineNode
+ */
+public final class GroupCombineProperties extends OperatorDescriptorSingle {
+
+       private final Ordering ordering;        // ordering that we need to use if an additional ordering is requested

+       public GroupCombineProperties(FieldSet groupKeys, Ordering additionalOrderKeys) {
+               super(groupKeys);
+
+               // construct the ordering so that the grouping fields come first
+
+               this.ordering = new Ordering();
+               for (Integer key : this.keyList) {
+                       this.ordering.appendOrdering(key, null, Order.ANY);
+               }
+
+               // and next the additional order fields
+               if (additionalOrderKeys != null) {
+                       for (int i = 0; i < additionalOrderKeys.getNumberOfFields(); i++) {
+                               Integer field = additionalOrderKeys.getFieldNumber(i);
+                               Order order = additionalOrderKeys.getOrder(i);
+                               this.ordering.appendOrdering(field, additionalOrderKeys.getType(i), order);
+                       }
+               }
+
+       }
+
+       @Override
+       public DriverStrategy getStrategy() {
+               return DriverStrategy.SORTED_GROUP_COMBINE;
+       }
+
+       @Override
+       public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
+               node.setDegreeOfParallelism(in.getSource().getDegreeOfParallelism());
+
+               // create the plan node; it reuses the combine strategy also used in the group reduce
+               SingleInputPlanNode singleInputPlanNode = new SingleInputPlanNode(
+                               node,
+                               "GroupCombine (" + node.getPactContract().getName() + ")",
+                               in,
+                               DriverStrategy.SORTED_GROUP_COMBINE,
+                               this.keyList);
+
+               // set sorting comparator key info
+               singleInputPlanNode.setDriverKeyInfo(this.ordering.getInvolvedIndexes(), this.ordering.getFieldSortDirections(), 0);
+               // set grouping comparator key info
+               singleInputPlanNode.setDriverKeyInfo(this.keyList, 1);
+
+               return singleInputPlanNode;
+       }
+
+       @Override
+       protected List<RequestedGlobalProperties> createPossibleGlobalProperties() {
+               RequestedGlobalProperties props = new RequestedGlobalProperties();
+               props.setRandomPartitioning();
+               return Collections.singletonList(props);
+       }
+
+       @Override
+       protected List<RequestedLocalProperties> createPossibleLocalProperties() {
+               return Collections.singletonList(new RequestedLocalProperties());
+       }
+
+       @Override
+       public GlobalProperties computeGlobalProperties(GlobalProperties gProps) {
+               if (gProps.getUniqueFieldCombination() != null && gProps.getUniqueFieldCombination().size() > 0 &&
+                               gProps.getPartitioning() == PartitioningProperty.RANDOM_PARTITIONED) {
+                       gProps.setAnyPartitioning(gProps.getUniqueFieldCombination().iterator().next().toFieldList());
+               }
+               gProps.clearUniqueFieldCombinations();
+               return gProps;
+       }
+
+       @Override
+       public LocalProperties computeLocalProperties(LocalProperties lProps) {
+               return lProps.clearUniqueFieldSets();
+       }
+}
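The constructor above builds one sort order in which the grouping keys come first (with `Order.ANY`), followed by any secondary order keys. The following is a minimal plain-Java sketch of that construction. It assumes simplified stand-ins (a local `Order` enum and a hypothetical `SortField` class) in place of Flink's `Order` and `Ordering` classes.

```java
import java.util.ArrayList;
import java.util.List;

public class CombineOrderingSketch {

    // Simplified stand-in for org.apache.flink.api.common.operators.Order (assumption, not the Flink enum).
    enum Order { ANY, ASCENDING, DESCENDING }

    // Hypothetical entry of the combined sort order: a field position and its direction.
    static final class SortField {
        final int field;
        final Order order;

        SortField(int field, Order order) {
            this.field = field;
            this.order = order;
        }

        @Override
        public String toString() {
            return field + ":" + order;
        }
    }

    // Grouping keys first (their direction does not matter, hence ANY), then the optional secondary order keys.
    static List<SortField> buildOrdering(int[] groupKeys, int[] orderKeys, Order[] orderDirections) {
        List<SortField> ordering = new ArrayList<>();
        for (int key : groupKeys) {
            ordering.add(new SortField(key, Order.ANY));
        }
        if (orderKeys != null) {
            for (int i = 0; i < orderKeys.length; i++) {
                ordering.add(new SortField(orderKeys[i], orderDirections[i]));
            }
        }
        return ordering;
    }

    public static void main(String[] args) {
        // Group on field 0 and sort each group descending on field 2.
        System.out.println(buildOrdering(new int[] {0}, new int[] {2}, new Order[] {Order.DESCENDING}));
        // prints [0:ANY, 2:DESCENDING]
    }
}
```

Putting the grouping fields first means that one sort over this ordering also places all elements of a group next to each other. That is why the same key list can serve as the grouping comparator (key info index 1).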

http://git-wip-us.apache.org/repos/asf/flink/blob/e93e0cb8/flink-compiler/src/main/java/org/apache/flink/compiler/operators/OperatorDescriptorSingle.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/OperatorDescriptorSingle.java b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/OperatorDescriptorSingle.java
index 7919b2b..16e7c72 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/OperatorDescriptorSingle.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/OperatorDescriptorSingle.java
@@ -32,7 +32,10 @@ import org.apache.flink.compiler.plan.Channel;
 import org.apache.flink.compiler.plan.SingleInputPlanNode;
 
 /**
- * 
+ * Abstract base class for operator descriptors of single-input operators. A descriptor instantiates the plan node,
+ * sets the driver strategy and the sorting and grouping keys, returns the possible local and global properties,
+ * and updates them after the operation has been performed.
+ * @see org.apache.flink.compiler.dag.SingleInputNode
  */
 public abstract class OperatorDescriptorSingle implements AbstractOperatorDescriptor {
        

http://git-wip-us.apache.org/repos/asf/flink/blob/e93e0cb8/flink-compiler/src/main/java/org/apache/flink/compiler/operators/PartialGroupProperties.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/PartialGroupProperties.java b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/PartialGroupProperties.java
deleted file mode 100644
index 7954773..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/PartialGroupProperties.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.compiler.operators;
-
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
-import org.apache.flink.api.common.operators.util.FieldSet;
-import org.apache.flink.compiler.dag.GroupReduceNode;
-import org.apache.flink.compiler.dag.SingleInputNode;
-import org.apache.flink.compiler.dataproperties.GlobalProperties;
-import org.apache.flink.compiler.dataproperties.LocalProperties;
-import org.apache.flink.compiler.dataproperties.PartitioningProperty;
-import org.apache.flink.compiler.dataproperties.RequestedGlobalProperties;
-import org.apache.flink.compiler.dataproperties.RequestedLocalProperties;
-import org.apache.flink.compiler.plan.Channel;
-import org.apache.flink.compiler.plan.SingleInputPlanNode;
-import org.apache.flink.runtime.operators.DriverStrategy;
-
-public final class PartialGroupProperties extends OperatorDescriptorSingle {
-       
-       public PartialGroupProperties(FieldSet keys) {
-               super(keys);
-       }
-       
-       @Override
-       public DriverStrategy getStrategy() {
-               return DriverStrategy.SORTED_GROUP_COMBINE;
-       }
-
-       @Override
-       public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
-               // create in input node for combine with same DOP as input node
-               GroupReduceNode combinerNode = new GroupReduceNode((GroupReduceOperatorBase<?, ?, ?>) node.getPactContract());
-               combinerNode.setDegreeOfParallelism(in.getSource().getDegreeOfParallelism());
-
-               SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode, "Combine("+node.getPactContract().getName()+")", in,
-                               DriverStrategy.SORTED_GROUP_COMBINE);
-               // sorting key info
-               combiner.setDriverKeyInfo(in.getLocalStrategyKeys(), in.getLocalStrategySortOrder(), 0);
-               // set grouping comparator key info
-               combiner.setDriverKeyInfo(this.keyList, 1);
-               
-               return combiner;
-       }
-
-       @Override
-       protected List<RequestedGlobalProperties> createPossibleGlobalProperties() {
-               return Collections.singletonList(new RequestedGlobalProperties());
-       }
-
-       @Override
-       protected List<RequestedLocalProperties> createPossibleLocalProperties() {
-               RequestedLocalProperties props = new RequestedLocalProperties();
-               props.setGroupedFields(this.keys);
-               return Collections.singletonList(props);
-       }
-
-       @Override
-       public GlobalProperties computeGlobalProperties(GlobalProperties gProps) {
-               if (gProps.getUniqueFieldCombination() != null && gProps.getUniqueFieldCombination().size() > 0 &&
-                               gProps.getPartitioning() == PartitioningProperty.RANDOM_PARTITIONED)
-               {
-                       gProps.setAnyPartitioning(gProps.getUniqueFieldCombination().iterator().next().toFieldList());
-               }
-               gProps.clearUniqueFieldCombinations();
-               return gProps;
-       }
-       
-       @Override
-       public LocalProperties computeLocalProperties(LocalProperties lProps) {
-               return lProps.clearUniqueFieldSets();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e93e0cb8/flink-compiler/src/main/java/org/apache/flink/compiler/plandump/PlanJSONDumpGenerator.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/plandump/PlanJSONDumpGenerator.java b/flink-compiler/src/main/java/org/apache/flink/compiler/plandump/PlanJSONDumpGenerator.java
index 7728948..ab21b69 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/plandump/PlanJSONDumpGenerator.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/plandump/PlanJSONDumpGenerator.java
@@ -433,7 +433,7 @@ public class PlanJSONDumpGenerator {
                                break;
                        
                        case ALL_GROUP_REDUCE:
-                       case ALL_GROUP_COMBINE:
+                       case ALL_GROUP_REDUCE_COMBINE:
                                locString = "Group Reduce All";
                                break;
                                

http://git-wip-us.apache.org/repos/asf/flink/blob/e93e0cb8/flink-compiler/src/main/java/org/apache/flink/compiler/postpass/JavaApiPostPass.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/postpass/JavaApiPostPass.java b/flink-compiler/src/main/java/org/apache/flink/compiler/postpass/JavaApiPostPass.java
index 11ac231..4081a4b 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/postpass/JavaApiPostPass.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/postpass/JavaApiPostPass.java
@@ -239,7 +239,7 @@ public class JavaApiPostPass implements OptimizerPostPass {
 
 
                if(javaOp instanceof GroupReduceOperatorBase &&
-                               (source.getDriverStrategy() == DriverStrategy.SORTED_GROUP_COMBINE || source.getDriverStrategy() == DriverStrategy.ALL_GROUP_COMBINE)) {
+                               (source.getDriverStrategy() == DriverStrategy.SORTED_GROUP_COMBINE || source.getDriverStrategy() == DriverStrategy.ALL_GROUP_REDUCE_COMBINE)) {
                        GroupReduceOperatorBase<?, ?, ?> groupNode = (GroupReduceOperatorBase<?, ?, ?>) javaOp;
                        type = groupNode.getInput().getOperatorInfo().getOutputType();
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/e93e0cb8/flink-compiler/src/test/java/org/apache/flink/compiler/java/GroupReduceCompilationTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/java/GroupReduceCompilationTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/java/GroupReduceCompilationTest.java
index 454438c..8f2292b 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/java/GroupReduceCompilationTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/java/GroupReduceCompilationTest.java
@@ -118,7 +118,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java
 
                        // check that both reduce and combiner have the same strategy
                        assertEquals(DriverStrategy.ALL_GROUP_REDUCE, reduceNode.getDriverStrategy());
-                       assertEquals(DriverStrategy.ALL_GROUP_COMBINE, combineNode.getDriverStrategy());
+                       assertEquals(DriverStrategy.ALL_GROUP_REDUCE_COMBINE, combineNode.getDriverStrategy());
                        
                        // check DOP
                        assertEquals(8, sourceNode.getDegreeOfParallelism());

http://git-wip-us.apache.org/repos/asf/flink/blob/e93e0cb8/flink-core/src/main/java/org/apache/flink/api/common/functions/CombineFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/CombineFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/CombineFunction.java
index 29ebafe..ef52b32 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/CombineFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/CombineFunction.java
@@ -31,9 +31,10 @@ import java.io.Serializable;
  * This special variant of the combine function reduces the group of elements into a single element. A variant
  * that can return multiple values per group is defined in {@link FlatCombineFunction}.
  * 
- * @param <T> The data type processed by the combine function.
+ * @param <IN> The data type processed by the combine function.
+ * @param <OUT> The data type emitted by the combine function.
  */
-public interface CombineFunction<T> extends Function, Serializable {
+public interface CombineFunction<IN, OUT> extends Function, Serializable {
 
        /**
         * The combine method, called (potentially multiple times) with subgroups of elements.
@@ -44,5 +45,5 @@ public interface CombineFunction<T> extends Function, Serializable {
         * @throws Exception The function may throw Exceptions, which will cause the program to cancel,
         *                   and may trigger the recovery logic.
         */
-       T combine(Iterable<T> values) throws Exception;
+       OUT combine(Iterable<IN> values) throws Exception;
 }
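With the second type parameter, a `CombineFunction` may now emit a type that differs from its input. The following sketch uses a local interface that only mirrors the new `OUT combine(Iterable<IN>)` signature. It is an assumption for illustration, not the Flink type. The hypothetical `TotalLength` combiner reduces a group of `String`s to a single `Integer`.

```java
import java.util.Arrays;

public class CombineFunctionSketch {

    // Local stand-in mirroring the new CombineFunction<IN, OUT> signature; not the Flink interface itself.
    interface CombineFunction<IN, OUT> {
        OUT combine(Iterable<IN> values) throws Exception;
    }

    // Hypothetical combiner: reduces a group of words to a single Integer, the total number of characters.
    static final class TotalLength implements CombineFunction<String, Integer> {
        @Override
        public Integer combine(Iterable<String> values) {
            int total = 0;
            for (String value : values) {
                total += value.length();
            }
            return total;
        }
    }

    public static void main(String[] args) {
        Integer result = new TotalLength().combine(Arrays.asList("flink", "combine"));
        System.out.println(result); // 12
    }
}
```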

http://git-wip-us.apache.org/repos/asf/flink/blob/e93e0cb8/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatCombineFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatCombineFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatCombineFunction.java
index 53a2edc..b90b3ce 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatCombineFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatCombineFunction.java
@@ -33,9 +33,10 @@ import org.apache.flink.util.Collector;
  * This special variant of the combine function supports returning more than one element per group.
  * It is frequently less efficient to use than the {@link CombineFunction}.
  * 
- * @param <T> The data type processed by the combine function.
+ * @param <IN> The data type processed by the combine function.
+ * @param <OUT> The data type emitted by the combine function.
  */
-public interface FlatCombineFunction<T> extends Function, Serializable {
+public interface FlatCombineFunction<IN, OUT> extends Function, Serializable {
 
        /**
         * The combine method, called (potentially multiple times) with subgroups of elements.
@@ -46,5 +47,5 @@ public interface FlatCombineFunction<T> extends Function, Serializable {
         * @throws Exception The function may throw Exceptions, which will cause the program to cancel,
         *                   and may trigger the recovery logic.
         */
-       void combine(Iterable<T> values, Collector<T> out) throws Exception;
+       void combine(Iterable<IN> values, Collector<OUT> out) throws Exception;
 }
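`FlatCombineFunction<IN, OUT>` emits through a `Collector<OUT>`, so one (sub)group can produce zero, one or many results of a different type. The following is a minimal sketch. It assumes local stand-ins for `Collector` and `FlatCombineFunction` that only mirror the signatures shown in the diff. The hypothetical `WordCounts` combiner turns a group of words into one `word=count` string per distinct word.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class FlatCombineSketch {

    // Stand-in for org.apache.flink.util.Collector (assumption, mirrors collect(T)).
    interface Collector<T> {
        void collect(T record);
    }

    // Stand-in mirroring the new FlatCombineFunction<IN, OUT> signature.
    interface FlatCombineFunction<IN, OUT> {
        void combine(Iterable<IN> values, Collector<OUT> out) throws Exception;
    }

    // Hypothetical combiner: counts words in a (possibly partial) group and emits one string per distinct word.
    static final class WordCounts implements FlatCombineFunction<String, String> {
        @Override
        public void combine(Iterable<String> values, Collector<String> out) {
            Map<String, Integer> counts = new LinkedHashMap<>(); // keeps first-seen order
            for (String word : values) {
                counts.merge(word, 1, Integer::sum);
            }
            for (Map.Entry<String, Integer> entry : counts.entrySet()) {
                out.collect(entry.getKey() + "=" + entry.getValue());
            }
        }
    }

    // Runs the combiner on one group and gathers everything it emits.
    static List<String> run(List<String> group) {
        List<String> result = new ArrayList<>();
        new WordCounts().combine(group, result::add);
        return result;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("a", "b", "a"))); // [a=2, b=1]
    }
}
```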

http://git-wip-us.apache.org/repos/asf/flink/blob/e93e0cb8/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFlatCombineFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFlatCombineFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFlatCombineFunction.java
index a7e7b70..17aca88 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFlatCombineFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFlatCombineFunction.java
@@ -30,12 +30,13 @@ import org.apache.flink.util.Collector;
  * {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and
  * {@link RichFunction#close()}.
  *
- * @param <T> The data type of the elements to be combined.
+ * @param <IN> The data type of the elements to be combined.
+ * @param <OUT> The resulting data type of the elements to be combined.
  */
-public abstract class RichFlatCombineFunction<T> extends AbstractRichFunction implements FlatCombineFunction<T> {
+public abstract class RichFlatCombineFunction<IN, OUT> extends AbstractRichFunction implements FlatCombineFunction<IN, OUT> {
 
        private static final long serialVersionUID = 1L;
 
        @Override
-       public abstract void combine(Iterable<T> values, Collector<T> out) throws Exception;
+       public abstract void combine(Iterable<IN> values, Collector<OUT> out) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e93e0cb8/flink-core/src/main/java/org/apache/flink/api/common/functions/RichGroupReduceFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichGroupReduceFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichGroupReduceFunction.java
index e3b8632..b6c92c2 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichGroupReduceFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichGroupReduceFunction.java
@@ -34,7 +34,7 @@ import org.apache.flink.util.Collector;
  * @param <IN> Type of the elements that this function processes.
  * @param <OUT> The type of the elements returned by the user-defined function.
  */
-public abstract class RichGroupReduceFunction<IN, OUT> extends AbstractRichFunction implements GroupReduceFunction<IN, OUT>, FlatCombineFunction<IN> {
+public abstract class RichGroupReduceFunction<IN, OUT> extends AbstractRichFunction implements GroupReduceFunction<IN, OUT>, FlatCombineFunction<IN, IN> {
        
        private static final long serialVersionUID = 1L;
 

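`RichGroupReduceFunction` keeps its combiner at `FlatCombineFunction<IN, IN>`. The reason is that the combiner's output is fed back into the reduce step, which consumes `IN`. The following plain-Java sketch shows that data flow. It uses a hypothetical `reduceWithCombiner` helper, which is not a Flink API, and its signature makes the type constraint explicit.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

public class CombinableReduceSketch {

    // Hypothetical helper (not a Flink API): combine each partition, then reduce all combiner outputs.
    // The reduce step consumes List<IN>, so the combiner must emit IN as well, mirroring FlatCombineFunction<IN, IN>.
    static <IN, OUT> OUT reduceWithCombiner(List<List<IN>> partitions,
                                            Function<List<IN>, List<IN>> combine,
                                            Function<List<IN>, OUT> reduce) {
        List<IN> combined = new ArrayList<>();
        for (List<IN> partition : partitions) {
            combined.addAll(combine.apply(partition));
        }
        return reduce.apply(combined);
    }

    static long sum(List<Long> values) {
        long total = 0;
        for (long v : values) {
            total += v;
        }
        return total;
    }

    static Long sumWithCombiner(List<List<Long>> partitions) {
        return CombinableReduceSketch.<Long, Long>reduceWithCombiner(
                partitions,
                partition -> Collections.singletonList(sum(partition)), // combine: partial sum, still a Long
                all -> sum(all));                                       // reduce: final sum
    }

    public static void main(String[] args) {
        List<List<Long>> partitions = Arrays.asList(Arrays.asList(1L, 2L), Arrays.asList(3L, 4L, 5L));
        System.out.println(sumWithCombiner(partitions)); // 15
    }
}
```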
http://git-wip-us.apache.org/repos/asf/flink/blob/e93e0cb8/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupCombineOperatorBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupCombineOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupCombineOperatorBase.java
new file mode 100644
index 0000000..2a47c45
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupCombineOperatorBase.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.operators.base;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.api.common.functions.FlatCombineFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.functions.util.CopyingListCollector;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.api.common.operators.Ordering;
+import org.apache.flink.api.common.operators.SingleInputOperator;
+import org.apache.flink.api.common.operators.UnaryOperatorInformation;
+import org.apache.flink.api.common.operators.util.ListKeyGroupedIterator;
+import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+/**
+ * Base operator for the combineGroup transformation. It receives the user-defined FlatCombineFunction as input.
+ * This class is later processed by the compiler to generate the plan.
+ * @see org.apache.flink.api.common.functions.FlatCombineFunction
+ */
+public class GroupCombineOperatorBase<IN, OUT, FT extends FlatCombineFunction<IN, OUT>> extends SingleInputOperator<IN, OUT, FT> {
+
+
+       /** The ordering for the order inside a combine group. */
+       private Ordering groupOrder;
+
+       public GroupCombineOperatorBase(FT udf, UnaryOperatorInformation<IN, OUT> operatorInfo, int[] keyPositions, String name) {
+               super(new UserCodeObjectWrapper<FT>(udf), operatorInfo, keyPositions, name);
+       }
+
+       // --------------------------------------------------------------------------------------------
+
+       /**
+        * Sets the order of the elements within a combine group.
+        *
+        * @param order The order for the elements in a combine group.
+        */
+       public void setGroupOrder(Ordering order) {
+               this.groupOrder = order;
+       }
+
+       /**
+        * Gets the order of elements within a combine group. If no such order has been
+        * set, this method returns null.
+        *
+        * @return The secondary order.
+        */
+       public Ordering getGroupOrder() {
+               return this.groupOrder;
+       }
+
+       // --------------------------------------------------------------------------------------------
+
+       @Override
+       protected List<OUT> executeOnCollections(List<IN> inputData, RuntimeContext ctx, ExecutionConfig executionConfig) throws Exception {
+               FlatCombineFunction<IN, OUT> function = this.userFunction.getUserCodeObject();
+
+               UnaryOperatorInformation<IN, OUT> operatorInfo = getOperatorInfo();
+               TypeInformation<IN> inputType = operatorInfo.getInputType();
+
+               int[] keyColumns = getKeyColumns(0);
+
+               if (!(inputType instanceof CompositeType) && (keyColumns.length > 0 || groupOrder != null)) {
+                       throw new InvalidProgramException("Grouping or group-sorting is only possible on composite type.");
+               }
+
+               int[] sortColumns = keyColumns;
+               boolean[] sortOrderings = new boolean[sortColumns.length];
+
+               if (groupOrder != null) {
+                       sortColumns = ArrayUtils.addAll(sortColumns, groupOrder.getFieldPositions());
+                       sortOrderings = ArrayUtils.addAll(sortOrderings, groupOrder.getFieldSortDirections());
+               }
+
+               if (inputType instanceof CompositeType) {
+                       if(sortColumns.length == 0) { // => all reduce. No comparator
+                               Preconditions.checkArgument(sortOrderings.length == 0);
+                       } else {
+                               final TypeComparator<IN> sortComparator = ((CompositeType<IN>) inputType).createComparator(sortColumns, sortOrderings, 0, executionConfig);
+
+                               Collections.sort(inputData, new Comparator<IN>() {
+                                       @Override
+                                       public int compare(IN o1, IN o2) {
+                                               return sortComparator.compare(o1, o2);
+                                       }
+                               });
+                       }
+               }
+
+               FunctionUtils.setFunctionRuntimeContext(function, ctx);
+               FunctionUtils.openFunction(function, this.parameters);
+
+               ArrayList<OUT> result = new ArrayList<OUT>();
+
+               if (keyColumns.length == 0) {
+                       final TypeSerializer<IN> inputSerializer = inputType.createSerializer(executionConfig);
+                       TypeSerializer<OUT> outSerializer = getOperatorInfo().getOutputType().createSerializer(executionConfig);
+                       List<IN> inputDataCopy = new ArrayList<IN>(inputData.size());
+                       for (IN in: inputData) {
+                               inputDataCopy.add(inputSerializer.copy(in));
+                       }
+                       CopyingListCollector<OUT> collector = new CopyingListCollector<OUT>(result, outSerializer);
+
+                       function.combine(inputDataCopy, collector);
+               } else {
+                       final TypeSerializer<IN> inputSerializer = inputType.createSerializer(executionConfig);
+                       boolean[] keyOrderings = new boolean[keyColumns.length];
+                       final TypeComparator<IN> comparator = ((CompositeType<IN>) inputType).createComparator(keyColumns, keyOrderings, 0, executionConfig);
+
+                       ListKeyGroupedIterator<IN> keyedIterator = new ListKeyGroupedIterator<IN>(inputData, inputSerializer, comparator);
+
+                       TypeSerializer<OUT> outSerializer = getOperatorInfo().getOutputType().createSerializer(executionConfig);
+                       CopyingListCollector<OUT> collector = new CopyingListCollector<OUT>(result, outSerializer);
+
+                       while (keyedIterator.nextKey()) {
+                               function.combine(keyedIterator.getValues(), collector);
+                       }
+               }
+
+               FunctionUtils.closeFunction(function);
+               return result;
+       }
+}
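`executeOnCollections` works in three steps:

1. It sorts the input on the grouping (and group-order) fields.
2. If there are grouping keys, it calls `combine` once per run of equal keys.
3. If there are no grouping keys, it calls `combine` once on a copy of the whole input instead.

The following is a simplified, Flink-free sketch of the same control flow. It assumes a plain `Function` key extractor in place of Flink's `TypeComparator` and `ListKeyGroupedIterator`. Unlike the method above, which sorts `inputData` in place, the sketch sorts a copy.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;

public class CollectionCombineSketch {

    // Sorts a copy of the input by key and calls the combiner once per run of equal keys.
    // A null key extractor means "no grouping": the whole input is combined as a single group.
    static <IN, K extends Comparable<K>, OUT> List<OUT> execute(List<IN> input,
                                                               Function<IN, K> keyOf,
                                                               BiConsumer<List<IN>, Consumer<OUT>> combiner) {
        List<OUT> result = new ArrayList<>();
        List<IN> data = new ArrayList<>(input);
        if (keyOf == null) {
            combiner.accept(data, result::add);
            return result;
        }
        data.sort(Comparator.comparing(keyOf));
        int start = 0;
        while (start < data.size()) {
            K key = keyOf.apply(data.get(start));
            int end = start + 1;
            // extend the run while the next element has the same key
            while (end < data.size() && keyOf.apply(data.get(end)).compareTo(key) == 0) {
                end++;
            }
            combiner.accept(data.subList(start, end), result::add);
            start = end;
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("apple", "bean", "avocado", "beet", "cherry");
        // Group by first letter and emit "letter:groupSize" per group.
        List<String> out = CollectionCombineSketch.<String, Character, String>execute(
                words, w -> w.charAt(0), (group, emit) -> emit.accept(group.get(0).charAt(0) + ":" + group.size()));
        System.out.println(out); // [a:2, b:2, c:1]
    }
}
```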

http://git-wip-us.apache.org/repos/asf/flink/blob/e93e0cb8/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
index d7e6e94..a884f6d 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
@@ -24,6 +24,7 @@ import org.apache.commons.lang3.Validate;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.FlatCombineFunction;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.functions.InvalidTypesException;
@@ -58,15 +59,16 @@ import org.apache.flink.api.java.operators.DataSink;
 import org.apache.flink.api.java.operators.DeltaIteration;
 import org.apache.flink.api.java.operators.DistinctOperator;
 import org.apache.flink.api.java.operators.FilterOperator;
-import org.apache.flink.api.java.operators.ProjectOperator;
 import org.apache.flink.api.java.operators.FlatMapOperator;
 import org.apache.flink.api.java.operators.GroupReduceOperator;
+import org.apache.flink.api.java.operators.GroupCombineOperator;
 import org.apache.flink.api.java.operators.IterativeDataSet;
 import org.apache.flink.api.java.operators.JoinOperator.JoinOperatorSets;
 import org.apache.flink.api.java.operators.Keys;
 import org.apache.flink.api.java.operators.MapOperator;
 import org.apache.flink.api.java.operators.MapPartitionOperator;
 import org.apache.flink.api.java.operators.PartitionOperator;
+import org.apache.flink.api.java.operators.ProjectOperator;
 import org.apache.flink.api.java.operators.ProjectOperator.Projection;
 import org.apache.flink.api.java.operators.ReduceOperator;
 import org.apache.flink.api.java.operators.SortedGrouping;
@@ -459,6 +461,28 @@ public abstract class DataSet<T> {
        }
 
        /**
+        * Applies a {@link FlatCombineFunction} on a non-grouped {@link DataSet}.
+        * A combine function is similar to a GroupReduceFunction but does not perform a full data exchange. Instead, the
+        * combine method is called on the data of each partition, combining as much data as fits into memory at a time,
+        * so the results may be partial. This operator is suitable for combining values into an intermediate format
+        * before applying a proper groupReduce, in which the data is shuffled across the nodes for further reduction.
+        * The GroupReduce operator can also be supplied with a combiner by implementing a RichGroupReduceFunction.
+        * However, the combine method of a RichGroupReduceFunction requires the input and output types to be the same,
+        * whereas a FlatCombineFunction can have an arbitrary output type.
+        * @param combiner The FlatCombineFunction that is applied on the DataSet.
+        * @return A GroupCombineOperator which represents the combined DataSet.
+        */
+       public <R> GroupCombineOperator<T, R> combineGroup(FlatCombineFunction<T, R> combiner) {
+               if (combiner == null) {
+                       throw new NullPointerException("GroupCombine function must not be null.");
+               }
+
+               String callLocation = Utils.getCallLocationName();
+               TypeInformation<R> resultType = TypeExtractor.getGroupCombineReturnTypes(combiner, getType(), callLocation, true);
+               return new GroupCombineOperator<T, R>(this, resultType, clean(combiner), callLocation);
+       }
+
+       /**
         * Selects an element with minimum value.
         * <p>
         * The minimum is computed over the specified fields in lexicographical order.
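`combineGroup` is meant to pre-combine each partition into an intermediate format. A proper groupReduce then runs on the shuffled intermediate results. The following plain-Java simulation shows that two-phase pattern as a word count. The partition lists and method names are illustrative assumptions, not Flink API. The combine step turns `String` words into partial counts. That is an output type which the combiner of a `RichGroupReduceFunction` (restricted to `IN` to `IN`) could not produce.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class PreCombineSketch {

    // Combine phase (per partition, no data exchange): String words -> partial counts.
    // The output type differs from the input type, which the new combine operator permits.
    static Map<String, Integer> combinePartition(Iterable<String> words) {
        Map<String, Integer> partial = new TreeMap<>();
        for (String word : words) {
            partial.merge(word, 1, Integer::sum);
        }
        return partial;
    }

    // Reduce phase (after the shuffle): merges the partial counts of all partitions.
    static Map<String, Integer> reduce(List<Map<String, Integer>> partials) {
        Map<String, Integer> total = new TreeMap<>();
        for (Map<String, Integer> partial : partials) {
            partial.forEach((word, count) -> total.merge(word, count, Integer::sum));
        }
        return total;
    }

    static Map<String, Integer> wordCount(List<List<String>> partitions) {
        List<Map<String, Integer>> partials = new ArrayList<>();
        for (List<String> partition : partitions) {
            partials.add(combinePartition(partition));
        }
        return reduce(partials);
    }

    public static void main(String[] args) {
        List<List<String>> partitions = Arrays.asList(
                Arrays.asList("a", "b", "a"),
                Arrays.asList("a", "c"));
        System.out.println(wordCount(partitions)); // {a=3, b=1, c=1}
    }
}
```

Because the combine phase may see only part of a group, its results are partial. The final reduce is still needed to produce correct totals.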

http://git-wip-us.apache.org/repos/asf/flink/blob/e93e0cb8/flink-java/src/main/java/org/apache/flink/api/java/functions/FirstReducer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/FirstReducer.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/FirstReducer.java
index 890a0ca..fbb7029 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/functions/FirstReducer.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/FirstReducer.java
@@ -23,7 +23,7 @@ import org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable;
 import org.apache.flink.util.Collector;
 
 @Combinable
-public class FirstReducer<T> implements GroupReduceFunction<T, T>, FlatCombineFunction<T> {
+public class FirstReducer<T> implements GroupReduceFunction<T, T>, FlatCombineFunction<T, T> {
        private static final long serialVersionUID = 1L;
 
        private final int count;

http://git-wip-us.apache.org/repos/asf/flink/blob/e93e0cb8/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java
new file mode 100644
index 0000000..617162b
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.operators;
+
+import org.apache.flink.api.common.functions.FlatCombineFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.operators.Operator;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.common.operators.Ordering;
+import org.apache.flink.api.common.operators.UnaryOperatorInformation;
+import org.apache.flink.api.common.operators.base.GroupCombineOperatorBase;
+import org.apache.flink.api.common.operators.base.MapOperatorBase;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.operators.translation.KeyExtractingMapper;
+import org.apache.flink.api.java.operators.translation.PlanUnwrappingGroupCombineOperator;
+import org.apache.flink.api.java.operators.translation.PlanUnwrappingSortedGroupCombineOperator;
+import org.apache.flink.api.java.operators.translation.TwoKeyExtractingMapper;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+
+/**
+ * This operator behaves like the GroupReduceOperator with a combiner, but it only runs the combine step, which
+ * reduces the data locally within each partition. Unlike the combiner of a GroupReduceOperator, the combine step can
+ * return an arbitrary data type. This is useful to pre-combine values into an intermediate representation before
+ * applying a proper reduce operation.
+ *
+ * @param <IN> The type of the data set consumed by the operator.
+ * @param <OUT> The type of the data set created by the operator.
+ */
+public class GroupCombineOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT, GroupCombineOperator<IN, OUT>> {
+
+       private final FlatCombineFunction<IN, OUT> function;
+
+       private final Grouping<IN> grouper;
+
+       private final String defaultName;
+
+       /**
+        * Constructor for a non-grouped combine (all combine).
+        *
+        * @param input The input data set to the groupCombine function.
+        * @param resultType The type information for the resulting type.
+        * @param function The user-defined GroupCombine function.
+        * @param defaultName The operator's name.
+        */
+       public GroupCombineOperator(DataSet<IN> input, TypeInformation<OUT> resultType, FlatCombineFunction<IN, OUT> function, String defaultName) {
+               super(input, resultType);
+               this.function = function;
+               this.grouper = null;
+               this.defaultName = defaultName;
+       }
+
+       /**
+        * Constructor for a grouped combine.
+        *
+        * @param input The grouped input to be processed group-wise by the groupCombine function.
+        * @param resultType The type information for the resulting type.
+        * @param function The user-defined GroupCombine function.
+        * @param defaultName The operator's name.
+        */
+       public GroupCombineOperator(Grouping<IN> input, TypeInformation<OUT> resultType, FlatCombineFunction<IN, OUT> function, String defaultName) {
+               super(input != null ? input.getDataSet() : null, resultType);
+
+               this.function = function;
+               this.grouper = input;
+               this.defaultName = defaultName;
+       }
+
+       @Override
+       protected FlatCombineFunction<IN, OUT> getFunction() {
+               return function;
+       }
+
+       // --------------------------------------------------------------------------------------------
+       //  Translation
+       // --------------------------------------------------------------------------------------------
+
+       @Override
+       protected GroupCombineOperatorBase<?, OUT, ?> translateToDataFlow(Operator<IN> input) {
+
+               String name = getName() != null ? getName() : "GroupCombine at " + defaultName;
+
+               // distinguish between grouped combine and non-grouped combine
+               if (grouper == null) {
+                       // non-grouped combine
+                       UnaryOperatorInformation<IN, OUT> operatorInfo = new UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType());
+                       GroupCombineOperatorBase<IN, OUT, FlatCombineFunction<IN, OUT>> po =
+                                       new GroupCombineOperatorBase<IN, OUT, FlatCombineFunction<IN, OUT>>(function, operatorInfo, new int[0], name);
+
+                       po.setInput(input);
+                       // the degree of parallelism for a non-grouped combine can only be 1
+                       po.setDegreeOfParallelism(1);
+                       return po;
+               }
+
+               if (grouper.getKeys() instanceof Keys.SelectorFunctionKeys) {
+
+                       @SuppressWarnings("unchecked")
+                       Keys.SelectorFunctionKeys<IN, ?> selectorKeys = (Keys.SelectorFunctionKeys<IN, ?>) grouper.getKeys();
+
+                       if (grouper instanceof SortedGrouping) {
+                               SortedGrouping<IN> sortedGrouper = (SortedGrouping<IN>) grouper;
+                               Keys.SelectorFunctionKeys<IN, ?> sortKeys = sortedGrouper.getSortSelectionFunctionKey();
+
+                               PlanUnwrappingSortedGroupCombineOperator<IN, OUT, ?, ?> po = translateSelectorFunctionSortedReducer(
+                                               selectorKeys, sortKeys, function, getInputType(), getResultType(), name, input);
+
+                               // set group order
+                               int[] sortKeyPositions = sortedGrouper.getGroupSortKeyPositions();
+                               Order[] sortOrders = sortedGrouper.getGroupSortOrders();
+
+                               Ordering o = new Ordering();
+                               for(int i=0; i < sortKeyPositions.length; i++) {
+                                       o.appendOrdering(sortKeyPositions[i], null, sortOrders[i]);
+                               }
+                               po.setGroupOrder(o);
+
+                               po.setDegreeOfParallelism(this.getParallelism());
+                               return po;
+                       } else {
+                               PlanUnwrappingGroupCombineOperator<IN, OUT, ?> po = translateSelectorFunctionReducer(
+                                               selectorKeys, function, getInputType(), getResultType(), name, input);
+
+                               po.setDegreeOfParallelism(this.getParallelism());
+                               return po;
+                       }
+               }
+               else if (grouper.getKeys() instanceof Keys.ExpressionKeys) {
+
+                       int[] logicalKeyPositions = grouper.getKeys().computeLogicalKeyPositions();
+                       UnaryOperatorInformation<IN, OUT> operatorInfo = new UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType());
+                       GroupCombineOperatorBase<IN, OUT, FlatCombineFunction<IN, OUT>> po =
+                                       new GroupCombineOperatorBase<IN, OUT, FlatCombineFunction<IN, OUT>>(function, operatorInfo, logicalKeyPositions, name);
+
+                       po.setInput(input);
+                       po.setDegreeOfParallelism(getParallelism());
+
+                       // set group order
+                       if (grouper instanceof SortedGrouping) {
+                               SortedGrouping<IN> sortedGrouper = (SortedGrouping<IN>) grouper;
+
+                               int[] sortKeyPositions = sortedGrouper.getGroupSortKeyPositions();
+                               Order[] sortOrders = sortedGrouper.getGroupSortOrders();
+
+                               Ordering o = new Ordering();
+                               for(int i=0; i < sortKeyPositions.length; i++) {
+                                       o.appendOrdering(sortKeyPositions[i], null, sortOrders[i]);
+                               }
+                               po.setGroupOrder(o);
+                       }
+
+                       return po;
+               }
+               else {
+                       throw new UnsupportedOperationException("Unrecognized key type.");
+               }
+       }
+
+
+       // --------------------------------------------------------------------------------------------
+
+       private static <IN, OUT, K> PlanUnwrappingGroupCombineOperator<IN, OUT, K> translateSelectorFunctionReducer(
+                       Keys.SelectorFunctionKeys<IN, ?> rawKeys, FlatCombineFunction<IN, OUT> function,
+                       TypeInformation<IN> inputType, TypeInformation<OUT> outputType, String name, Operator<IN> input)
+       {
+               @SuppressWarnings("unchecked")
+               final Keys.SelectorFunctionKeys<IN, K> keys = (Keys.SelectorFunctionKeys<IN, K>) rawKeys;
+
+               TypeInformation<Tuple2<K, IN>> typeInfoWithKey = new TupleTypeInfo<Tuple2<K, IN>>(keys.getKeyType(), inputType);
+
+               KeyExtractingMapper<IN, K> extractor = new KeyExtractingMapper<IN, K>(keys.getKeyExtractor());
+
+               PlanUnwrappingGroupCombineOperator<IN, OUT, K> reducer = new PlanUnwrappingGroupCombineOperator<IN, OUT, K>(function, keys, name, outputType, typeInfoWithKey);
+
+               MapOperatorBase<IN, Tuple2<K, IN>, MapFunction<IN, Tuple2<K, IN>>> mapper = new MapOperatorBase<IN, Tuple2<K, IN>, MapFunction<IN, Tuple2<K, IN>>>(extractor, new UnaryOperatorInformation<IN, Tuple2<K, IN>>(inputType, typeInfoWithKey), "Key Extractor");
+
+               reducer.setInput(mapper);
+               mapper.setInput(input);
+
+               // set the mapper's parallelism to the input parallelism to make sure it is chained
+               mapper.setDegreeOfParallelism(input.getDegreeOfParallelism());
+
+               return reducer;
+       }
+
+       private static <IN, OUT, K1, K2> PlanUnwrappingSortedGroupCombineOperator<IN, OUT, K1, K2> translateSelectorFunctionSortedReducer(
+                       Keys.SelectorFunctionKeys<IN, ?> rawGroupingKey, Keys.SelectorFunctionKeys<IN, ?> rawSortingKey, FlatCombineFunction<IN, OUT> function,
+                       TypeInformation<IN> inputType, TypeInformation<OUT> outputType, String name, Operator<IN> input)
+       {
+               @SuppressWarnings("unchecked")
+               final Keys.SelectorFunctionKeys<IN, K1> groupingKey = (Keys.SelectorFunctionKeys<IN, K1>) rawGroupingKey;
+
+               @SuppressWarnings("unchecked")
+               final Keys.SelectorFunctionKeys<IN, K2> sortingKey = (Keys.SelectorFunctionKeys<IN, K2>) rawSortingKey;
+
+               TypeInformation<Tuple3<K1, K2, IN>> typeInfoWithKey = new TupleTypeInfo<Tuple3<K1, K2, IN>>(groupingKey.getKeyType(), sortingKey.getKeyType(), inputType);
+
+               TwoKeyExtractingMapper<IN, K1, K2> extractor = new TwoKeyExtractingMapper<IN, K1, K2>(groupingKey.getKeyExtractor(), sortingKey.getKeyExtractor());
+
+               PlanUnwrappingSortedGroupCombineOperator<IN, OUT, K1, K2> reducer = new PlanUnwrappingSortedGroupCombineOperator<IN, OUT, K1, K2>(function, groupingKey, sortingKey, name, outputType, typeInfoWithKey);
+
+               MapOperatorBase<IN, Tuple3<K1, K2, IN>, MapFunction<IN, Tuple3<K1, K2, IN>>> mapper = new MapOperatorBase<IN, Tuple3<K1, K2, IN>, MapFunction<IN, Tuple3<K1, K2, IN>>>(extractor, new UnaryOperatorInformation<IN, Tuple3<K1, K2, IN>>(inputType, typeInfoWithKey), "Key Extractor");
+
+               reducer.setInput(mapper);
+               mapper.setInput(input);
+
+               // set the mapper's parallelism to the input parallelism to make sure it is chained
+               mapper.setDegreeOfParallelism(input.getDegreeOfParallelism());
+
+               return reducer;
+       }
+}

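For key selector functions, `translateToDataFlow()` does not group the user type directly: a `KeyExtractingMapper` wraps each element into a `Tuple2<K, IN>`, the combine is grouped on the extracted key, and `PlanUnwrappingGroupCombineOperator` passes only the original values to the user function. The plain-Java sketch below imitates that flow. `KeyedPair`, `extractKeys`, and `combineUnwrapped` are hypothetical names rather than Flink classes, and the user function is modeled as a `BiConsumer` for brevity.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Function;

// Hypothetical stand-in for Tuple2<K, IN>: f0 is the extracted key, f1 the original element.
final class KeyedPair<K, V> {
    final K f0;
    final V f1;

    KeyedPair(K f0, V f1) {
        this.f0 = f0;
        this.f1 = f1;
    }
}

final class SelectorKeySketch {
    // Imitates the "Key Extractor" map step: wraps each element together with its key.
    static <IN, K> List<KeyedPair<K, IN>> extractKeys(List<IN> input, Function<IN, K> selector) {
        List<KeyedPair<K, IN>> wrapped = new ArrayList<>();
        for (IN v : input) {
            wrapped.add(new KeyedPair<>(selector.apply(v), v));
        }
        return wrapped;
    }

    // Groups on the key field, drops the keys, and hands only the original values to the user function.
    static <IN, K, OUT> List<OUT> combineUnwrapped(List<KeyedPair<K, IN>> wrapped,
            BiConsumer<Iterable<IN>, List<OUT>> userCombine) {
        Map<K, List<IN>> groups = new LinkedHashMap<>();
        for (KeyedPair<K, IN> p : wrapped) {
            groups.computeIfAbsent(p.f0, k -> new ArrayList<>()).add(p.f1);
        }
        List<OUT> out = new ArrayList<>();
        for (List<IN> values : groups.values()) {
            userCombine.accept(values, out);
        }
        return out;
    }
}
```

Because only the values reach the user function, its signature stays in terms of `IN` even though the plan operator itself consumes key-wrapped records. Grouping `["apple", "avocado", "banana"]` by first letter and emitting each group's size yields `[2, 1]`.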
http://git-wip-us.apache.org/repos/asf/flink/blob/e93e0cb8/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
index 38c6c68..b2054bf 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.java.operators;
 
+import org.apache.flink.api.common.functions.FlatCombineFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.api.java.Utils;
@@ -156,6 +157,27 @@ public class SortedGrouping<T> extends Grouping<T> {
                return new GroupReduceOperator<T, R>(this, resultType, dataSet.clean(reducer), Utils.getCallLocationName() );
        }
 
+       /**
+        * Applies a FlatCombineFunction on a grouped {@link DataSet}.
+        * A FlatCombineFunction is similar to a GroupReduceFunction but does not perform a full data exchange. Instead, the
+        * combine method is called locally within each partition to combine the elements of each group. This operator is
+        * suitable for combining values into an intermediate format before doing a proper groupReduce, where the data is
+        * shuffled across the nodes for further reduction. The GroupReduce operator can also be supplied with a combiner
+        * by implementing a RichGroupReduceFunction, but the combine method of a RichGroupReduceFunction requires the
+        * input and output types to be the same. A FlatCombineFunction, on the other hand, can have an arbitrary output
+        * type.
+        * @param combiner The FlatCombineFunction that is applied on the grouped DataSet.
+        * @return A GroupCombineOperator which represents the combined DataSet.
+        */
+       public <R> GroupCombineOperator<T, R> combineGroup(FlatCombineFunction<T, R> combiner) {
+               if (combiner == null) {
+                       throw new NullPointerException("GroupCombine function must not be null.");
+               }
+               TypeInformation<R> resultType = TypeExtractor.getGroupCombineReturnTypes(combiner, this.getDataSet().getType());
+
+               return new GroupCombineOperator<T, R>(this, resultType, dataSet.clean(combiner), Utils.getCallLocationName());
+       }
+
        
        /**
         * Returns a new set containing the first n elements in this grouped and sorted {@link DataSet}.<br/>

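On a SortedGrouping, the group order set through `setGroupOrder()` means each group reaches the combiner already sorted. The sketch below simulates this for a single partition in plain Java. `SortedCollector`, `SortedCombineFunction`, `combineSorted`, and `firstN` are hypothetical names, and the per-group sort is done with `List.sort` rather than Flink's sorter. The `firstN` combiner emits the first n values of each sorted group, loosely in the spirit of `FirstReducer`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-ins for Collector and the two-type FlatCombineFunction.
interface SortedCollector<T> {
    void collect(T record);
}

interface SortedCombineFunction<IN, OUT> {
    void combine(Iterable<IN> values, SortedCollector<OUT> out) throws Exception;
}

final class SortedCombineSketch {
    // Groups one partition by key, sorts each group, then calls the combiner once per group.
    static <K, IN, OUT> List<OUT> combineSorted(List<IN> partition, Function<IN, K> key,
            Comparator<IN> groupOrder, SortedCombineFunction<IN, OUT> fn) {
        Map<K, List<IN>> groups = new LinkedHashMap<>();
        for (IN v : partition) {
            groups.computeIfAbsent(key.apply(v), k -> new ArrayList<>()).add(v);
        }
        List<OUT> result = new ArrayList<>();
        for (List<IN> group : groups.values()) {
            group.sort(groupOrder);
            try {
                fn.combine(group, result::add);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
        return result;
    }

    // Hypothetical combiner that relies on the group order: emits the first n values of each group.
    static <T> SortedCombineFunction<T, T> firstN(int n) {
        return (values, out) -> {
            int emitted = 0;
            for (T v : values) {
                if (emitted++ >= n) {
                    break;
                }
                out.collect(v);
            }
        };
    }
}
```

For the partition `[5, 2, 9, 4, 1, 8]` grouped by parity and sorted in descending order, `firstN(2)` emits `[9, 5, 8, 4]`. Since a combine only sees local data, these are per-partition results; a global first-n per group still needs a subsequent groupReduce.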
http://git-wip-us.apache.org/repos/asf/flink/blob/e93e0cb8/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
index 732c59b..0f3faa0 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
@@ -19,6 +19,7 @@
 package org.apache.flink.api.java.operators;
 
 import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.api.common.functions.FlatCombineFunction;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.common.functions.ReduceFunction;
@@ -159,7 +160,28 @@ public class UnsortedGrouping<T> extends Grouping<T> {
 
                return new GroupReduceOperator<T, R>(this, resultType, dataSet.clean(reducer), Utils.getCallLocationName());
        }
-       
+
+       /**
+        * Applies a FlatCombineFunction on a grouped {@link DataSet}.
+        * A FlatCombineFunction is similar to a GroupReduceFunction but does not perform a full data exchange. Instead, the
+        * combine method is called locally within each partition to combine the elements of each group. This operator is
+        * suitable for combining values into an intermediate format before doing a proper groupReduce, where the data is
+        * shuffled across the nodes for further reduction. The GroupReduce operator can also be supplied with a combiner
+        * by implementing a RichGroupReduceFunction, but the combine method of a RichGroupReduceFunction requires the
+        * input and output types to be the same. A FlatCombineFunction, on the other hand, can have an arbitrary output
+        * type.
+        * @param combiner The FlatCombineFunction that is applied on the grouped DataSet.
+        * @return A GroupCombineOperator which represents the combined DataSet.
+        */
+       public <R> GroupCombineOperator<T, R> combineGroup(FlatCombineFunction<T, R> combiner) {
+               if (combiner == null) {
+                       throw new NullPointerException("GroupCombine function must not be null.");
+               }
+               TypeInformation<R> resultType = TypeExtractor.getGroupCombineReturnTypes(combiner, this.getDataSet().getType());
+
+               return new GroupCombineOperator<T, R>(this, resultType, dataSet.clean(combiner), Utils.getCallLocationName());
+       }
+
        /**
         * Returns a new set containing the first n elements in this grouped {@link DataSet}.<br/>
         * @param n The desired number of elements for each group.

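A typical use of `combineGroup()` on an UnsortedGrouping is a word count whose combine step turns raw words into (word, partial count) pairs, a different type than its input, followed by a proper reduce after the shuffle. The plain-Java sketch below simulates two partitions. `GroupedCollector`, `GroupedCombineFunction`, `combinePartition`, and `finalCounts` are hypothetical names, and a `LinkedHashMap` stands in for Flink's grouping.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-ins for Collector and the two-type FlatCombineFunction.
interface GroupedCollector<T> {
    void collect(T record);
}

interface GroupedCombineFunction<IN, OUT> {
    void combine(Iterable<IN> values, GroupedCollector<OUT> out) throws Exception;
}

final class GroupedCombineSketch {
    // All values of a group are the same word, so any of them serves as the key.
    static final GroupedCombineFunction<String, Map.Entry<String, Integer>> COUNT = (values, out) -> {
        String word = null;
        int count = 0;
        for (String v : values) {
            word = v;
            count++;
        }
        out.collect(new AbstractMap.SimpleEntry<>(word, count));
    };

    // Groups one partition by key and calls the combiner once per group, with no data exchange.
    static <K, IN, OUT> List<OUT> combinePartition(List<IN> partition, Function<IN, K> key,
            GroupedCombineFunction<IN, OUT> fn) {
        Map<K, List<IN>> groups = new LinkedHashMap<>();
        for (IN v : partition) {
            groups.computeIfAbsent(key.apply(v), k -> new ArrayList<>()).add(v);
        }
        List<OUT> out = new ArrayList<>();
        for (List<IN> group : groups.values()) {
            try {
                fn.combine(group, out::add);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
        return out;
    }

    // Final reduce after the simulated shuffle: sums the partial counts per word.
    static Map<String, Integer> finalCounts(List<List<Map.Entry<String, Integer>>> partials) {
        Map<String, Integer> result = new LinkedHashMap<>();
        for (List<Map.Entry<String, Integer>> partition : partials) {
            for (Map.Entry<String, Integer> e : partition) {
                result.merge(e.getKey(), e.getValue(), Integer::sum);
            }
        }
        return result;
    }
}
```

Partition `[a, b, a]` combines to (a, 2), (b, 1) and partition `[b, c]` to (b, 1), (c, 1); `finalCounts` merges them into a=2, b=2, c=1. The word b shows up in the partial results of both partitions, which is why the combine output is only an intermediate result.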
http://git-wip-us.apache.org/repos/asf/flink/blob/e93e0cb8/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingGroupCombineOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingGroupCombineOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingGroupCombineOperator.java
new file mode 100644
index 0000000..ae4ba11
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingGroupCombineOperator.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.operators.translation;
+
+import org.apache.flink.api.common.functions.FlatCombineFunction;
+import org.apache.flink.api.common.operators.UnaryOperatorInformation;
+import org.apache.flink.api.common.operators.base.GroupCombineOperatorBase;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.operators.Keys;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.Collector;
+
+/**
+ * A group combine operator that takes 2-tuples (key-value pairs), and applies the group combine operation only
+ * on the unwrapped values.
+ */
+public class PlanUnwrappingGroupCombineOperator<IN, OUT, K> extends GroupCombineOperatorBase<Tuple2<K, IN>, OUT, FlatCombineFunction<Tuple2<K, IN>, OUT>> {
+
+       public PlanUnwrappingGroupCombineOperator(FlatCombineFunction<IN, OUT> udf, Keys.SelectorFunctionKeys<IN, K> key, String name,
+                       TypeInformation<OUT> outType, TypeInformation<Tuple2<K, IN>> typeInfoWithKey)
+       {
+               super(new TupleUnwrappingGroupCombiner<IN, OUT, K>(udf),
+                               new UnaryOperatorInformation<Tuple2<K, IN>, OUT>(typeInfoWithKey, outType), key.computeLogicalKeyPositions(), name);
+               
+       }
+       
+       // --------------------------------------------------------------------------------------------
+       
+       public static final class TupleUnwrappingGroupCombiner<IN, OUT, K> extends WrappingFunction<FlatCombineFunction<IN, OUT>>
+               implements FlatCombineFunction<Tuple2<K, IN>, OUT>
+       {
+       
+               private static final long serialVersionUID = 1L;
+               
+               private final TupleUnwrappingIterator<IN, K> iter; 
+               
+               private TupleUnwrappingGroupCombiner(FlatCombineFunction<IN, OUT> wrapped) {
+                       super(wrapped);
+                       this.iter = new TupleUnwrappingIterator<IN, K>();
+               }
+       
+       
+               @Override
+               public void combine(Iterable<Tuple2<K, IN>> values, Collector<OUT> out) throws Exception {
+                       iter.set(values.iterator());
+                       this.wrappedFunction.combine(iter, out);
+               }
+               
+               @Override
+               public String toString() {
+                       return this.wrappedFunction.toString();
+               }
+       }
+}

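`TupleUnwrappingGroupCombiner` keeps a single `TupleUnwrappingIterator` and calls `iter.set(values.iterator())` before every `combine()` call, so one object is reused for all groups; because it is passed directly to `combine()`, it must also be an `Iterable`. The sketch below models that idea with `Map.Entry` in place of `Tuple2`. `UnwrappingIterator` is a hypothetical class, not Flink's implementation, and its one-traversal-per-`set()` guard is an assumption of the sketch.

```java
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of an unwrapping iterator: it yields only the values of (key, value)
// entries and can be re-pointed at the next group with set(), so one instance is reused.
final class UnwrappingIterator<K, V> implements Iterator<V>, Iterable<V> {
    private Iterator<Map.Entry<K, V>> source;
    private boolean handedOut;

    void set(Iterator<Map.Entry<K, V>> source) {
        this.source = source;
        this.handedOut = false;
    }

    // The backing iterator is single-pass, so this sketch allows one traversal per set() call.
    @Override
    public Iterator<V> iterator() {
        if (handedOut) {
            throw new IllegalStateException("group has already been iterated");
        }
        handedOut = true;
        return this;
    }

    @Override
    public boolean hasNext() {
        return source.hasNext();
    }

    // Drops the key and returns only the wrapped value.
    @Override
    public V next() {
        return source.next().getValue();
    }
}
```

The likely motivation for reusing one wrapper is to avoid allocating a new iterator object for every group when the combiner is invoked for many small groups.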
http://git-wip-us.apache.org/repos/asf/flink/blob/e93e0cb8/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java
index e4a041b..1d59a21 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java
@@ -47,7 +47,7 @@ public class PlanUnwrappingReduceGroupOperator<IN, OUT, K> extends GroupReduceOp
        
        @RichGroupReduceFunction.Combinable
        public static final class TupleUnwrappingFlatCombinableGroupReducer<IN, OUT, K> extends WrappingFunction<RichGroupReduceFunction<IN, OUT>>
-               implements GroupReduceFunction<Tuple2<K, IN>, OUT>, FlatCombineFunction<Tuple2<K, IN>>
+               implements GroupReduceFunction<Tuple2<K, IN>, OUT>, FlatCombineFunction<Tuple2<K, IN>, Tuple2<K, IN>>
        {
 
                private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/flink/blob/e93e0cb8/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedGroupCombineOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedGroupCombineOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedGroupCombineOperator.java
new file mode 100644
index 0000000..b3d8470
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedGroupCombineOperator.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.operators.translation;
+
+import org.apache.flink.api.common.functions.FlatCombineFunction;
+import org.apache.flink.api.common.operators.UnaryOperatorInformation;
+import org.apache.flink.api.common.operators.base.GroupCombineOperatorBase;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.operators.Keys;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.util.Collector;
+
+/**
+ * A group combine operator that takes 3-tuples (groupKey, sortKey, value) and applies the sorted group combine
+ * operation only on the unwrapped values.
+ */
+public class PlanUnwrappingSortedGroupCombineOperator<IN, OUT, K1, K2> extends GroupCombineOperatorBase<Tuple3<K1, K2, IN>, OUT, FlatCombineFunction<Tuple3<K1, K2, IN>,OUT>> {
+
+       public PlanUnwrappingSortedGroupCombineOperator(FlatCombineFunction<IN, OUT> udf, Keys.SelectorFunctionKeys<IN, K1> groupingKey, Keys.SelectorFunctionKeys<IN, K2> sortingKey, String name,
+                       TypeInformation<OUT> outType, TypeInformation<Tuple3<K1, K2, IN>> typeInfoWithKey)
+       {
+               super(new TupleUnwrappingGroupReducer<IN, OUT, K1, K2>(udf),
+                               new UnaryOperatorInformation<Tuple3<K1, K2, IN>, OUT>(typeInfoWithKey, outType),
+                               groupingKey.computeLogicalKeyPositions(), 
+                               name);
+
+       }
+
+       public static final class TupleUnwrappingGroupReducer<IN, OUT, K1, K2> extends WrappingFunction<FlatCombineFunction<IN, OUT>>
+                       implements FlatCombineFunction<Tuple3<K1, K2, IN>, OUT>
+       {
+
+               private static final long serialVersionUID = 1L;
+
+               private final Tuple3UnwrappingIterator<IN, K1, K2> iter;
+
+               private TupleUnwrappingGroupReducer(FlatCombineFunction<IN, OUT> wrapped) {
+                       super(wrapped);
+                       this.iter = new Tuple3UnwrappingIterator<IN, K1, K2>();
+               }
+
+
+               @Override
+               public void combine(Iterable<Tuple3<K1, K2, IN>> values, Collector<OUT> out) throws Exception {
+                       iter.set(values.iterator());
+                       this.wrappedFunction.combine(iter, out);
+               }
+
+               @Override
+               public String toString() {
+                       return this.wrappedFunction.toString();
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e93e0cb8/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedReduceGroupOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedReduceGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedReduceGroupOperator.java
index 46d247a..757ff56 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedReduceGroupOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedReduceGroupOperator.java
@@ -47,7 +47,7 @@ public class PlanUnwrappingSortedReduceGroupOperator<IN, OUT, K1, K2> extends Gr
 
        @RichGroupReduceFunction.Combinable
        public static final class TupleUnwrappingFlatCombinableGroupReducer<IN, OUT, K1, K2> extends WrappingFunction<RichGroupReduceFunction<IN, OUT>>
-               implements GroupReduceFunction<Tuple3<K1, K2, IN>, OUT>, FlatCombineFunction<Tuple3<K1, K2, IN>>
+               implements GroupReduceFunction<Tuple3<K1, K2, IN>, OUT>, FlatCombineFunction<Tuple3<K1, K2, IN>, Tuple3<K1, K2, IN>>
        {
 
                private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/flink/blob/e93e0cb8/flink-java/src/main/java/org/apache/flink/api/java/record/operators/ReduceOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/ReduceOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/record/operators/ReduceOperator.java
index ff1bd28..875e9c1 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/ReduceOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/record/operators/ReduceOperator.java
@@ -367,7 +367,7 @@ public class ReduceOperator extends GroupReduceOperatorBase<Record, Record, Grou
        
        // ============================================================================================
        
-       public static class WrappingReduceFunction extends WrappingFunction<ReduceFunction> implements GroupReduceFunction<Record, Record>, FlatCombineFunction<Record> {
+       public static class WrappingReduceFunction extends WrappingFunction<ReduceFunction> implements GroupReduceFunction<Record, Record>, FlatCombineFunction<Record, Record> {
                
                private static final long serialVersionUID = 1L;
                
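The wrappers above change only their declarations: `FlatCombineFunction` now takes an input and an output type, and a combinable reducer binds both to the same type (e.g. `FlatCombineFunction<Record, Record>`) because its partial results are fed back into `reduce`. The new GroupCombine operator is what lets the two types differ. The following is a minimal, self-contained sketch of that idea. It is not Flink code: `Collector` and `FlatCombineFunction` are simplified stand-ins that only mirror the shape of the Flink interfaces, and `PartialWordCount`, `runCombine`, and `finalReduce` are hypothetical names used for illustration.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class GroupCombineSketch {

    // Simplified stand-in for Flink's Collector (not the real interface).
    public interface Collector<T> {
        void collect(T record);
    }

    // Simplified stand-in with the same shape as the two-parameter
    // FlatCombineFunction<IN, OUT> (no Function/Serializable supertypes).
    public interface FlatCombineFunction<IN, OUT> {
        void combine(Iterable<IN> values, Collector<OUT> out) throws Exception;
    }

    // Hypothetical pre-combiner whose output type differs from its input type:
    // it turns raw words (String) into partial (word, count) pairs.
    public static class PartialWordCount
            implements FlatCombineFunction<String, Map.Entry<String, Integer>> {
        @Override
        public void combine(Iterable<String> words, Collector<Map.Entry<String, Integer>> out) {
            Map<String, Integer> counts = new HashMap<>();
            for (String w : words) {
                counts.merge(w, 1, Integer::sum);
            }
            for (Map.Entry<String, Integer> e : counts.entrySet()) {
                out.collect(new AbstractMap.SimpleEntry<>(e.getKey(), e.getValue()));
            }
        }
    }

    // Runs a combiner over one partition and gathers everything it emits.
    public static <IN, OUT> List<OUT> runCombine(FlatCombineFunction<IN, OUT> fn, List<IN> partition) {
        List<OUT> emitted = new ArrayList<>();
        try {
            fn.combine(partition, emitted::add);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        return emitted;
    }

    // Final aggregation step (the role a groupReduce would play): merges partial counts.
    public static Map<String, Integer> finalReduce(List<Map.Entry<String, Integer>> partials) {
        Map<String, Integer> totals = new HashMap<>();
        for (Map.Entry<String, Integer> p : partials) {
            totals.merge(p.getKey(), p.getValue(), Integer::sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        // Two partitions are combined independently, so "a" appears in two partial results.
        List<Map.Entry<String, Integer>> partials = new ArrayList<>();
        partials.addAll(runCombine(new PartialWordCount(), Arrays.asList("a", "b", "a")));
        partials.addAll(runCombine(new PartialWordCount(), Arrays.asList("a", "c")));
        System.out.println(partials.size() + " partial results, totals: " + finalReduce(partials));
    }
}
```

Because each partition is combined on its own, the combiner's output is only partial (here `"a"` shows up in both partitions), which is why a proper reduce step is still needed to produce the final result.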

http://git-wip-us.apache.org/repos/asf/flink/blob/e93e0cb8/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index fdfe941..4527aa0 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -34,6 +34,7 @@ import org.apache.avro.specific.SpecificRecordBase;
 import org.apache.commons.lang3.Validate;
 import org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.api.common.functions.CrossFunction;
+import org.apache.flink.api.common.functions.FlatCombineFunction;
 import org.apache.flink.api.common.functions.FlatJoinFunction;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.FoldFunction;
@@ -133,6 +134,16 @@ public class TypeExtractor {
        {
                return getUnaryOperatorReturnType((Function) groupReduceInterface, GroupReduceFunction.class, true, true, inType, functionName, allowMissing);
        }
+
+       public static <IN, OUT> TypeInformation<OUT> getGroupCombineReturnTypes(FlatCombineFunction<IN, OUT> combineInterface, TypeInformation<IN> inType) {
+               return getGroupCombineReturnTypes(combineInterface, inType, null, false);
+       }
+
+       public static <IN, OUT> TypeInformation<OUT> getGroupCombineReturnTypes(FlatCombineFunction<IN, OUT> combineInterface, TypeInformation<IN> inType,
+                       String functionName, boolean allowMissing)
+       {
+               return getUnaryOperatorReturnType((Function) combineInterface, FlatCombineFunction.class, true, true, inType, functionName, allowMissing);
+       }
        
        
        public static <IN1, IN2, OUT> TypeInformation<OUT> getFlatJoinReturnTypes(FlatJoinFunction<IN1, IN2, OUT> joinInterface,
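`getGroupCombineReturnTypes` passes the user function to `getUnaryOperatorReturnType` together with `FlatCombineFunction.class`, so the output type is inferred from the generic signature of the combine function. The real `TypeExtractor` logic is considerably more involved; what follows is only a minimal sketch of the core idea using plain Java reflection, assuming the function's class implements the interface directly. `Collector` and `FlatCombineFunction` here are simplified stand-ins rather than the Flink classes, and `LengthSum`, `PassThrough`, and `outputTypeOf` are hypothetical names.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

public class CombineTypeSketch {

    // Simplified stand-ins (not the Flink classes).
    public interface Collector<T> {
        void collect(T record);
    }

    public interface FlatCombineFunction<IN, OUT> {
        void combine(Iterable<IN> values, Collector<OUT> out) throws Exception;
    }

    // Hypothetical user function with concrete type arguments: IN = String, OUT = Integer.
    public static class LengthSum implements FlatCombineFunction<String, Integer> {
        @Override
        public void combine(Iterable<String> values, Collector<Integer> out) {
            int sum = 0;
            for (String s : values) {
                sum += s.length();
            }
            out.collect(sum);
        }
    }

    // Hypothetical generic function: OUT is only a type variable, erased at runtime.
    public static class PassThrough<T> implements FlatCombineFunction<T, T> {
        @Override
        public void combine(Iterable<T> values, Collector<T> out) {
            for (T v : values) {
                out.collect(v);
            }
        }
    }

    // Returns the type bound to OUT when fn's class directly implements
    // FlatCombineFunction with a concrete OUT; returns null otherwise.
    public static Type outputTypeOf(FlatCombineFunction<?, ?> fn) {
        for (Type t : fn.getClass().getGenericInterfaces()) {
            if (t instanceof ParameterizedType) {
                ParameterizedType p = (ParameterizedType) t;
                if (p.getRawType() == FlatCombineFunction.class) {
                    Type out = p.getActualTypeArguments()[1]; // index 1 = OUT
                    boolean concrete = out instanceof Class || out instanceof ParameterizedType;
                    return concrete ? out : null;
                }
            }
        }
        return null;
    }

    public static void main(String[] args) {
        System.out.println(outputTypeOf(new LengthSum()));           // prints "class java.lang.Integer"
        System.out.println(outputTypeOf(new PassThrough<String>())); // prints "null"
    }
}
```

The `PassThrough` case shows the limit of reading declared signatures: when `OUT` is only a type variable, erasure leaves nothing concrete to recover, and the sketch simply returns `null`.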
