[Flink-1780] Rename FlatCombineFunction to GroupCombineFunction

This closes #530


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/033c69f9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/033c69f9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/033c69f9

Branch: refs/heads/master
Commit: 033c69f9477c6352865e7e0da01296dd778ffe59
Parents: ae04025
Author: Suneel Marthi <[email protected]>
Authored: Tue Mar 24 16:19:32 2015 -0400
Committer: Maximilian Michels <[email protected]>
Committed: Wed Mar 25 17:13:51 2015 +0100

----------------------------------------------------------------------
 docs/dataset_transformations.md                 |  2 +-
 .../api/common/functions/CombineFunction.java   |  2 +-
 .../common/functions/FlatCombineFunction.java   | 51 --------------------
 .../common/functions/GroupCombineFunction.java  | 51 ++++++++++++++++++++
 .../functions/RichFlatCombineFunction.java      | 42 ----------------
 .../functions/RichGroupCombineFunction.java     | 39 +++++++++++++++
 .../functions/RichGroupReduceFunction.java      |  4 +-
 .../base/GroupCombineOperatorBase.java          |  6 +--
 .../operators/base/GroupReduceOperatorBase.java |  8 +--
 .../java/org/apache/flink/api/java/DataSet.java |  4 +-
 .../flink/api/java/functions/FirstReducer.java  |  4 +-
 .../java/operators/GroupCombineOperator.java    | 22 ++++-----
 .../api/java/operators/GroupReduceOperator.java |  6 +--
 .../api/java/operators/SortedGrouping.java      |  4 +-
 .../api/java/operators/UnsortedGrouping.java    |  4 +-
 .../PlanUnwrappingGroupCombineOperator.java     | 12 ++---
 .../PlanUnwrappingReduceGroupOperator.java      | 10 ++--
 ...lanUnwrappingSortedGroupCombineOperator.java | 12 ++---
 ...PlanUnwrappingSortedReduceGroupOperator.java | 10 ++--
 .../java/record/operators/ReduceOperator.java   |  4 +-
 .../flink/api/java/typeutils/TypeExtractor.java | 24 ++++-----
 .../java/record/ReduceWrappingFunctionTest.java | 16 +++---
 .../operators/AllGroupCombineDriver.java        | 16 +++---
 .../runtime/operators/AllGroupReduceDriver.java | 10 ++--
 .../operators/GroupReduceCombineDriver.java     | 20 ++++----
 .../runtime/operators/RegularPactTask.java      | 14 ++++--
 .../SynchronousChainedCombineDriver.java        | 12 ++---
 .../sort/CombiningUnilateralSortMerger.java     | 16 +++---
 .../runtime/blob/BlobCacheSuccessTest.java      | 18 +++----
 .../org/apache/flink/api/scala/DataSet.scala    |  4 +-
 .../apache/flink/api/scala/GroupedDataSet.scala |  6 +--
 .../CustomRankCombiner.java                     |  4 +-
 .../javaApiOperators/GroupCombineITCase.java    | 20 ++++----
 .../scala/operators/GroupCombineITCase.scala    |  6 +--
 34 files changed, 242 insertions(+), 241 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/docs/dataset_transformations.md
----------------------------------------------------------------------
diff --git a/docs/dataset_transformations.md b/docs/dataset_transformations.md
index 2bec61b..3bb3cec 100644
--- a/docs/dataset_transformations.md
+++ b/docs/dataset_transformations.md
@@ -554,7 +554,7 @@ an alternative WordCount implementation. In the 
implementation,
 DataSet<String> input = [..] // The words received as input
 DataSet<String> groupedInput = input.groupBy(0); // group identical words
 
-DataSet<Tuple2<String, Integer>> combinedWords = groupedInput.combineGroup(new 
FlatCombineFunction<String, Tuple2<String, Integer>() {
+DataSet<Tuple2<String, Integer>> combinedWords = groupedInput.combineGroup(new 
GroupCombineFunction<String, Tuple2<String, Integer>>() {
 
     public void combine(Iterable<String> words, Collector<Tuple2<String, 
Integer>>) { // combine
         int count = 0;

http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-core/src/main/java/org/apache/flink/api/common/functions/CombineFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/CombineFunction.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/CombineFunction.java
index ef52b32..af115b0 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/CombineFunction.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/CombineFunction.java
@@ -29,7 +29,7 @@ import java.io.Serializable;
  * reduce the data volume earlier, before the entire groups have been 
collected.
  * <p>
  * This special variant of the combine function reduces the group of elements 
into a single element. A variant
- * that can return multiple values per group is defined in {@link 
FlatCombineFunction}.
+ * that can return multiple values per group is defined in {@link 
GroupCombineFunction}.
  * 
  * @param <IN> The data type processed by the combine function.
  * @param <OUT> The data type emitted by the combine function.

http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatCombineFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatCombineFunction.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatCombineFunction.java
deleted file mode 100644
index b90b3ce..0000000
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatCombineFunction.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.common.functions;
-
-import java.io.Serializable;
-
-import org.apache.flink.util.Collector;
-
-/**
- * Generic interface used for combine functions ("combiners"). Combiners act 
as auxiliaries to a {@link GroupReduceFunction}
- * and "pre-reduce" the data. The combine functions typically do not see the 
entire group of elements, but
- * only a sub-group.
- * <p>
- * Combine functions are frequently helpful in increasing the program 
efficiency, because they allow the system to
- * reduce the data volume earlier, before the entire groups have been 
collected.
- * <p>
- * This special variant of the combine function supports to return more than 
one element per group.
- * It is frequently less efficient to use than the {@link CombineFunction}.
- * 
- * @param <IN> The data type processed by the combine function.
- * @param <OUT> The data type emitted by the combine function.
- */
-public interface FlatCombineFunction<IN, OUT> extends Function, Serializable {
-
-       /**
-        * The combine method, called (potentially multiple timed) with 
subgroups of elements.
-        * 
-        * @param values The elements to be combined.
-        * @param out The collector to use to return values from the function.
-        * 
-        * @throws Exception The function may throw Exceptions, which will 
cause the program to cancel,
-        *                   and may trigger the recovery logic.
-        */
-       void combine(Iterable<IN> values, Collector<OUT> out) throws Exception;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-core/src/main/java/org/apache/flink/api/common/functions/GroupCombineFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/GroupCombineFunction.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/GroupCombineFunction.java
new file mode 100644
index 0000000..c0b153b
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/GroupCombineFunction.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.functions;
+
+import java.io.Serializable;
+
+import org.apache.flink.util.Collector;
+
+/**
+ * Generic interface used for combine functions ("combiners"). Combiners act 
as auxiliaries to a {@link GroupReduceFunction}
+ * and "pre-reduce" the data. The combine functions typically do not see the 
entire group of elements, but
+ * only a sub-group.
+ * <p>
+ * Combine functions are frequently helpful in increasing the program 
efficiency, because they allow the system to
+ * reduce the data volume earlier, before the entire groups have been 
collected.
+ * <p>
+ * This special variant of the combine function can return more than one element per group.
+ * It is frequently less efficient to use than the {@link CombineFunction}.
+ * 
+ * @param <IN> The data type processed by the combine function.
+ * @param <OUT> The data type emitted by the combine function.
+ */
+public interface GroupCombineFunction<IN, OUT> extends Function, Serializable {
+
+       /**
+        * The combine method, called (potentially multiple times) with subgroups of elements.
+        *
+        * @param values The elements to be combined.
+        * @param out The collector to use to return values from the function.
+        *
+        * @throws Exception The function may throw Exceptions, which will 
cause the program to cancel,
+        *                   and may trigger the recovery logic.
+        */
+       void combine(Iterable<IN> values, Collector<OUT> out) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFlatCombineFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFlatCombineFunction.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFlatCombineFunction.java
deleted file mode 100644
index 17aca88..0000000
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFlatCombineFunction.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.common.functions;
-
-
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.api.common.functions.FlatCombineFunction;
-import org.apache.flink.api.common.functions.RichFunction;
-import org.apache.flink.util.Collector;
-
-/**
- * Rich variant of the {@link FlatCombineFunction}. As a {@link RichFunction}, 
it gives access to the
- * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides 
setup and teardown methods:
- * {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and
- * {@link RichFunction#close()}.
- *
- * @param <IN> The data type of the elements to be combined.
- * @param <OUT> The resulting data type of the elements to be combined.
- */
-public abstract class RichFlatCombineFunction<IN, OUT> extends 
AbstractRichFunction implements FlatCombineFunction<IN, OUT> {
-
-       private static final long serialVersionUID = 1L;
-
-       @Override
-       public abstract void combine(Iterable<IN> values, Collector<OUT> out) 
throws Exception;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-core/src/main/java/org/apache/flink/api/common/functions/RichGroupCombineFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichGroupCombineFunction.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichGroupCombineFunction.java
new file mode 100644
index 0000000..55df232
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichGroupCombineFunction.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.functions;
+
+
+import org.apache.flink.util.Collector;
+
+/**
+ * Rich variant of the {@link GroupCombineFunction}. As a {@link 
RichFunction}, it gives access to the
+ * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides 
setup and teardown methods:
+ * {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and
+ * {@link RichFunction#close()}.
+ *
+ * @param <IN> The data type of the elements to be combined.
+ * @param <OUT> The resulting data type of the elements to be combined.
+ */
+public abstract class RichGroupCombineFunction<IN, OUT> extends 
AbstractRichFunction implements GroupCombineFunction<IN, OUT> {
+
+       private static final long serialVersionUID = 1L;
+
+       @Override
+       public abstract void combine(Iterable<IN> values, Collector<OUT> out) 
throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-core/src/main/java/org/apache/flink/api/common/functions/RichGroupReduceFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichGroupReduceFunction.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichGroupReduceFunction.java
index b6c92c2..48e27d3 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichGroupReduceFunction.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichGroupReduceFunction.java
@@ -34,7 +34,7 @@ import org.apache.flink.util.Collector;
  * @param <IN> Type of the elements that this function processes.
  * @param <OUT> The type of the elements returned by the user-defined function.
  */
-public abstract class RichGroupReduceFunction<IN, OUT> extends 
AbstractRichFunction implements GroupReduceFunction<IN, OUT>, 
FlatCombineFunction<IN, IN> {
+public abstract class RichGroupReduceFunction<IN, OUT> extends 
AbstractRichFunction implements GroupReduceFunction<IN, OUT>, 
GroupCombineFunction<IN, IN> {
        
        private static final long serialVersionUID = 1L;
 
@@ -83,5 +83,5 @@ public abstract class RichGroupReduceFunction<IN, OUT> 
extends AbstractRichFunct
         */
        @Retention(RetentionPolicy.RUNTIME)
        @Target(ElementType.TYPE)
-       public static @interface Combinable {};
+       public static @interface Combinable {}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupCombineOperatorBase.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupCombineOperatorBase.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupCombineOperatorBase.java
index 2a47c45..27fbc1c 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupCombineOperatorBase.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupCombineOperatorBase.java
@@ -22,7 +22,7 @@ import com.google.common.base.Preconditions;
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.InvalidProgramException;
-import org.apache.flink.api.common.functions.FlatCombineFunction;
+import org.apache.flink.api.common.functions.GroupCombineFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.functions.util.CopyingListCollector;
 import org.apache.flink.api.common.functions.util.FunctionUtils;
@@ -46,7 +46,7 @@ import java.util.List;
  * This class is later processed by the compiler to generate the plan.
  * @see org.apache.flink.api.common.functions.CombineFunction
  */
-public class GroupCombineOperatorBase<IN, OUT, FT extends 
FlatCombineFunction<IN, OUT>> extends SingleInputOperator<IN, OUT, FT> {
+public class GroupCombineOperatorBase<IN, OUT, FT extends 
GroupCombineFunction<IN, OUT>> extends SingleInputOperator<IN, OUT, FT> {
 
 
        /** The ordering for the order inside a reduce group. */
@@ -81,7 +81,7 @@ public class GroupCombineOperatorBase<IN, OUT, FT extends 
FlatCombineFunction<IN
 
        @Override
        protected List<OUT> executeOnCollections(List<IN> inputData, 
RuntimeContext ctx, ExecutionConfig executionConfig) throws Exception {
-               FlatCombineFunction<IN, OUT> function = 
this.userFunction.getUserCodeObject();
+               GroupCombineFunction<IN, OUT> function = 
this.userFunction.getUserCodeObject();
 
                UnaryOperatorInformation<IN, OUT> operatorInfo = 
getOperatorInfo();
                TypeInformation<IN> inputType = operatorInfo.getInputType();

http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java
index 6d7db89..57f07f3 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java
@@ -21,7 +21,7 @@ package org.apache.flink.api.common.operators.base;
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.InvalidProgramException;
-import org.apache.flink.api.common.functions.FlatCombineFunction;
+import org.apache.flink.api.common.functions.GroupCombineFunction;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.common.functions.RuntimeContext;
@@ -107,15 +107,15 @@ public class GroupReduceOperatorBase<IN, OUT, FT extends 
GroupReduceFunction<IN,
        /**
         * Marks the group reduce operation as combinable. Combinable 
operations may pre-reduce the
         * data before the actual group reduce operations. Combinable 
user-defined functions
-        * must implement the interface {@link 
org.apache.flink.api.common.functions.FlatCombineFunction}.
+        * must implement the interface {@link GroupCombineFunction}.
         * 
         * @param combinable Flag to mark the group reduce operation as 
combinable.
         */
        public void setCombinable(boolean combinable) {
                // sanity check
-               if (combinable && 
!FlatCombineFunction.class.isAssignableFrom(this.userFunction.getUserCodeClass()))
 {
+               if (combinable && 
!GroupCombineFunction.class.isAssignableFrom(this.userFunction.getUserCodeClass()))
 {
                        throw new IllegalArgumentException("Cannot set a UDF as 
combinable if it does not implement the interface " +
-                                       FlatCombineFunction.class.getName());
+                                       GroupCombineFunction.class.getName());
                } else {
                        this.combinable = combinable;
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java 
b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
index ed8d1ca..1e91eeb 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
@@ -24,7 +24,7 @@ import org.apache.commons.lang3.Validate;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.api.common.functions.FlatCombineFunction;
+import org.apache.flink.api.common.functions.GroupCombineFunction;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.functions.InvalidTypesException;
@@ -473,7 +473,7 @@ public abstract class DataSet<T> {
         * @param combiner The CombineFunction that is applied on the DataSet.
         * @return A GroupCombineOperator which represents the combined DataSet.
         */
-       public <R> GroupCombineOperator<T, R> 
combineGroup(FlatCombineFunction<T, R> combiner) {
+       public <R> GroupCombineOperator<T, R> 
combineGroup(GroupCombineFunction<T, R> combiner) {
                if (combiner == null) {
                        throw new NullPointerException("GroupReduce function 
must not be null.");
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-java/src/main/java/org/apache/flink/api/java/functions/FirstReducer.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/functions/FirstReducer.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/functions/FirstReducer.java
index fbb7029..a604cc0 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/functions/FirstReducer.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/functions/FirstReducer.java
@@ -17,13 +17,13 @@
  */
 
 package org.apache.flink.api.java.functions;
-import org.apache.flink.api.common.functions.FlatCombineFunction;
+import org.apache.flink.api.common.functions.GroupCombineFunction;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
 import 
org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable;
 import org.apache.flink.util.Collector;
 
 @Combinable
-public class FirstReducer<T> implements GroupReduceFunction<T, T>, 
FlatCombineFunction<T, T> {
+public class FirstReducer<T> implements GroupReduceFunction<T, T>, 
GroupCombineFunction<T, T> {
        private static final long serialVersionUID = 1L;
 
        private final int count;

http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java
index 3c1d47c..911c608 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.api.java.operators;
 
-import org.apache.flink.api.common.functions.FlatCombineFunction;
+import org.apache.flink.api.common.functions.GroupCombineFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.common.operators.Order;
@@ -46,7 +46,7 @@ import org.apache.flink.api.java.typeutils.TupleTypeInfo;
  */
 public class GroupCombineOperator<IN, OUT> extends SingleInputUdfOperator<IN, 
OUT, GroupCombineOperator<IN, OUT>> {
 
-       private final FlatCombineFunction<IN, OUT> function;
+       private final GroupCombineFunction<IN, OUT> function;
 
        private final Grouping<IN> grouper;
 
@@ -60,7 +60,7 @@ public class GroupCombineOperator<IN, OUT> extends 
SingleInputUdfOperator<IN, OU
         * @param function The user-defined GroupReduce function.
         * @param defaultName The operator's name.
         */
-       public GroupCombineOperator(DataSet<IN> input, TypeInformation<OUT> 
resultType, FlatCombineFunction<IN, OUT> function, String defaultName) {
+       public GroupCombineOperator(DataSet<IN> input, TypeInformation<OUT> 
resultType, GroupCombineFunction<IN, OUT> function, String defaultName) {
                super(input, resultType);
                this.function = function;
                this.grouper = null;
@@ -73,7 +73,7 @@ public class GroupCombineOperator<IN, OUT> extends 
SingleInputUdfOperator<IN, OU
         * @param input The grouped input to be processed group-wise by the 
groupReduce function.
         * @param function The user-defined GroupReduce function.
         */
-       public GroupCombineOperator(Grouping<IN> input, TypeInformation<OUT> 
resultType, FlatCombineFunction<IN, OUT> function, String defaultName) {
+       public GroupCombineOperator(Grouping<IN> input, TypeInformation<OUT> 
resultType, GroupCombineFunction<IN, OUT> function, String defaultName) {
                super(input != null ? input.getDataSet() : null, resultType);
 
                this.function = function;
@@ -82,7 +82,7 @@ public class GroupCombineOperator<IN, OUT> extends 
SingleInputUdfOperator<IN, OU
        }
 
        @Override
-       protected FlatCombineFunction<IN, OUT> getFunction() {
+       protected GroupCombineFunction<IN, OUT> getFunction() {
                return function;
        }
 
@@ -99,8 +99,8 @@ public class GroupCombineOperator<IN, OUT> extends 
SingleInputUdfOperator<IN, OU
                if (grouper == null) {
                        // non grouped reduce
                        UnaryOperatorInformation<IN, OUT> operatorInfo = new 
UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType());
-                       GroupCombineOperatorBase<IN, OUT, 
FlatCombineFunction<IN, OUT>> po =
-                                       new GroupCombineOperatorBase<IN, OUT, 
FlatCombineFunction<IN, OUT>>(function, operatorInfo, new int[0], name);
+                       GroupCombineOperatorBase<IN, OUT, 
GroupCombineFunction<IN, OUT>> po =
+                                       new GroupCombineOperatorBase<IN, OUT, 
GroupCombineFunction<IN, OUT>>(function, operatorInfo, new int[0], name);
 
                        po.setInput(input);
                        // the parallelism for a non grouped reduce can only be 
1
@@ -144,8 +144,8 @@ public class GroupCombineOperator<IN, OUT> extends 
SingleInputUdfOperator<IN, OU
 
                        int[] logicalKeyPositions = 
grouper.getKeys().computeLogicalKeyPositions();
                        UnaryOperatorInformation<IN, OUT> operatorInfo = new 
UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType());
-                       GroupCombineOperatorBase<IN, OUT, 
FlatCombineFunction<IN, OUT>> po =
-                                       new GroupCombineOperatorBase<IN, OUT, 
FlatCombineFunction<IN, OUT>>(function, operatorInfo, logicalKeyPositions, 
name);
+                       GroupCombineOperatorBase<IN, OUT, 
GroupCombineFunction<IN, OUT>> po =
+                                       new GroupCombineOperatorBase<IN, OUT, 
GroupCombineFunction<IN, OUT>>(function, operatorInfo, logicalKeyPositions, 
name);
 
                        po.setInput(input);
                        po.setParallelism(getParallelism());
@@ -175,7 +175,7 @@ public class GroupCombineOperator<IN, OUT> extends 
SingleInputUdfOperator<IN, OU
        // 
--------------------------------------------------------------------------------------------
 
        private static <IN, OUT, K> PlanUnwrappingGroupCombineOperator<IN, OUT, 
K> translateSelectorFunctionReducer(
-                       Keys.SelectorFunctionKeys<IN, ?> rawKeys, 
FlatCombineFunction<IN, OUT> function,
+                       Keys.SelectorFunctionKeys<IN, ?> rawKeys, 
GroupCombineFunction<IN, OUT> function,
                        TypeInformation<IN> inputType, TypeInformation<OUT> 
outputType, String name, Operator<IN> input)
        {
                @SuppressWarnings("unchecked")
@@ -199,7 +199,7 @@ public class GroupCombineOperator<IN, OUT> extends 
SingleInputUdfOperator<IN, OU
        }
 
        private static <IN, OUT, K1, K2> 
PlanUnwrappingSortedGroupCombineOperator<IN, OUT, K1, K2> 
translateSelectorFunctionSortedReducer(
-                       Keys.SelectorFunctionKeys<IN, ?> rawGroupingKey, 
Keys.SelectorFunctionKeys<IN, ?> rawSortingKey, FlatCombineFunction<IN, OUT> 
function,
+                       Keys.SelectorFunctionKeys<IN, ?> rawGroupingKey, 
Keys.SelectorFunctionKeys<IN, ?> rawSortingKey, GroupCombineFunction<IN, OUT> 
function,
                        TypeInformation<IN> inputType, TypeInformation<OUT> 
outputType, String name, Operator<IN> input)
        {
                @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
index e809623..30f2cc4 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.api.java.operators;
 
-import org.apache.flink.api.common.functions.FlatCombineFunction;
+import org.apache.flink.api.common.functions.GroupCombineFunction;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.operators.Operator;
@@ -88,7 +88,7 @@ public class GroupReduceOperator<IN, OUT> extends 
SingleInputUdfOperator<IN, OUT
        }
 
        private void checkCombinability() {
-               if (function instanceof FlatCombineFunction &&
+               if (function instanceof GroupCombineFunction &&
                                
function.getClass().getAnnotation(RichGroupReduceFunction.Combinable.class) != 
null) {
                        this.combinable = true;
                }
@@ -111,7 +111,7 @@ public class GroupReduceOperator<IN, OUT> extends 
SingleInputUdfOperator<IN, OUT
        
        public GroupReduceOperator<IN, OUT> setCombinable(boolean combinable) {
                // sanity check that the function is a subclass of the combine 
interface
-               if (combinable && !(function instanceof FlatCombineFunction)) {
+               if (combinable && !(function instanceof GroupCombineFunction)) {
                        throw new IllegalArgumentException("The function does 
not implement the combine interface.");
                }
                

http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
index b2054bf..287bf82 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.api.java.operators;
 
-import org.apache.flink.api.common.functions.FlatCombineFunction;
+import org.apache.flink.api.common.functions.GroupCombineFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.api.java.Utils;
@@ -169,7 +169,7 @@ public class SortedGrouping<T> extends Grouping<T> {
         * @param combiner The CombineFunction that is applied on the DataSet.
         * @return A GroupCombineOperator which represents the combined DataSet.
         */
-       public <R> GroupCombineOperator<T, R> 
combineGroup(FlatCombineFunction<T, R> combiner) {
+       public <R> GroupCombineOperator<T, R> 
combineGroup(GroupCombineFunction<T, R> combiner) {
                if (combiner == null) {
                        throw new NullPointerException("GroupReduce function 
must not be null.");
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
index 0f3faa0..319a599 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
@@ -19,7 +19,7 @@
 package org.apache.flink.api.java.operators;
 
 import org.apache.flink.api.common.InvalidProgramException;
-import org.apache.flink.api.common.functions.FlatCombineFunction;
+import org.apache.flink.api.common.functions.GroupCombineFunction;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.common.functions.ReduceFunction;
@@ -173,7 +173,7 @@ public class UnsortedGrouping<T> extends Grouping<T> {
         * @param combiner The CombineFunction that is applied on the DataSet.
         * @return A GroupCombineOperator which represents the combined DataSet.
         */
-       public <R> GroupCombineOperator<T, R> 
combineGroup(FlatCombineFunction<T, R> combiner) {
+       public <R> GroupCombineOperator<T, R> 
combineGroup(GroupCombineFunction<T, R> combiner) {
                if (combiner == null) {
                        throw new NullPointerException("GroupReduce function 
must not be null.");
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingGroupCombineOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingGroupCombineOperator.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingGroupCombineOperator.java
index ae4ba11..c8e40ce 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingGroupCombineOperator.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingGroupCombineOperator.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.api.java.operators.translation;
 
-import org.apache.flink.api.common.functions.FlatCombineFunction;
+import org.apache.flink.api.common.functions.GroupCombineFunction;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.GroupCombineOperatorBase;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -30,9 +30,9 @@ import org.apache.flink.util.Collector;
  * A group combine operator that takes 2-tuples (key-value pairs), and applies 
the group combine operation only
  * on the unwrapped values.
  */
-public class PlanUnwrappingGroupCombineOperator<IN, OUT, K> extends 
GroupCombineOperatorBase<Tuple2<K, IN>, OUT, FlatCombineFunction<Tuple2<K, IN>, 
OUT>> {
+public class PlanUnwrappingGroupCombineOperator<IN, OUT, K> extends 
GroupCombineOperatorBase<Tuple2<K, IN>, OUT, GroupCombineFunction<Tuple2<K, 
IN>, OUT>> {
 
-       public PlanUnwrappingGroupCombineOperator(FlatCombineFunction<IN, OUT> 
udf, Keys.SelectorFunctionKeys<IN, K> key, String name,
+       public PlanUnwrappingGroupCombineOperator(GroupCombineFunction<IN, OUT> 
udf, Keys.SelectorFunctionKeys<IN, K> key, String name,
                                                                                
                TypeInformation<OUT> outType, TypeInformation<Tuple2<K, IN>> 
typeInfoWithKey)
        {
                super(new TupleUnwrappingGroupCombiner<IN, OUT, K>(udf),
@@ -42,15 +42,15 @@ public class PlanUnwrappingGroupCombineOperator<IN, OUT, K> 
extends GroupCombine
        
        // 
--------------------------------------------------------------------------------------------
        
-       public static final class TupleUnwrappingGroupCombiner<IN, OUT, K> 
extends WrappingFunction<FlatCombineFunction<IN, OUT>>
-               implements FlatCombineFunction<Tuple2<K, IN>, OUT>
+       public static final class TupleUnwrappingGroupCombiner<IN, OUT, K> 
extends WrappingFunction<GroupCombineFunction<IN, OUT>>
+               implements GroupCombineFunction<Tuple2<K, IN>, OUT>
        {
        
                private static final long serialVersionUID = 1L;
                
                private final TupleUnwrappingIterator<IN, K> iter; 
                
-               private TupleUnwrappingGroupCombiner(FlatCombineFunction<IN, 
OUT> wrapped) {
+               private TupleUnwrappingGroupCombiner(GroupCombineFunction<IN, 
OUT> wrapped) {
                        super(wrapped);
                        this.iter = new TupleUnwrappingIterator<IN, K>();
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java
index 1d59a21..e01af50 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.api.java.operators.translation;
 
-import org.apache.flink.api.common.functions.FlatCombineFunction;
+import org.apache.flink.api.common.functions.GroupCombineFunction;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
@@ -37,7 +37,7 @@ public class PlanUnwrappingReduceGroupOperator<IN, OUT, K> 
extends GroupReduceOp
        public PlanUnwrappingReduceGroupOperator(GroupReduceFunction<IN, OUT> 
udf, Keys.SelectorFunctionKeys<IN, K> key, String name,
                        TypeInformation<OUT> outType, TypeInformation<Tuple2<K, 
IN>> typeInfoWithKey, boolean combinable)
        {
-               super(combinable ? new 
TupleUnwrappingFlatCombinableGroupReducer<IN, OUT, 
K>((RichGroupReduceFunction<IN, OUT>) udf) : new 
TupleUnwrappingNonCombinableGroupReducer<IN, OUT, K>(udf),
+               super(combinable ? new 
TupleUnwrappingGroupCombinableGroupReducer<IN, OUT, 
K>((RichGroupReduceFunction<IN, OUT>) udf) : new 
TupleUnwrappingNonCombinableGroupReducer<IN, OUT, K>(udf),
                                new UnaryOperatorInformation<Tuple2<K, IN>, 
OUT>(typeInfoWithKey, outType), key.computeLogicalKeyPositions(), name);
                
                super.setCombinable(combinable);
@@ -46,8 +46,8 @@ public class PlanUnwrappingReduceGroupOperator<IN, OUT, K> 
extends GroupReduceOp
        // 
--------------------------------------------------------------------------------------------
        
        @RichGroupReduceFunction.Combinable
-       public static final class TupleUnwrappingFlatCombinableGroupReducer<IN, 
OUT, K> extends WrappingFunction<RichGroupReduceFunction<IN, OUT>>
-               implements GroupReduceFunction<Tuple2<K, IN>, OUT>, 
FlatCombineFunction<Tuple2<K, IN>, Tuple2<K, IN>>
+       public static final class 
TupleUnwrappingGroupCombinableGroupReducer<IN, OUT, K> extends 
WrappingFunction<RichGroupReduceFunction<IN, OUT>>
+               implements GroupReduceFunction<Tuple2<K, IN>, OUT>, 
GroupCombineFunction<Tuple2<K, IN>, Tuple2<K, IN>>
        {
 
                private static final long serialVersionUID = 1L;
@@ -55,7 +55,7 @@ public class PlanUnwrappingReduceGroupOperator<IN, OUT, K> 
extends GroupReduceOp
                private TupleUnwrappingIterator<IN, K> iter;
                private TupleWrappingCollector<IN, K> coll; 
                
-               private 
TupleUnwrappingFlatCombinableGroupReducer(RichGroupReduceFunction<IN, OUT> 
wrapped) {
+               private 
TupleUnwrappingGroupCombinableGroupReducer(RichGroupReduceFunction<IN, OUT> 
wrapped) {
                        super(wrapped);
                        this.iter = new TupleUnwrappingIterator<IN, K>();
                        this.coll = new TupleWrappingCollector<IN, 
K>(this.iter);

http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedGroupCombineOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedGroupCombineOperator.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedGroupCombineOperator.java
index b3d8470..e52a5c4 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedGroupCombineOperator.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedGroupCombineOperator.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.api.java.operators.translation;
 
-import org.apache.flink.api.common.functions.FlatCombineFunction;
+import org.apache.flink.api.common.functions.GroupCombineFunction;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.GroupCombineOperatorBase;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -30,9 +30,9 @@ import org.apache.flink.util.Collector;
  * A reduce operator that takes 3-tuples (groupKey, sortKey, value), and 
applies the sorted partial group reduce
  * operation only on the unwrapped values.
  */
-public class PlanUnwrappingSortedGroupCombineOperator<IN, OUT, K1, K2> extends 
GroupCombineOperatorBase<Tuple3<K1, K2, IN>, OUT, 
FlatCombineFunction<Tuple3<K1, K2, IN>,OUT>> {
+public class PlanUnwrappingSortedGroupCombineOperator<IN, OUT, K1, K2> extends 
GroupCombineOperatorBase<Tuple3<K1, K2, IN>, OUT, 
GroupCombineFunction<Tuple3<K1, K2, IN>,OUT>> {
 
-       public PlanUnwrappingSortedGroupCombineOperator(FlatCombineFunction<IN, 
OUT> udf, Keys.SelectorFunctionKeys<IN, K1> groupingKey, 
Keys.SelectorFunctionKeys<IN, K2> sortingKey, String name,
+       public 
PlanUnwrappingSortedGroupCombineOperator(GroupCombineFunction<IN, OUT> udf, 
Keys.SelectorFunctionKeys<IN, K1> groupingKey, Keys.SelectorFunctionKeys<IN, 
K2> sortingKey, String name,
                                                                                
                        TypeInformation<OUT> outType, 
TypeInformation<Tuple3<K1, K2, IN>> typeInfoWithKey)
        {
                super(new TupleUnwrappingGroupReducer<IN, OUT, K1, K2>(udf),
@@ -42,15 +42,15 @@ public class PlanUnwrappingSortedGroupCombineOperator<IN, 
OUT, K1, K2> extends G
 
        }
 
-       public static final class TupleUnwrappingGroupReducer<IN, OUT, K1, K2> 
extends WrappingFunction<FlatCombineFunction<IN, OUT>>
-                       implements FlatCombineFunction<Tuple3<K1, K2, IN>, OUT>
+       public static final class TupleUnwrappingGroupReducer<IN, OUT, K1, K2> 
extends WrappingFunction<GroupCombineFunction<IN, OUT>>
+                       implements GroupCombineFunction<Tuple3<K1, K2, IN>, OUT>
        {
 
                private static final long serialVersionUID = 1L;
 
                private final Tuple3UnwrappingIterator<IN, K1, K2> iter;
 
-               private TupleUnwrappingGroupReducer(FlatCombineFunction<IN, 
OUT> wrapped) {
+               private TupleUnwrappingGroupReducer(GroupCombineFunction<IN, 
OUT> wrapped) {
                        super(wrapped);
                        this.iter = new Tuple3UnwrappingIterator<IN, K1, K2>();
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedReduceGroupOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedReduceGroupOperator.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedReduceGroupOperator.java
index 757ff56..63ebfa4 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedReduceGroupOperator.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedReduceGroupOperator.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.api.java.operators.translation;
 
-import org.apache.flink.api.common.functions.FlatCombineFunction;
+import org.apache.flink.api.common.functions.GroupCombineFunction;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
@@ -37,7 +37,7 @@ public class PlanUnwrappingSortedReduceGroupOperator<IN, OUT, 
K1, K2> extends Gr
        public PlanUnwrappingSortedReduceGroupOperator(GroupReduceFunction<IN, 
OUT> udf, Keys.SelectorFunctionKeys<IN, K1> groupingKey, 
Keys.SelectorFunctionKeys<IN, K2> sortingKey, String name,
                                                                                
        TypeInformation<OUT> outType, TypeInformation<Tuple3<K1, K2, IN>> 
typeInfoWithKey, boolean combinable)
        {
-               super(combinable ? new 
TupleUnwrappingFlatCombinableGroupReducer<IN, OUT, K1, 
K2>((RichGroupReduceFunction<IN, OUT>) udf) : new 
TupleUnwrappingNonCombinableGroupReducer<IN, OUT, K1, K2>(udf),
+               super(combinable ? new 
TupleUnwrappingGroupCombinableGroupReducer<IN, OUT, K1, 
K2>((RichGroupReduceFunction<IN, OUT>) udf) : new 
TupleUnwrappingNonCombinableGroupReducer<IN, OUT, K1, K2>(udf),
                        new UnaryOperatorInformation<Tuple3<K1, K2, IN>, 
OUT>(typeInfoWithKey, outType), groupingKey.computeLogicalKeyPositions(), name);
 
                super.setCombinable(combinable);
@@ -46,8 +46,8 @@ public class PlanUnwrappingSortedReduceGroupOperator<IN, OUT, 
K1, K2> extends Gr
        // 
--------------------------------------------------------------------------------------------
 
        @RichGroupReduceFunction.Combinable
-       public static final class TupleUnwrappingFlatCombinableGroupReducer<IN, 
OUT, K1, K2> extends WrappingFunction<RichGroupReduceFunction<IN, OUT>>
-               implements GroupReduceFunction<Tuple3<K1, K2, IN>, OUT>, 
FlatCombineFunction<Tuple3<K1, K2, IN>, Tuple3<K1, K2, IN>>
+       public static final class 
TupleUnwrappingGroupCombinableGroupReducer<IN, OUT, K1, K2> extends 
WrappingFunction<RichGroupReduceFunction<IN, OUT>>
+               implements GroupReduceFunction<Tuple3<K1, K2, IN>, OUT>, 
GroupCombineFunction<Tuple3<K1, K2, IN>, Tuple3<K1, K2, IN>>
        {
 
                private static final long serialVersionUID = 1L;
@@ -55,7 +55,7 @@ public class PlanUnwrappingSortedReduceGroupOperator<IN, OUT, 
K1, K2> extends Gr
                private Tuple3UnwrappingIterator<IN, K1, K2> iter;
                private Tuple3WrappingCollector<IN, K1, K2> coll;
 
-               private 
TupleUnwrappingFlatCombinableGroupReducer(RichGroupReduceFunction<IN, OUT> 
wrapped) {
+               private 
TupleUnwrappingGroupCombinableGroupReducer(RichGroupReduceFunction<IN, OUT> 
wrapped) {
                        super(wrapped);
                        this.iter = new Tuple3UnwrappingIterator<IN, K1, K2>();
                        this.coll = new Tuple3WrappingCollector<IN, K1, 
K2>(this.iter);

http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-java/src/main/java/org/apache/flink/api/java/record/operators/ReduceOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/ReduceOperator.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/record/operators/ReduceOperator.java
index 875e9c1..1866fea 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/ReduceOperator.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/record/operators/ReduceOperator.java
@@ -32,7 +32,7 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.commons.lang3.Validate;
-import org.apache.flink.api.common.functions.FlatCombineFunction;
+import org.apache.flink.api.common.functions.GroupCombineFunction;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.common.operators.Ordering;
@@ -367,7 +367,7 @@ public class ReduceOperator extends 
GroupReduceOperatorBase<Record, Record, Grou
        
        // 
============================================================================================
        
-       public static class WrappingReduceFunction extends 
WrappingFunction<ReduceFunction> implements GroupReduceFunction<Record, 
Record>, FlatCombineFunction<Record, Record> {
+       public static class WrappingReduceFunction extends 
WrappingFunction<ReduceFunction> implements GroupReduceFunction<Record, 
Record>, GroupCombineFunction<Record, Record> {
                
                private static final long serialVersionUID = 1L;
                

http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index 4527aa0..ae6063a 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -34,7 +34,7 @@ import org.apache.avro.specific.SpecificRecordBase;
 import org.apache.commons.lang3.Validate;
 import org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.api.common.functions.CrossFunction;
-import org.apache.flink.api.common.functions.FlatCombineFunction;
+import org.apache.flink.api.common.functions.GroupCombineFunction;
 import org.apache.flink.api.common.functions.FlatJoinFunction;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.FoldFunction;
@@ -135,14 +135,14 @@ public class TypeExtractor {
                return getUnaryOperatorReturnType((Function) 
groupReduceInterface, GroupReduceFunction.class, true, true, inType, 
functionName, allowMissing);
        }
 
-       public static <IN, OUT> TypeInformation<OUT> 
getGroupCombineReturnTypes(FlatCombineFunction<IN, OUT> combineInterface, 
TypeInformation<IN> inType) {
+       public static <IN, OUT> TypeInformation<OUT> 
getGroupCombineReturnTypes(GroupCombineFunction<IN, OUT> combineInterface, 
TypeInformation<IN> inType) {
                return getGroupCombineReturnTypes(combineInterface, inType, 
null, false);
        }
 
-       public static <IN, OUT> TypeInformation<OUT> 
getGroupCombineReturnTypes(FlatCombineFunction<IN, OUT> combineInterface, 
TypeInformation<IN> inType,
+       public static <IN, OUT> TypeInformation<OUT> 
getGroupCombineReturnTypes(GroupCombineFunction<IN, OUT> combineInterface, 
TypeInformation<IN> inType,
                                                                                
                                                                        String 
functionName, boolean allowMissing)
        {
-               return getUnaryOperatorReturnType((Function) combineInterface, 
FlatCombineFunction.class, true, true, inType, functionName, allowMissing);
+               return getUnaryOperatorReturnType((Function) combineInterface, 
GroupCombineFunction.class, true, true, inType, functionName, allowMissing);
        }
        
        
@@ -600,7 +600,7 @@ public class TypeExtractor {
                // the input is a tuple
                else if (inTypeInfo instanceof TupleTypeInfo && 
isClassType(inType) 
                                && 
Tuple.class.isAssignableFrom(typeToClass(inType))) {
-                       ParameterizedType tupleBaseClass = null;
+                       ParameterizedType tupleBaseClass;
                        
                        // get tuple from possible tuple subclass
                        while (!(isClassType(inType) && 
typeToClass(inType).getSuperclass().equals(Tuple.class))) {
@@ -737,7 +737,7 @@ public class TypeExtractor {
                        // check for basic type
                        if (typeInfo.isBasicType()) {
                                
-                               TypeInformation<?> actual = null;
+                               TypeInformation<?> actual;
                                // check if basic type at all
                                if (!(type instanceof Class<?>) || (actual = 
BasicTypeInfo.getInfoFor((Class<?>) type)) == null) {
                                        throw new InvalidTypesException("Basic 
type expected.");
@@ -792,7 +792,7 @@ public class TypeExtractor {
                                }
                                
                                // check writable type contents
-                               Class<?> clazz = null;
+                               Class<?> clazz;
                                if (((WritableTypeInfo<?>) 
typeInfo).getTypeClass() != (clazz = (Class<?>) type)) {
                                        throw new 
InvalidTypesException("Writable type '"
                                                        + 
((WritableTypeInfo<?>) typeInfo).getTypeClass().getCanonicalName() + "' 
expected but was '"
@@ -801,7 +801,7 @@ public class TypeExtractor {
                        }
                        // check for primitive array
                        else if (typeInfo instanceof PrimitiveArrayTypeInfo) {
-                               Type component = null;
+                               Type component;
                                // check if array at all
                                if (!(type instanceof Class<?> && ((Class<?>) 
type).isArray() && (component = ((Class<?>) type).getComponentType()) != null)
                                                && !(type instanceof 
GenericArrayType && (component = ((GenericArrayType) 
type).getGenericComponentType()) != null)) {
@@ -819,7 +819,7 @@ public class TypeExtractor {
                        }
                        // check for basic array
                        else if (typeInfo instanceof BasicArrayTypeInfo<?, ?>) {
-                               Type component = null;
+                               Type component;
                                // check if array at all
                                if (!(type instanceof Class<?> && ((Class<?>) 
type).isArray() && (component = ((Class<?>) type).getComponentType()) != null)
                                                && !(type instanceof 
GenericArrayType && (component = ((GenericArrayType) 
type).getGenericComponentType()) != null)) {
@@ -844,7 +844,7 @@ public class TypeExtractor {
                                }
                                
                                // check component
-                               Type component = null;
+                               Type component;
                                if (type instanceof Class<?>) {
                                        component = ((Class<?>) 
type).getComponentType();
                                } else {
@@ -1428,8 +1428,8 @@ public class TypeExtractor {
                if (!(t1 instanceof TypeVariable) || !(t2 instanceof 
TypeVariable)) {
                        return false;
                }
-               return ((TypeVariable<?>) 
t1).getName().equals(((TypeVariable<?>)t2).getName())
-                               && ((TypeVariable<?>) 
t1).getGenericDeclaration().equals(((TypeVariable<?>)t2).getGenericDeclaration());
+               return ((TypeVariable<?>) 
t1).getName().equals(((TypeVariable<?>) t2).getName())
+                               && ((TypeVariable<?>) 
t1).getGenericDeclaration().equals(((TypeVariable<?>) 
t2).getGenericDeclaration());
        }
        
        private static TypeInformation<?> getTypeOfPojoField(TypeInformation<?> 
pojoInfo, Field field) {

http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-java/src/test/java/org/apache/flink/api/java/record/ReduceWrappingFunctionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/record/ReduceWrappingFunctionTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/record/ReduceWrappingFunctionTest.java
index 2216217..89baa98 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/record/ReduceWrappingFunctionTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/record/ReduceWrappingFunctionTest.java
@@ -18,15 +18,12 @@
 
 package org.apache.flink.api.java.record;
 
-import static org.junit.Assert.*;
-
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
-
 import org.apache.commons.lang3.SerializationUtils;
-import org.apache.flink.api.common.functions.FlatCombineFunction;
+import org.apache.flink.api.common.functions.GroupCombineFunction;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
@@ -43,6 +40,11 @@ import org.apache.flink.types.Record;
 import org.apache.flink.util.Collector;
 import org.junit.Test;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 @SuppressWarnings({ "serial", "deprecation" })
 public class ReduceWrappingFunctionTest {
 
@@ -86,7 +88,7 @@ public class ReduceWrappingFunctionTest {
                        target.clear();
                        
                        // test combine
-                       ((FlatCombineFunction<Record, Record>) 
reducer).combine(source, collector);
+                       ((GroupCombineFunction<Record, Record>) 
reducer).combine(source, collector);
                        assertEquals(2, target.size());
                        assertEquals(new IntValue(42), 
target.get(0).getField(0, IntValue.class));
                        assertEquals(new LongValue(11), 
target.get(0).getField(1, LongValue.class));
@@ -138,7 +140,7 @@ public class ReduceWrappingFunctionTest {
                        target.clear();
                        
                        // test combine
-                       ((FlatCombineFunction<Record, Record>) 
reducer).combine(source, collector);
+                       ((GroupCombineFunction<Record, Record>) 
reducer).combine(source, collector);
                        assertEquals(2, target.size());
                        assertEquals(new IntValue(42), 
target.get(0).getField(0, IntValue.class));
                        assertEquals(new LongValue(11), 
target.get(0).getField(1, LongValue.class));
@@ -227,5 +229,5 @@ public class ReduceWrappingFunctionTest {
                        methodCounter.incrementAndGet();
                        super.open(parameters);
                }
-       };
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupCombineDriver.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupCombineDriver.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupCombineDriver.java
index 7d87a6b..7b279ee 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupCombineDriver.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupCombineDriver.java
@@ -20,7 +20,7 @@
 package org.apache.flink.runtime.operators;
 
 import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.FlatCombineFunction;
+import org.apache.flink.api.common.functions.GroupCombineFunction;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
 import org.apache.flink.runtime.util.NonReusingMutableToRegularIteratorWrapper;
@@ -35,20 +35,20 @@ import org.slf4j.LoggerFactory;
 * Like @org.apache.flink.runtime.operators.GroupCombineDriver but without 
grouping and sorting. May emit partially
 * reduced results.
 *
-* @see org.apache.flink.api.common.functions.FlatCombineFunction
+* @see GroupCombineFunction
 */
-public class AllGroupCombineDriver<IN, OUT> implements 
PactDriver<FlatCombineFunction<IN, OUT>, OUT> {
+public class AllGroupCombineDriver<IN, OUT> implements 
PactDriver<GroupCombineFunction<IN, OUT>, OUT> {
 
        private static final Logger LOG = 
LoggerFactory.getLogger(AllGroupCombineDriver.class);
 
-       private PactTaskContext<FlatCombineFunction<IN, OUT>, OUT> taskContext;
+       private PactTaskContext<GroupCombineFunction<IN, OUT>, OUT> taskContext;
 
        private boolean objectReuseEnabled = false;
 
        // 
------------------------------------------------------------------------
 
        @Override
-       public void setup(PactTaskContext<FlatCombineFunction<IN, OUT>, OUT> 
context) {
+       public void setup(PactTaskContext<GroupCombineFunction<IN, OUT>, OUT> 
context) {
                this.taskContext = context;
        }
 
@@ -58,9 +58,9 @@ public class AllGroupCombineDriver<IN, OUT> implements 
PactDriver<FlatCombineFun
        }
 
        @Override
-       public Class<FlatCombineFunction<IN, OUT>> getStubType() {
+       public Class<GroupCombineFunction<IN, OUT>> getStubType() {
                @SuppressWarnings("unchecked")
-               final Class<FlatCombineFunction<IN, OUT>> clazz = 
(Class<FlatCombineFunction<IN, OUT>>) (Class<?>) FlatCombineFunction.class;
+               final Class<GroupCombineFunction<IN, OUT>> clazz = 
(Class<GroupCombineFunction<IN, OUT>>) (Class<?>) GroupCombineFunction.class;
                return clazz;
        }
 
@@ -95,7 +95,7 @@ public class AllGroupCombineDriver<IN, OUT> implements 
PactDriver<FlatCombineFun
                TypeSerializer<IN> serializer = 
serializerFactory.getSerializer();
 
                final MutableObjectIterator<IN> in = 
this.taskContext.getInput(0);
-               final FlatCombineFunction<IN, OUT> reducer = 
this.taskContext.getStub();
+               final GroupCombineFunction<IN, OUT> reducer = 
this.taskContext.getStub();
                final Collector<OUT> output = 
this.taskContext.getOutputCollector();
 
                if (objectReuseEnabled) {

http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupReduceDriver.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupReduceDriver.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupReduceDriver.java
index ad1afdb..a20fddf 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupReduceDriver.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupReduceDriver.java
@@ -23,7 +23,7 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.runtime.util.NonReusingMutableToRegularIteratorWrapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.flink.api.common.functions.FlatCombineFunction;
+import org.apache.flink.api.common.functions.GroupCombineFunction;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.operators.util.TaskConfig;
@@ -92,8 +92,8 @@ public class AllGroupReduceDriver<IT, OT> implements 
PactDriver<GroupReduceFunct
 
                switch (this.strategy) {
                        case ALL_GROUP_REDUCE_COMBINE:
-                               if (!(this.taskContext.getStub() instanceof 
FlatCombineFunction)) {
-                                       throw new Exception("Using combiner on 
a UDF that does not implement the combiner interface " + 
FlatCombineFunction.class.getName());
+                               if (!(this.taskContext.getStub() instanceof 
GroupCombineFunction)) {
+                                       throw new Exception("Using combiner on 
a UDF that does not implement the combiner interface " + 
GroupCombineFunction.class.getName());
                                }
                        case ALL_GROUP_REDUCE:
                        case ALL_GROUP_COMBINE:
@@ -129,7 +129,7 @@ public class AllGroupReduceDriver<IT, OT> implements 
PactDriver<GroupReduceFunct
                                        final Collector<OT> output = 
this.taskContext.getOutputCollector();
                                        reducer.reduce(inIter, output);
                                } else if (strategy == 
DriverStrategy.ALL_GROUP_REDUCE_COMBINE || strategy == 
DriverStrategy.ALL_GROUP_COMBINE) {
-                                       @SuppressWarnings("unchecked") final 
FlatCombineFunction<IT, OT> combiner = (FlatCombineFunction<IT, OT>) 
this.taskContext.getStub();
+                                       @SuppressWarnings("unchecked") final 
GroupCombineFunction<IT, OT> combiner = (GroupCombineFunction<IT, OT>) 
this.taskContext.getStub();
                                        final Collector<OT> output = 
this.taskContext.getOutputCollector();
                                        combiner.combine(inIter, output);
                                } else {
@@ -147,7 +147,7 @@ public class AllGroupReduceDriver<IT, OT> implements 
PactDriver<GroupReduceFunct
                                        final Collector<OT> output = 
this.taskContext.getOutputCollector();
                                        reducer.reduce(inIter, output);
                                } else if (strategy == 
DriverStrategy.ALL_GROUP_REDUCE_COMBINE || strategy == 
DriverStrategy.ALL_GROUP_COMBINE) {
-                                       @SuppressWarnings("unchecked") final 
FlatCombineFunction<IT, OT> combiner = (FlatCombineFunction<IT, OT>) 
this.taskContext.getStub();
+                                       @SuppressWarnings("unchecked") final 
GroupCombineFunction<IT, OT> combiner = (GroupCombineFunction<IT, OT>) 
this.taskContext.getStub();
                                        final Collector<OT> output = 
this.taskContext.getOutputCollector();
                                        combiner.combine(inIter, output);
                                } else {

http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
index dacd436..493eb4f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
@@ -23,7 +23,7 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.runtime.util.NonReusingKeyGroupedIterator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.flink.api.common.functions.FlatCombineFunction;
+import org.apache.flink.api.common.functions.GroupCombineFunction;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
@@ -45,7 +45,7 @@ import java.util.List;
  * the user supplied a RichGroupReduceFunction with a combine method. The 
combining is performed in memory with a
  * lazy approach which only combines elements which currently fit in the 
sorter. This may lead to a partial solution.
  * In the case of the RichGroupReduceFunction this partial result is 
transformed into a proper deterministic result.
- * The CombineGroup uses the FlatCombineFunction interface which allows to 
combine values of type <IN> to any type
+ * The CombineGroup uses the GroupCombineFunction interface which allows to 
combine values of type <IN> to any type
  * of type <OUT>. In contrast, the RichGroupReduceFunction requires the 
combine method to have the same input and
  * output type to be able to reduce the elements after the combine from <IN> 
to <OUT>.
  *
@@ -54,18 +54,18 @@ import java.util.List;
  * @param <IN> The data type consumed by the combiner.
  * @param <OUT> The data type produced by the combiner.
  */
-public class GroupReduceCombineDriver<IN, OUT> implements 
PactDriver<FlatCombineFunction<IN, OUT>, OUT> {
+public class GroupReduceCombineDriver<IN, OUT> implements 
PactDriver<GroupCombineFunction<IN, OUT>, OUT> {
        
        private static final Logger LOG = 
LoggerFactory.getLogger(GroupReduceCombineDriver.class);
 
        /** Fix length records with a length below this threshold will be 
in-place sorted, if possible. */
        private static final int THRESHOLD_FOR_IN_PLACE_SORTING = 32;
 
-       private PactTaskContext<FlatCombineFunction<IN, OUT>, OUT> taskContext;
+       private PactTaskContext<GroupCombineFunction<IN, OUT>, OUT> taskContext;
 
        private InMemorySorter<IN> sorter;
 
-       private FlatCombineFunction<IN, OUT> combiner;
+       private GroupCombineFunction<IN, OUT> combiner;
 
        private TypeSerializer<IN> serializer;
 
@@ -86,7 +86,7 @@ public class GroupReduceCombineDriver<IN, OUT> implements 
PactDriver<FlatCombine
        // 
------------------------------------------------------------------------
 
        @Override
-       public void setup(PactTaskContext<FlatCombineFunction<IN, OUT>, OUT> 
context) {
+       public void setup(PactTaskContext<GroupCombineFunction<IN, OUT>, OUT> 
context) {
                this.taskContext = context;
                this.running = true;
        }
@@ -97,9 +97,9 @@ public class GroupReduceCombineDriver<IN, OUT> implements 
PactDriver<FlatCombine
        }
        
        @Override
-       public Class<FlatCombineFunction<IN, OUT>> getStubType() {
+       public Class<GroupCombineFunction<IN, OUT>> getStubType() {
                @SuppressWarnings("unchecked")
-               final Class<FlatCombineFunction<IN, OUT>> clazz = 
(Class<FlatCombineFunction<IN, OUT>>) (Class<?>) FlatCombineFunction.class;
+               final Class<GroupCombineFunction<IN, OUT>> clazz = 
(Class<GroupCombineFunction<IN, OUT>>) (Class<?>) GroupCombineFunction.class;
                return clazz;
        }
 
@@ -188,7 +188,7 @@ public class GroupReduceCombineDriver<IN, OUT> implements 
PactDriver<FlatCombine
                                final ReusingKeyGroupedIterator<IN> keyIter = 
                                                new 
ReusingKeyGroupedIterator<IN>(sorter.getIterator(), this.serializer, 
this.groupingComparator);
 
-                               final FlatCombineFunction<IN, OUT> combiner = 
this.combiner;
+                               final GroupCombineFunction<IN, OUT> combiner = 
this.combiner;
                                final Collector<OUT> output = this.output;
 
                                // iterate over key groups
@@ -203,7 +203,7 @@ public class GroupReduceCombineDriver<IN, OUT> implements 
PactDriver<FlatCombine
                                final NonReusingKeyGroupedIterator<IN> keyIter 
= 
                                                new 
NonReusingKeyGroupedIterator<IN>(sorter.getIterator(), this.groupingComparator);
 
-                               final FlatCombineFunction<IN, OUT> combiner = 
this.combiner;
+                               final GroupCombineFunction<IN, OUT> combiner = 
this.combiner;
                                final Collector<OUT> output = this.output;
 
                                // iterate over key groups

http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
index ca110c2..71b8afc 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
@@ -23,7 +23,7 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.accumulators.AccumulatorHelper;
 import org.apache.flink.api.common.distributions.DataDistribution;
-import org.apache.flink.api.common.functions.FlatCombineFunction;
+import org.apache.flink.api.common.functions.GroupCombineFunction;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.common.functions.util.FunctionUtils;
@@ -525,7 +525,9 @@ public class RegularPactTask<S extends Function, OT> 
extends AbstractInvokable i
                                try {
                                        FunctionUtils.closeFunction(this.stub);
                                }
-                               catch (Throwable t) {}
+                               catch (Throwable t) {
+                                       // do nothing
+                               }
                        }
                        
                        // if resettable driver invoke teardown
@@ -1006,13 +1008,13 @@ public class RegularPactTask<S extends Function, OT> 
extends AbstractInvokable i
                                                        (e.getMessage() == null 
? "." : ": " + e.getMessage()), e);
                                }
                                
-                               if (!(localStub instanceof 
FlatCombineFunction)) {
+                               if (!(localStub instanceof 
GroupCombineFunction)) {
                                        throw new 
IllegalStateException("Performing combining sort outside a reduce task!");
                                }
 
                                @SuppressWarnings({ "rawtypes", "unchecked" })
                                CombiningUnilateralSortMerger<?> cSorter = new 
CombiningUnilateralSortMerger(
-                                       (FlatCombineFunction) localStub, 
getMemoryManager(), getIOManager(), this.inputIterators[inputNum],
+                                       (GroupCombineFunction) localStub, 
getMemoryManager(), getIOManager(), this.inputIterators[inputNum],
                                        this, this.inputSerializers[inputNum], 
getLocalStrategyComparator(inputNum),
                                        
this.config.getRelativeMemoryInput(inputNum), 
this.config.getFilehandlesInput(inputNum),
                                        
this.config.getSpillingThresholdInput(inputNum));
@@ -1467,7 +1469,9 @@ public class RegularPactTask<S extends Function, OT> 
extends AbstractInvokable i
                for (int i = 0; i < tasks.size(); i++) {
                        try {
                                tasks.get(i).cancelTask();
-                       } catch (Throwable t) {}
+                       } catch (Throwable t) {
+                               // do nothing
+                       }
                }
        }
        

http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/SynchronousChainedCombineDriver.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/SynchronousChainedCombineDriver.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/SynchronousChainedCombineDriver.java
index 7e36b49..4116145 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/SynchronousChainedCombineDriver.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/SynchronousChainedCombineDriver.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.operators.chaining;
 import java.io.IOException;
 import java.util.List;
 
-import org.apache.flink.api.common.functions.FlatCombineFunction;
+import org.apache.flink.api.common.functions.GroupCombineFunction;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.util.FunctionUtils;
 import org.apache.flink.api.common.typeutils.TypeComparator;
@@ -67,7 +67,7 @@ public class SynchronousChainedCombineDriver<IN, OUT> extends 
ChainedDriver<IN,
 
        private InMemorySorter<IN> sorter;
 
-       private FlatCombineFunction<IN, OUT> combiner;
+       private GroupCombineFunction<IN, OUT> combiner;
 
        private TypeSerializer<IN> serializer;
 
@@ -90,8 +90,8 @@ public class SynchronousChainedCombineDriver<IN, OUT> extends 
ChainedDriver<IN,
                this.parent = parent;
 
                @SuppressWarnings("unchecked")
-               final FlatCombineFunction<IN, OUT> combiner =
-                       RegularPactTask.instantiateUserCode(this.config, 
userCodeClassLoader, FlatCombineFunction.class);
+               final GroupCombineFunction<IN, OUT> combiner =
+                       RegularPactTask.instantiateUserCode(this.config, 
userCodeClassLoader, GroupCombineFunction.class);
                this.combiner = combiner;
                FunctionUtils.setFunctionRuntimeContext(combiner, 
getUdfRuntimeContext());
        }
@@ -210,7 +210,7 @@ public class SynchronousChainedCombineDriver<IN, OUT> 
extends ChainedDriver<IN,
 
 
                                // cache references on the stack
-                               final FlatCombineFunction<IN, OUT> stub = 
this.combiner;
+                               final GroupCombineFunction<IN, OUT> stub = 
this.combiner;
                                final Collector<OUT> output = 
this.outputCollector;
 
                                // run stub implementation
@@ -226,7 +226,7 @@ public class SynchronousChainedCombineDriver<IN, OUT> 
extends ChainedDriver<IN,
 
 
                                // cache references on the stack
-                               final FlatCombineFunction<IN, OUT> stub = 
this.combiner;
+                               final GroupCombineFunction<IN, OUT> stub = 
this.combiner;
                                final Collector<OUT> output = 
this.outputCollector;
 
                                // run stub implementation

http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java
index 9282fd4..8da9413 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.operators.sort;
 
-import org.apache.flink.api.common.functions.FlatCombineFunction;
+import org.apache.flink.api.common.functions.GroupCombineFunction;
 import org.apache.flink.api.common.functions.util.FunctionUtils;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -70,7 +70,7 @@ public class CombiningUnilateralSortMerger<E> extends 
UnilateralSortMerger<E> {
         */
        private static final Logger LOG = 
LoggerFactory.getLogger(CombiningUnilateralSortMerger.class);
 
-       private final FlatCombineFunction<E, E> combineStub;    // the user 
code stub that does the combining
+       private final GroupCombineFunction<E, E> combineStub;   // the user 
code stub that does the combining
        
        private Configuration udfConfig;
        
@@ -100,7 +100,7 @@ public class CombiningUnilateralSortMerger<E> extends 
UnilateralSortMerger<E> {
         * @throws MemoryAllocationException Thrown, if not enough memory can 
be obtained from the memory manager to
         *                                   perform the sort.
         */
-       public CombiningUnilateralSortMerger(FlatCombineFunction<E, E> 
combineStub, MemoryManager memoryManager, IOManager ioManager,
+       public CombiningUnilateralSortMerger(GroupCombineFunction<E, E> 
combineStub, MemoryManager memoryManager, IOManager ioManager,
                        MutableObjectIterator<E> input, AbstractInvokable 
parentTask, 
                        TypeSerializerFactory<E> serializerFactory, 
TypeComparator<E> comparator,
                        double memoryFraction, int maxNumFileHandles, float 
startSpillingFraction)
@@ -132,7 +132,7 @@ public class CombiningUnilateralSortMerger<E> extends 
UnilateralSortMerger<E> {
         * @throws MemoryAllocationException Thrown, if not enough memory can 
be obtained from the memory manager to
         *                                   perform the sort.
         */
-       public CombiningUnilateralSortMerger(FlatCombineFunction<E, E> 
combineStub, MemoryManager memoryManager, IOManager ioManager,
+       public CombiningUnilateralSortMerger(GroupCombineFunction<E, E> 
combineStub, MemoryManager memoryManager, IOManager ioManager,
                        MutableObjectIterator<E> input, AbstractInvokable 
parentTask, 
                        TypeSerializerFactory<E> serializerFactory, 
TypeComparator<E> comparator,
                        double memoryFraction, int numSortBuffers, int 
maxNumFileHandles,
@@ -189,7 +189,7 @@ public class CombiningUnilateralSortMerger<E> extends 
UnilateralSortMerger<E> {
                        // ------------------- In-Memory Cache 
------------------------
                        
                        final Queue<CircularElement<E>> cache = new 
ArrayDeque<CircularElement<E>>();
-                       CircularElement<E> element = null;
+                       CircularElement<E> element;
                        boolean cacheOnly = false;
                        
                        // fill cache
@@ -253,7 +253,7 @@ public class CombiningUnilateralSortMerger<E> extends 
UnilateralSortMerger<E> {
                        
                        // ------------------- Spilling Phase 
------------------------
                        
-                       final FlatCombineFunction<E, E> combineStub = 
CombiningUnilateralSortMerger.this.combineStub;
+                       final GroupCombineFunction<E, E> combineStub = 
CombiningUnilateralSortMerger.this.combineStub;
                        
                        // now that we are actually spilling, take the 
combiner, and open it
                        try {
@@ -463,7 +463,7 @@ public class CombiningUnilateralSortMerger<E> extends 
UnilateralSortMerger<E> {
                                                                                
                                                                        
this.memManager.getPageSize());
                        
                        final WriterCollector<E> collector = new 
WriterCollector<E>(output, this.serializer);
-                       final FlatCombineFunction<E, E> combineStub = 
CombiningUnilateralSortMerger.this.combineStub;
+                       final GroupCombineFunction<E, E> combineStub = 
CombiningUnilateralSortMerger.this.combineStub;
 
                        // combine and write to disk
                        try {
@@ -573,7 +573,7 @@ public class CombiningUnilateralSortMerger<E> extends 
UnilateralSortMerger<E> {
                                throw new TraversableOnceException();
                        }
                }
-       };
+       }
 
        // 
------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java
index 4b92b71..5c3ecf3 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java
@@ -18,11 +18,6 @@
 
 package org.apache.flink.runtime.blob;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
 import java.io.File;
 import java.net.InetSocketAddress;
 import java.net.URISyntaxException;
@@ -33,6 +28,11 @@ import java.util.List;
 import org.apache.flink.configuration.Configuration;
 import org.junit.Test;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 /**
  * This class contains unit tests for the {@link BlobCache}.
  */
@@ -70,8 +70,8 @@ public class BlobCacheSuccessTest {
 
                        blobCache = new BlobCache(serverAddress, new 
Configuration());
 
-                       for(int i = 0; i < blobKeys.size(); i++){
-                               blobCache.getURL(blobKeys.get(i));
+                       for (BlobKey blobKey : blobKeys) {
+                               blobCache.getURL(blobKey);
                        }
 
                        // Now, shut down the BLOB server, the BLOBs must still 
be accessible through the cache.
@@ -87,9 +87,7 @@ public class BlobCacheSuccessTest {
                        // Verify the result
                        assertEquals(blobKeys.size(), urls.length);
 
-                       for (int i = 0; i < urls.length; ++i) {
-
-                               final URL url = urls[i];
+                       for (final URL url : urls) {
 
                                assertNotNull(url);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
index 1291181..2732112 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
@@ -641,7 +641,7 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
    *  arbitrary output type.
    */
   def combineGroup[R: TypeInformation: ClassTag](
-      combiner: FlatCombineFunction[T, R]): DataSet[R] = {
+      combiner: GroupCombineFunction[T, R]): DataSet[R] = {
     if (combiner == null) {
       throw new NullPointerException("Combine function must not be null.")
     }
@@ -670,7 +670,7 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
     if (fun == null) {
       throw new NullPointerException("Combine function must not be null.")
     }
-    val combiner = new FlatCombineFunction[T, R] {
+    val combiner = new GroupCombineFunction[T, R] {
       val cleanFun = clean(fun)
       def combine(in: java.lang.Iterable[T], out: Collector[R]) {
         cleanFun(in.iterator().asScala, out)

http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala
index eca4563..d547ea4 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala
@@ -22,7 +22,7 @@ import org.apache.flink.api.java.functions.{KeySelector, 
FirstReducer}
 import org.apache.flink.api.scala.operators.ScalaAggregateOperator
 import scala.collection.JavaConverters._
 import org.apache.commons.lang3.Validate
-import org.apache.flink.api.common.functions.{FlatCombineFunction, 
GroupReduceFunction, ReduceFunction, Partitioner}
+import org.apache.flink.api.common.functions.{GroupCombineFunction, 
GroupReduceFunction, ReduceFunction, Partitioner}
 import org.apache.flink.api.common.operators.Order
 import org.apache.flink.api.java.aggregation.Aggregations
 import org.apache.flink.api.java.operators._
@@ -370,7 +370,7 @@ class GroupedDataSet[T: ClassTag](
   def combineGroup[R: TypeInformation: ClassTag](
                                           fun: (Iterator[T], Collector[R]) => 
Unit): DataSet[R] = {
     Validate.notNull(fun, "GroupCombine function must not be null.")
-    val combiner = new FlatCombineFunction[T, R] {
+    val combiner = new GroupCombineFunction[T, R] {
       val cleanFun = set.clean(fun)
       def combine(in: java.lang.Iterable[T], out: Collector[R]) {
         cleanFun(in.iterator().asScala, out)
@@ -396,7 +396,7 @@ class GroupedDataSet[T: ClassTag](
    *  arbitrary output type.
    */
   def combineGroup[R: TypeInformation: ClassTag](
-      combiner: FlatCombineFunction[T, R]): DataSet[R] = {
+      combiner: GroupCombineFunction[T, R]): DataSet[R] = {
     Validate.notNull(combiner, "GroupCombine function must not be null.")
     wrap(
       new GroupCombineOperator[T, R](maybeCreateSortedGrouping(),

http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomRankCombiner.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomRankCombiner.java
 
b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomRankCombiner.java
index 6631f07..e2a160d 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomRankCombiner.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomRankCombiner.java
@@ -21,14 +21,14 @@ package 
org.apache.flink.test.iterative.nephele.customdanglingpagerank;
 import java.util.Iterator;
 
 import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.api.common.functions.FlatCombineFunction;
+import org.apache.flink.api.common.functions.GroupCombineFunction;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
 import 
org.apache.flink.test.iterative.nephele.customdanglingpagerank.types.VertexWithRank;
 import org.apache.flink.util.Collector;
 
 
 public class CustomRankCombiner extends AbstractRichFunction implements 
GroupReduceFunction<VertexWithRank, VertexWithRank>,
-               FlatCombineFunction<VertexWithRank, VertexWithRank>
+               GroupCombineFunction<VertexWithRank, VertexWithRank>
 {
        private static final long serialVersionUID = 1L;
        

Reply via email to