[FLINK-1780] Rename FlatCombineFunction to GroupCombineFunction. This closes #530.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/033c69f9 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/033c69f9 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/033c69f9 Branch: refs/heads/master Commit: 033c69f9477c6352865e7e0da01296dd778ffe59 Parents: ae04025 Author: Suneel Marthi <[email protected]> Authored: Tue Mar 24 16:19:32 2015 -0400 Committer: Maximilian Michels <[email protected]> Committed: Wed Mar 25 17:13:51 2015 +0100 ---------------------------------------------------------------------- docs/dataset_transformations.md | 2 +- .../api/common/functions/CombineFunction.java | 2 +- .../common/functions/FlatCombineFunction.java | 51 -------------------- .../common/functions/GroupCombineFunction.java | 51 ++++++++++++++++++++ .../functions/RichFlatCombineFunction.java | 42 ---------------- .../functions/RichGroupCombineFunction.java | 39 +++++++++++++++ .../functions/RichGroupReduceFunction.java | 4 +- .../base/GroupCombineOperatorBase.java | 6 +-- .../operators/base/GroupReduceOperatorBase.java | 8 +-- .../java/org/apache/flink/api/java/DataSet.java | 4 +- .../flink/api/java/functions/FirstReducer.java | 4 +- .../java/operators/GroupCombineOperator.java | 22 ++++----- .../api/java/operators/GroupReduceOperator.java | 6 +-- .../api/java/operators/SortedGrouping.java | 4 +- .../api/java/operators/UnsortedGrouping.java | 4 +- .../PlanUnwrappingGroupCombineOperator.java | 12 ++--- .../PlanUnwrappingReduceGroupOperator.java | 10 ++-- ...lanUnwrappingSortedGroupCombineOperator.java | 12 ++--- ...PlanUnwrappingSortedReduceGroupOperator.java | 10 ++-- .../java/record/operators/ReduceOperator.java | 4 +- .../flink/api/java/typeutils/TypeExtractor.java | 24 ++++----- .../java/record/ReduceWrappingFunctionTest.java | 16 +++--- .../operators/AllGroupCombineDriver.java | 16 +++--- .../runtime/operators/AllGroupReduceDriver.java | 10 ++-- .../operators/GroupReduceCombineDriver.java 
| 20 ++++---- .../runtime/operators/RegularPactTask.java | 14 ++++-- .../SynchronousChainedCombineDriver.java | 12 ++--- .../sort/CombiningUnilateralSortMerger.java | 16 +++--- .../runtime/blob/BlobCacheSuccessTest.java | 18 +++---- .../org/apache/flink/api/scala/DataSet.scala | 4 +- .../apache/flink/api/scala/GroupedDataSet.scala | 6 +-- .../CustomRankCombiner.java | 4 +- .../javaApiOperators/GroupCombineITCase.java | 20 ++++---- .../scala/operators/GroupCombineITCase.scala | 6 +-- 34 files changed, 242 insertions(+), 241 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/docs/dataset_transformations.md ---------------------------------------------------------------------- diff --git a/docs/dataset_transformations.md b/docs/dataset_transformations.md index 2bec61b..3bb3cec 100644 --- a/docs/dataset_transformations.md +++ b/docs/dataset_transformations.md @@ -554,7 +554,7 @@ an alternative WordCount implementation. In the implementation, DataSet<String> input = [..] 
// The words received as input DataSet<String> groupedInput = input.groupBy(0); // group identical words -DataSet<Tuple2<String, Integer>> combinedWords = groupedInput.combineGroup(new FlatCombineFunction<String, Tuple2<String, Integer>() { +DataSet<Tuple2<String, Integer>> combinedWords = groupedInput.combineGroup(new GroupCombineFunction<String, Tuple2<String, Integer>() { public void combine(Iterable<String> words, Collector<Tuple2<String, Integer>>) { // combine int count = 0; http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-core/src/main/java/org/apache/flink/api/common/functions/CombineFunction.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/CombineFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/CombineFunction.java index ef52b32..af115b0 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/functions/CombineFunction.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/CombineFunction.java @@ -29,7 +29,7 @@ import java.io.Serializable; * reduce the data volume earlier, before the entire groups have been collected. * <p> * This special variant of the combine function reduces the group of elements into a single element. A variant - * that can return multiple values per group is defined in {@link FlatCombineFunction}. + * that can return multiple values per group is defined in {@link GroupCombineFunction}. * * @param <IN> The data type processed by the combine function. * @param <OUT> The data type emitted by the combine function. 
http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatCombineFunction.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatCombineFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatCombineFunction.java deleted file mode 100644 index b90b3ce..0000000 --- a/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatCombineFunction.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.common.functions; - -import java.io.Serializable; - -import org.apache.flink.util.Collector; - -/** - * Generic interface used for combine functions ("combiners"). Combiners act as auxiliaries to a {@link GroupReduceFunction} - * and "pre-reduce" the data. The combine functions typically do not see the entire group of elements, but - * only a sub-group. - * <p> - * Combine functions are frequently helpful in increasing the program efficiency, because they allow the system to - * reduce the data volume earlier, before the entire groups have been collected. 
- * <p> - * This special variant of the combine function supports to return more than one element per group. - * It is frequently less efficient to use than the {@link CombineFunction}. - * - * @param <IN> The data type processed by the combine function. - * @param <OUT> The data type emitted by the combine function. - */ -public interface FlatCombineFunction<IN, OUT> extends Function, Serializable { - - /** - * The combine method, called (potentially multiple timed) with subgroups of elements. - * - * @param values The elements to be combined. - * @param out The collector to use to return values from the function. - * - * @throws Exception The function may throw Exceptions, which will cause the program to cancel, - * and may trigger the recovery logic. - */ - void combine(Iterable<IN> values, Collector<OUT> out) throws Exception; -} http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-core/src/main/java/org/apache/flink/api/common/functions/GroupCombineFunction.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/GroupCombineFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/GroupCombineFunction.java new file mode 100644 index 0000000..c0b153b --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/GroupCombineFunction.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.functions; + +import java.io.Serializable; + +import org.apache.flink.util.Collector; + +/** + * Generic interface used for combine functions ("combiners"). Combiners act as auxiliaries to a {@link GroupReduceFunction} + * and "pre-reduce" the data. The combine functions typically do not see the entire group of elements, but + * only a sub-group. + * <p> + * Combine functions are frequently helpful in increasing the program efficiency, because they allow the system to + * reduce the data volume earlier, before the entire groups have been collected. + * <p> + * This special variant of the combine function supports to return more than one element per group. + * It is frequently less efficient to use than the {@link CombineFunction}. + * + * @param <IN> The data type processed by the combine function. + * @param <OUT> The data type emitted by the combine function. + */ +public interface GroupCombineFunction<IN, OUT> extends Function, Serializable { + + /** + * The combine method, called (potentially multiple timed) with subgroups of elements. + * + * @param values The elements to be combined. + * @param out The collector to use to return values from the function. + * + * @throws Exception The function may throw Exceptions, which will cause the program to cancel, + * and may trigger the recovery logic. 
+ */ + void combine(Iterable<IN> values, Collector<OUT> out) throws Exception; +} http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFlatCombineFunction.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFlatCombineFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFlatCombineFunction.java deleted file mode 100644 index 17aca88..0000000 --- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFlatCombineFunction.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.common.functions; - - -import org.apache.flink.api.common.functions.AbstractRichFunction; -import org.apache.flink.api.common.functions.FlatCombineFunction; -import org.apache.flink.api.common.functions.RichFunction; -import org.apache.flink.util.Collector; - -/** - * Rich variant of the {@link FlatCombineFunction}. 
As a {@link RichFunction}, it gives access to the - * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and teardown methods: - * {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and - * {@link RichFunction#close()}. - * - * @param <IN> The data type of the elements to be combined. - * @param <OUT> The resulting data type of the elements to be combined. - */ -public abstract class RichFlatCombineFunction<IN, OUT> extends AbstractRichFunction implements FlatCombineFunction<IN, OUT> { - - private static final long serialVersionUID = 1L; - - @Override - public abstract void combine(Iterable<IN> values, Collector<OUT> out) throws Exception; -} http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-core/src/main/java/org/apache/flink/api/common/functions/RichGroupCombineFunction.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichGroupCombineFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichGroupCombineFunction.java new file mode 100644 index 0000000..55df232 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichGroupCombineFunction.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.functions; + + +import org.apache.flink.util.Collector; + +/** + * Rich variant of the {@link GroupCombineFunction}. As a {@link RichFunction}, it gives access to the + * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and teardown methods: + * {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and + * {@link RichFunction#close()}. + * + * @param <IN> The data type of the elements to be combined. + * @param <OUT> The resulting data type of the elements to be combined. + */ +public abstract class RichGroupCombineFunction<IN, OUT> extends AbstractRichFunction implements GroupCombineFunction<IN, OUT> { + + private static final long serialVersionUID = 1L; + + @Override + public abstract void combine(Iterable<IN> values, Collector<OUT> out) throws Exception; +} http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-core/src/main/java/org/apache/flink/api/common/functions/RichGroupReduceFunction.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichGroupReduceFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichGroupReduceFunction.java index b6c92c2..48e27d3 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichGroupReduceFunction.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichGroupReduceFunction.java @@ -34,7 +34,7 @@ import org.apache.flink.util.Collector; * @param 
<IN> Type of the elements that this function processes. * @param <OUT> The type of the elements returned by the user-defined function. */ -public abstract class RichGroupReduceFunction<IN, OUT> extends AbstractRichFunction implements GroupReduceFunction<IN, OUT>, FlatCombineFunction<IN, IN> { +public abstract class RichGroupReduceFunction<IN, OUT> extends AbstractRichFunction implements GroupReduceFunction<IN, OUT>, GroupCombineFunction<IN, IN> { private static final long serialVersionUID = 1L; @@ -83,5 +83,5 @@ public abstract class RichGroupReduceFunction<IN, OUT> extends AbstractRichFunct */ @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.TYPE) - public static @interface Combinable {}; + public static @interface Combinable {} } http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupCombineOperatorBase.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupCombineOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupCombineOperatorBase.java index 2a47c45..27fbc1c 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupCombineOperatorBase.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupCombineOperatorBase.java @@ -22,7 +22,7 @@ import com.google.common.base.Preconditions; import org.apache.commons.lang3.ArrayUtils; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.InvalidProgramException; -import org.apache.flink.api.common.functions.FlatCombineFunction; +import org.apache.flink.api.common.functions.GroupCombineFunction; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.functions.util.CopyingListCollector; import org.apache.flink.api.common.functions.util.FunctionUtils; @@ -46,7 +46,7 @@ 
import java.util.List; * This class is later processed by the compiler to generate the plan. * @see org.apache.flink.api.common.functions.CombineFunction */ -public class GroupCombineOperatorBase<IN, OUT, FT extends FlatCombineFunction<IN, OUT>> extends SingleInputOperator<IN, OUT, FT> { +public class GroupCombineOperatorBase<IN, OUT, FT extends GroupCombineFunction<IN, OUT>> extends SingleInputOperator<IN, OUT, FT> { /** The ordering for the order inside a reduce group. */ @@ -81,7 +81,7 @@ public class GroupCombineOperatorBase<IN, OUT, FT extends FlatCombineFunction<IN @Override protected List<OUT> executeOnCollections(List<IN> inputData, RuntimeContext ctx, ExecutionConfig executionConfig) throws Exception { - FlatCombineFunction<IN, OUT> function = this.userFunction.getUserCodeObject(); + GroupCombineFunction<IN, OUT> function = this.userFunction.getUserCodeObject(); UnaryOperatorInformation<IN, OUT> operatorInfo = getOperatorInfo(); TypeInformation<IN> inputType = operatorInfo.getInputType(); http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java index 6d7db89..57f07f3 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java @@ -21,7 +21,7 @@ package org.apache.flink.api.common.operators.base; import org.apache.commons.lang3.ArrayUtils; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.InvalidProgramException; -import org.apache.flink.api.common.functions.FlatCombineFunction; +import 
org.apache.flink.api.common.functions.GroupCombineFunction; import org.apache.flink.api.common.functions.GroupReduceFunction; import org.apache.flink.api.common.functions.Partitioner; import org.apache.flink.api.common.functions.RuntimeContext; @@ -107,15 +107,15 @@ public class GroupReduceOperatorBase<IN, OUT, FT extends GroupReduceFunction<IN, /** * Marks the group reduce operation as combinable. Combinable operations may pre-reduce the * data before the actual group reduce operations. Combinable user-defined functions - * must implement the interface {@link org.apache.flink.api.common.functions.FlatCombineFunction}. + * must implement the interface {@link GroupCombineFunction}. * * @param combinable Flag to mark the group reduce operation as combinable. */ public void setCombinable(boolean combinable) { // sanity check - if (combinable && !FlatCombineFunction.class.isAssignableFrom(this.userFunction.getUserCodeClass())) { + if (combinable && !GroupCombineFunction.class.isAssignableFrom(this.userFunction.getUserCodeClass())) { throw new IllegalArgumentException("Cannot set a UDF as combinable if it does not implement the interface " + - FlatCombineFunction.class.getName()); + GroupCombineFunction.class.getName()); } else { this.combinable = combinable; } http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java index ed8d1ca..1e91eeb 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java @@ -24,7 +24,7 @@ import org.apache.commons.lang3.Validate; import org.apache.flink.api.common.InvalidProgramException; import org.apache.flink.api.common.JobExecutionResult; import 
org.apache.flink.api.common.functions.FilterFunction; -import org.apache.flink.api.common.functions.FlatCombineFunction; +import org.apache.flink.api.common.functions.GroupCombineFunction; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.GroupReduceFunction; import org.apache.flink.api.common.functions.InvalidTypesException; @@ -473,7 +473,7 @@ public abstract class DataSet<T> { * @param combiner The CombineFunction that is applied on the DataSet. * @return A GroupCombineOperator which represents the combined DataSet. */ - public <R> GroupCombineOperator<T, R> combineGroup(FlatCombineFunction<T, R> combiner) { + public <R> GroupCombineOperator<T, R> combineGroup(GroupCombineFunction<T, R> combiner) { if (combiner == null) { throw new NullPointerException("GroupReduce function must not be null."); } http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-java/src/main/java/org/apache/flink/api/java/functions/FirstReducer.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/FirstReducer.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/FirstReducer.java index fbb7029..a604cc0 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/functions/FirstReducer.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/FirstReducer.java @@ -17,13 +17,13 @@ */ package org.apache.flink.api.java.functions; -import org.apache.flink.api.common.functions.FlatCombineFunction; +import org.apache.flink.api.common.functions.GroupCombineFunction; import org.apache.flink.api.common.functions.GroupReduceFunction; import org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable; import org.apache.flink.util.Collector; @Combinable -public class FirstReducer<T> implements GroupReduceFunction<T, T>, FlatCombineFunction<T, T> { +public class FirstReducer<T> implements 
GroupReduceFunction<T, T>, GroupCombineFunction<T, T> { private static final long serialVersionUID = 1L; private final int count; http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java index 3c1d47c..911c608 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java @@ -18,7 +18,7 @@ package org.apache.flink.api.java.operators; -import org.apache.flink.api.common.functions.FlatCombineFunction; +import org.apache.flink.api.common.functions.GroupCombineFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.operators.Operator; import org.apache.flink.api.common.operators.Order; @@ -46,7 +46,7 @@ import org.apache.flink.api.java.typeutils.TupleTypeInfo; */ public class GroupCombineOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT, GroupCombineOperator<IN, OUT>> { - private final FlatCombineFunction<IN, OUT> function; + private final GroupCombineFunction<IN, OUT> function; private final Grouping<IN> grouper; @@ -60,7 +60,7 @@ public class GroupCombineOperator<IN, OUT> extends SingleInputUdfOperator<IN, OU * @param function The user-defined GroupReduce function. * @param defaultName The operator's name. 
*/ - public GroupCombineOperator(DataSet<IN> input, TypeInformation<OUT> resultType, FlatCombineFunction<IN, OUT> function, String defaultName) { + public GroupCombineOperator(DataSet<IN> input, TypeInformation<OUT> resultType, GroupCombineFunction<IN, OUT> function, String defaultName) { super(input, resultType); this.function = function; this.grouper = null; @@ -73,7 +73,7 @@ public class GroupCombineOperator<IN, OUT> extends SingleInputUdfOperator<IN, OU * @param input The grouped input to be processed group-wise by the groupReduce function. * @param function The user-defined GroupReduce function. */ - public GroupCombineOperator(Grouping<IN> input, TypeInformation<OUT> resultType, FlatCombineFunction<IN, OUT> function, String defaultName) { + public GroupCombineOperator(Grouping<IN> input, TypeInformation<OUT> resultType, GroupCombineFunction<IN, OUT> function, String defaultName) { super(input != null ? input.getDataSet() : null, resultType); this.function = function; @@ -82,7 +82,7 @@ public class GroupCombineOperator<IN, OUT> extends SingleInputUdfOperator<IN, OU } @Override - protected FlatCombineFunction<IN, OUT> getFunction() { + protected GroupCombineFunction<IN, OUT> getFunction() { return function; } @@ -99,8 +99,8 @@ public class GroupCombineOperator<IN, OUT> extends SingleInputUdfOperator<IN, OU if (grouper == null) { // non grouped reduce UnaryOperatorInformation<IN, OUT> operatorInfo = new UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType()); - GroupCombineOperatorBase<IN, OUT, FlatCombineFunction<IN, OUT>> po = - new GroupCombineOperatorBase<IN, OUT, FlatCombineFunction<IN, OUT>>(function, operatorInfo, new int[0], name); + GroupCombineOperatorBase<IN, OUT, GroupCombineFunction<IN, OUT>> po = + new GroupCombineOperatorBase<IN, OUT, GroupCombineFunction<IN, OUT>>(function, operatorInfo, new int[0], name); po.setInput(input); // the parallelism for a non grouped reduce can only be 1 @@ -144,8 +144,8 @@ public class 
GroupCombineOperator<IN, OUT> extends SingleInputUdfOperator<IN, OU int[] logicalKeyPositions = grouper.getKeys().computeLogicalKeyPositions(); UnaryOperatorInformation<IN, OUT> operatorInfo = new UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType()); - GroupCombineOperatorBase<IN, OUT, FlatCombineFunction<IN, OUT>> po = - new GroupCombineOperatorBase<IN, OUT, FlatCombineFunction<IN, OUT>>(function, operatorInfo, logicalKeyPositions, name); + GroupCombineOperatorBase<IN, OUT, GroupCombineFunction<IN, OUT>> po = + new GroupCombineOperatorBase<IN, OUT, GroupCombineFunction<IN, OUT>>(function, operatorInfo, logicalKeyPositions, name); po.setInput(input); po.setParallelism(getParallelism()); @@ -175,7 +175,7 @@ public class GroupCombineOperator<IN, OUT> extends SingleInputUdfOperator<IN, OU // -------------------------------------------------------------------------------------------- private static <IN, OUT, K> PlanUnwrappingGroupCombineOperator<IN, OUT, K> translateSelectorFunctionReducer( - Keys.SelectorFunctionKeys<IN, ?> rawKeys, FlatCombineFunction<IN, OUT> function, + Keys.SelectorFunctionKeys<IN, ?> rawKeys, GroupCombineFunction<IN, OUT> function, TypeInformation<IN> inputType, TypeInformation<OUT> outputType, String name, Operator<IN> input) { @SuppressWarnings("unchecked") @@ -199,7 +199,7 @@ public class GroupCombineOperator<IN, OUT> extends SingleInputUdfOperator<IN, OU } private static <IN, OUT, K1, K2> PlanUnwrappingSortedGroupCombineOperator<IN, OUT, K1, K2> translateSelectorFunctionSortedReducer( - Keys.SelectorFunctionKeys<IN, ?> rawGroupingKey, Keys.SelectorFunctionKeys<IN, ?> rawSortingKey, FlatCombineFunction<IN, OUT> function, + Keys.SelectorFunctionKeys<IN, ?> rawGroupingKey, Keys.SelectorFunctionKeys<IN, ?> rawSortingKey, GroupCombineFunction<IN, OUT> function, TypeInformation<IN> inputType, TypeInformation<OUT> outputType, String name, Operator<IN> input) { @SuppressWarnings("unchecked") 
http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java index e809623..30f2cc4 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java @@ -18,7 +18,7 @@ package org.apache.flink.api.java.operators; -import org.apache.flink.api.common.functions.FlatCombineFunction; +import org.apache.flink.api.common.functions.GroupCombineFunction; import org.apache.flink.api.common.functions.GroupReduceFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.operators.Operator; @@ -88,7 +88,7 @@ public class GroupReduceOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT } private void checkCombinability() { - if (function instanceof FlatCombineFunction && + if (function instanceof GroupCombineFunction && function.getClass().getAnnotation(RichGroupReduceFunction.Combinable.class) != null) { this.combinable = true; } @@ -111,7 +111,7 @@ public class GroupReduceOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT public GroupReduceOperator<IN, OUT> setCombinable(boolean combinable) { // sanity check that the function is a subclass of the combine interface - if (combinable && !(function instanceof FlatCombineFunction)) { + if (combinable && !(function instanceof GroupCombineFunction)) { throw new IllegalArgumentException("The function does not implement the combine interface."); } http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java 
---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java index b2054bf..287bf82 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java @@ -18,7 +18,7 @@ package org.apache.flink.api.java.operators; -import org.apache.flink.api.common.functions.FlatCombineFunction; +import org.apache.flink.api.common.functions.GroupCombineFunction; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.CompositeType; import org.apache.flink.api.java.Utils; @@ -169,7 +169,7 @@ public class SortedGrouping<T> extends Grouping<T> { * @param combiner The CombineFunction that is applied on the DataSet. * @return A GroupCombineOperator which represents the combined DataSet. 
*/ - public <R> GroupCombineOperator<T, R> combineGroup(FlatCombineFunction<T, R> combiner) { + public <R> GroupCombineOperator<T, R> combineGroup(GroupCombineFunction<T, R> combiner) { if (combiner == null) { throw new NullPointerException("GroupReduce function must not be null."); } http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java index 0f3faa0..319a599 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java @@ -19,7 +19,7 @@ package org.apache.flink.api.java.operators; import org.apache.flink.api.common.InvalidProgramException; -import org.apache.flink.api.common.functions.FlatCombineFunction; +import org.apache.flink.api.common.functions.GroupCombineFunction; import org.apache.flink.api.common.functions.GroupReduceFunction; import org.apache.flink.api.common.functions.Partitioner; import org.apache.flink.api.common.functions.ReduceFunction; @@ -173,7 +173,7 @@ public class UnsortedGrouping<T> extends Grouping<T> { * @param combiner The CombineFunction that is applied on the DataSet. * @return A GroupCombineOperator which represents the combined DataSet. 
*/ - public <R> GroupCombineOperator<T, R> combineGroup(FlatCombineFunction<T, R> combiner) { + public <R> GroupCombineOperator<T, R> combineGroup(GroupCombineFunction<T, R> combiner) { if (combiner == null) { throw new NullPointerException("GroupReduce function must not be null."); } http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingGroupCombineOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingGroupCombineOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingGroupCombineOperator.java index ae4ba11..c8e40ce 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingGroupCombineOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingGroupCombineOperator.java @@ -18,7 +18,7 @@ package org.apache.flink.api.java.operators.translation; -import org.apache.flink.api.common.functions.FlatCombineFunction; +import org.apache.flink.api.common.functions.GroupCombineFunction; import org.apache.flink.api.common.operators.UnaryOperatorInformation; import org.apache.flink.api.common.operators.base.GroupCombineOperatorBase; import org.apache.flink.api.common.typeinfo.TypeInformation; @@ -30,9 +30,9 @@ import org.apache.flink.util.Collector; * A group combine operator that takes 2-tuples (key-value pairs), and applies the group combine operation only * on the unwrapped values. 
*/ -public class PlanUnwrappingGroupCombineOperator<IN, OUT, K> extends GroupCombineOperatorBase<Tuple2<K, IN>, OUT, FlatCombineFunction<Tuple2<K, IN>, OUT>> { +public class PlanUnwrappingGroupCombineOperator<IN, OUT, K> extends GroupCombineOperatorBase<Tuple2<K, IN>, OUT, GroupCombineFunction<Tuple2<K, IN>, OUT>> { - public PlanUnwrappingGroupCombineOperator(FlatCombineFunction<IN, OUT> udf, Keys.SelectorFunctionKeys<IN, K> key, String name, + public PlanUnwrappingGroupCombineOperator(GroupCombineFunction<IN, OUT> udf, Keys.SelectorFunctionKeys<IN, K> key, String name, TypeInformation<OUT> outType, TypeInformation<Tuple2<K, IN>> typeInfoWithKey) { super(new TupleUnwrappingGroupCombiner<IN, OUT, K>(udf), @@ -42,15 +42,15 @@ public class PlanUnwrappingGroupCombineOperator<IN, OUT, K> extends GroupCombine // -------------------------------------------------------------------------------------------- - public static final class TupleUnwrappingGroupCombiner<IN, OUT, K> extends WrappingFunction<FlatCombineFunction<IN, OUT>> - implements FlatCombineFunction<Tuple2<K, IN>, OUT> + public static final class TupleUnwrappingGroupCombiner<IN, OUT, K> extends WrappingFunction<GroupCombineFunction<IN, OUT>> + implements GroupCombineFunction<Tuple2<K, IN>, OUT> { private static final long serialVersionUID = 1L; private final TupleUnwrappingIterator<IN, K> iter; - private TupleUnwrappingGroupCombiner(FlatCombineFunction<IN, OUT> wrapped) { + private TupleUnwrappingGroupCombiner(GroupCombineFunction<IN, OUT> wrapped) { super(wrapped); this.iter = new TupleUnwrappingIterator<IN, K>(); } http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java index 1d59a21..e01af50 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java @@ -18,7 +18,7 @@ package org.apache.flink.api.java.operators.translation; -import org.apache.flink.api.common.functions.FlatCombineFunction; +import org.apache.flink.api.common.functions.GroupCombineFunction; import org.apache.flink.api.common.functions.GroupReduceFunction; import org.apache.flink.api.common.operators.UnaryOperatorInformation; import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase; @@ -37,7 +37,7 @@ public class PlanUnwrappingReduceGroupOperator<IN, OUT, K> extends GroupReduceOp public PlanUnwrappingReduceGroupOperator(GroupReduceFunction<IN, OUT> udf, Keys.SelectorFunctionKeys<IN, K> key, String name, TypeInformation<OUT> outType, TypeInformation<Tuple2<K, IN>> typeInfoWithKey, boolean combinable) { - super(combinable ? new TupleUnwrappingFlatCombinableGroupReducer<IN, OUT, K>((RichGroupReduceFunction<IN, OUT>) udf) : new TupleUnwrappingNonCombinableGroupReducer<IN, OUT, K>(udf), + super(combinable ? 
new TupleUnwrappingGroupCombinableGroupReducer<IN, OUT, K>((RichGroupReduceFunction<IN, OUT>) udf) : new TupleUnwrappingNonCombinableGroupReducer<IN, OUT, K>(udf), new UnaryOperatorInformation<Tuple2<K, IN>, OUT>(typeInfoWithKey, outType), key.computeLogicalKeyPositions(), name); super.setCombinable(combinable); @@ -46,8 +46,8 @@ public class PlanUnwrappingReduceGroupOperator<IN, OUT, K> extends GroupReduceOp // -------------------------------------------------------------------------------------------- @RichGroupReduceFunction.Combinable - public static final class TupleUnwrappingFlatCombinableGroupReducer<IN, OUT, K> extends WrappingFunction<RichGroupReduceFunction<IN, OUT>> - implements GroupReduceFunction<Tuple2<K, IN>, OUT>, FlatCombineFunction<Tuple2<K, IN>, Tuple2<K, IN>> + public static final class TupleUnwrappingGroupCombinableGroupReducer<IN, OUT, K> extends WrappingFunction<RichGroupReduceFunction<IN, OUT>> + implements GroupReduceFunction<Tuple2<K, IN>, OUT>, GroupCombineFunction<Tuple2<K, IN>, Tuple2<K, IN>> { private static final long serialVersionUID = 1L; @@ -55,7 +55,7 @@ public class PlanUnwrappingReduceGroupOperator<IN, OUT, K> extends GroupReduceOp private TupleUnwrappingIterator<IN, K> iter; private TupleWrappingCollector<IN, K> coll; - private TupleUnwrappingFlatCombinableGroupReducer(RichGroupReduceFunction<IN, OUT> wrapped) { + private TupleUnwrappingGroupCombinableGroupReducer(RichGroupReduceFunction<IN, OUT> wrapped) { super(wrapped); this.iter = new TupleUnwrappingIterator<IN, K>(); this.coll = new TupleWrappingCollector<IN, K>(this.iter); http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedGroupCombineOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedGroupCombineOperator.java 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedGroupCombineOperator.java index b3d8470..e52a5c4 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedGroupCombineOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedGroupCombineOperator.java @@ -18,7 +18,7 @@ package org.apache.flink.api.java.operators.translation; -import org.apache.flink.api.common.functions.FlatCombineFunction; +import org.apache.flink.api.common.functions.GroupCombineFunction; import org.apache.flink.api.common.operators.UnaryOperatorInformation; import org.apache.flink.api.common.operators.base.GroupCombineOperatorBase; import org.apache.flink.api.common.typeinfo.TypeInformation; @@ -30,9 +30,9 @@ import org.apache.flink.util.Collector; * A reduce operator that takes 3-tuples (groupKey, sortKey, value), and applies the sorted partial group reduce * operation only on the unwrapped values. 
*/ -public class PlanUnwrappingSortedGroupCombineOperator<IN, OUT, K1, K2> extends GroupCombineOperatorBase<Tuple3<K1, K2, IN>, OUT, FlatCombineFunction<Tuple3<K1, K2, IN>,OUT>> { +public class PlanUnwrappingSortedGroupCombineOperator<IN, OUT, K1, K2> extends GroupCombineOperatorBase<Tuple3<K1, K2, IN>, OUT, GroupCombineFunction<Tuple3<K1, K2, IN>,OUT>> { - public PlanUnwrappingSortedGroupCombineOperator(FlatCombineFunction<IN, OUT> udf, Keys.SelectorFunctionKeys<IN, K1> groupingKey, Keys.SelectorFunctionKeys<IN, K2> sortingKey, String name, + public PlanUnwrappingSortedGroupCombineOperator(GroupCombineFunction<IN, OUT> udf, Keys.SelectorFunctionKeys<IN, K1> groupingKey, Keys.SelectorFunctionKeys<IN, K2> sortingKey, String name, TypeInformation<OUT> outType, TypeInformation<Tuple3<K1, K2, IN>> typeInfoWithKey) { super(new TupleUnwrappingGroupReducer<IN, OUT, K1, K2>(udf), @@ -42,15 +42,15 @@ public class PlanUnwrappingSortedGroupCombineOperator<IN, OUT, K1, K2> extends G } - public static final class TupleUnwrappingGroupReducer<IN, OUT, K1, K2> extends WrappingFunction<FlatCombineFunction<IN, OUT>> - implements FlatCombineFunction<Tuple3<K1, K2, IN>, OUT> + public static final class TupleUnwrappingGroupReducer<IN, OUT, K1, K2> extends WrappingFunction<GroupCombineFunction<IN, OUT>> + implements GroupCombineFunction<Tuple3<K1, K2, IN>, OUT> { private static final long serialVersionUID = 1L; private final Tuple3UnwrappingIterator<IN, K1, K2> iter; - private TupleUnwrappingGroupReducer(FlatCombineFunction<IN, OUT> wrapped) { + private TupleUnwrappingGroupReducer(GroupCombineFunction<IN, OUT> wrapped) { super(wrapped); this.iter = new Tuple3UnwrappingIterator<IN, K1, K2>(); } http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedReduceGroupOperator.java ---------------------------------------------------------------------- diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedReduceGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedReduceGroupOperator.java index 757ff56..63ebfa4 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedReduceGroupOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedReduceGroupOperator.java @@ -18,7 +18,7 @@ package org.apache.flink.api.java.operators.translation; -import org.apache.flink.api.common.functions.FlatCombineFunction; +import org.apache.flink.api.common.functions.GroupCombineFunction; import org.apache.flink.api.common.functions.GroupReduceFunction; import org.apache.flink.api.common.operators.UnaryOperatorInformation; import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase; @@ -37,7 +37,7 @@ public class PlanUnwrappingSortedReduceGroupOperator<IN, OUT, K1, K2> extends Gr public PlanUnwrappingSortedReduceGroupOperator(GroupReduceFunction<IN, OUT> udf, Keys.SelectorFunctionKeys<IN, K1> groupingKey, Keys.SelectorFunctionKeys<IN, K2> sortingKey, String name, TypeInformation<OUT> outType, TypeInformation<Tuple3<K1, K2, IN>> typeInfoWithKey, boolean combinable) { - super(combinable ? new TupleUnwrappingFlatCombinableGroupReducer<IN, OUT, K1, K2>((RichGroupReduceFunction<IN, OUT>) udf) : new TupleUnwrappingNonCombinableGroupReducer<IN, OUT, K1, K2>(udf), + super(combinable ? 
new TupleUnwrappingGroupCombinableGroupReducer<IN, OUT, K1, K2>((RichGroupReduceFunction<IN, OUT>) udf) : new TupleUnwrappingNonCombinableGroupReducer<IN, OUT, K1, K2>(udf), new UnaryOperatorInformation<Tuple3<K1, K2, IN>, OUT>(typeInfoWithKey, outType), groupingKey.computeLogicalKeyPositions(), name); super.setCombinable(combinable); @@ -46,8 +46,8 @@ public class PlanUnwrappingSortedReduceGroupOperator<IN, OUT, K1, K2> extends Gr // -------------------------------------------------------------------------------------------- @RichGroupReduceFunction.Combinable - public static final class TupleUnwrappingFlatCombinableGroupReducer<IN, OUT, K1, K2> extends WrappingFunction<RichGroupReduceFunction<IN, OUT>> - implements GroupReduceFunction<Tuple3<K1, K2, IN>, OUT>, FlatCombineFunction<Tuple3<K1, K2, IN>, Tuple3<K1, K2, IN>> + public static final class TupleUnwrappingGroupCombinableGroupReducer<IN, OUT, K1, K2> extends WrappingFunction<RichGroupReduceFunction<IN, OUT>> + implements GroupReduceFunction<Tuple3<K1, K2, IN>, OUT>, GroupCombineFunction<Tuple3<K1, K2, IN>, Tuple3<K1, K2, IN>> { private static final long serialVersionUID = 1L; @@ -55,7 +55,7 @@ public class PlanUnwrappingSortedReduceGroupOperator<IN, OUT, K1, K2> extends Gr private Tuple3UnwrappingIterator<IN, K1, K2> iter; private Tuple3WrappingCollector<IN, K1, K2> coll; - private TupleUnwrappingFlatCombinableGroupReducer(RichGroupReduceFunction<IN, OUT> wrapped) { + private TupleUnwrappingGroupCombinableGroupReducer(RichGroupReduceFunction<IN, OUT> wrapped) { super(wrapped); this.iter = new Tuple3UnwrappingIterator<IN, K1, K2>(); this.coll = new Tuple3WrappingCollector<IN, K1, K2>(this.iter); http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-java/src/main/java/org/apache/flink/api/java/record/operators/ReduceOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/ReduceOperator.java 
b/flink-java/src/main/java/org/apache/flink/api/java/record/operators/ReduceOperator.java index 875e9c1..1866fea 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/ReduceOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/record/operators/ReduceOperator.java @@ -32,7 +32,7 @@ import java.util.List; import java.util.Map; import org.apache.commons.lang3.Validate; -import org.apache.flink.api.common.functions.FlatCombineFunction; +import org.apache.flink.api.common.functions.GroupCombineFunction; import org.apache.flink.api.common.functions.GroupReduceFunction; import org.apache.flink.api.common.operators.Operator; import org.apache.flink.api.common.operators.Ordering; @@ -367,7 +367,7 @@ public class ReduceOperator extends GroupReduceOperatorBase<Record, Record, Grou // ============================================================================================ - public static class WrappingReduceFunction extends WrappingFunction<ReduceFunction> implements GroupReduceFunction<Record, Record>, FlatCombineFunction<Record, Record> { + public static class WrappingReduceFunction extends WrappingFunction<ReduceFunction> implements GroupReduceFunction<Record, Record>, GroupCombineFunction<Record, Record> { private static final long serialVersionUID = 1L; http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java index 4527aa0..ae6063a 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java @@ -34,7 +34,7 @@ import org.apache.avro.specific.SpecificRecordBase; import 
org.apache.commons.lang3.Validate; import org.apache.flink.api.common.functions.CoGroupFunction; import org.apache.flink.api.common.functions.CrossFunction; -import org.apache.flink.api.common.functions.FlatCombineFunction; +import org.apache.flink.api.common.functions.GroupCombineFunction; import org.apache.flink.api.common.functions.FlatJoinFunction; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.FoldFunction; @@ -135,14 +135,14 @@ public class TypeExtractor { return getUnaryOperatorReturnType((Function) groupReduceInterface, GroupReduceFunction.class, true, true, inType, functionName, allowMissing); } - public static <IN, OUT> TypeInformation<OUT> getGroupCombineReturnTypes(FlatCombineFunction<IN, OUT> combineInterface, TypeInformation<IN> inType) { + public static <IN, OUT> TypeInformation<OUT> getGroupCombineReturnTypes(GroupCombineFunction<IN, OUT> combineInterface, TypeInformation<IN> inType) { return getGroupCombineReturnTypes(combineInterface, inType, null, false); } - public static <IN, OUT> TypeInformation<OUT> getGroupCombineReturnTypes(FlatCombineFunction<IN, OUT> combineInterface, TypeInformation<IN> inType, + public static <IN, OUT> TypeInformation<OUT> getGroupCombineReturnTypes(GroupCombineFunction<IN, OUT> combineInterface, TypeInformation<IN> inType, String functionName, boolean allowMissing) { - return getUnaryOperatorReturnType((Function) combineInterface, FlatCombineFunction.class, true, true, inType, functionName, allowMissing); + return getUnaryOperatorReturnType((Function) combineInterface, GroupCombineFunction.class, true, true, inType, functionName, allowMissing); } @@ -600,7 +600,7 @@ public class TypeExtractor { // the input is a tuple else if (inTypeInfo instanceof TupleTypeInfo && isClassType(inType) && Tuple.class.isAssignableFrom(typeToClass(inType))) { - ParameterizedType tupleBaseClass = null; + ParameterizedType tupleBaseClass; // get tuple from possible tuple subclass 
while (!(isClassType(inType) && typeToClass(inType).getSuperclass().equals(Tuple.class))) { @@ -737,7 +737,7 @@ public class TypeExtractor { // check for basic type if (typeInfo.isBasicType()) { - TypeInformation<?> actual = null; + TypeInformation<?> actual; // check if basic type at all if (!(type instanceof Class<?>) || (actual = BasicTypeInfo.getInfoFor((Class<?>) type)) == null) { throw new InvalidTypesException("Basic type expected."); @@ -792,7 +792,7 @@ public class TypeExtractor { } // check writable type contents - Class<?> clazz = null; + Class<?> clazz; if (((WritableTypeInfo<?>) typeInfo).getTypeClass() != (clazz = (Class<?>) type)) { throw new InvalidTypesException("Writable type '" + ((WritableTypeInfo<?>) typeInfo).getTypeClass().getCanonicalName() + "' expected but was '" @@ -801,7 +801,7 @@ public class TypeExtractor { } // check for primitive array else if (typeInfo instanceof PrimitiveArrayTypeInfo) { - Type component = null; + Type component; // check if array at all if (!(type instanceof Class<?> && ((Class<?>) type).isArray() && (component = ((Class<?>) type).getComponentType()) != null) && !(type instanceof GenericArrayType && (component = ((GenericArrayType) type).getGenericComponentType()) != null)) { @@ -819,7 +819,7 @@ public class TypeExtractor { } // check for basic array else if (typeInfo instanceof BasicArrayTypeInfo<?, ?>) { - Type component = null; + Type component; // check if array at all if (!(type instanceof Class<?> && ((Class<?>) type).isArray() && (component = ((Class<?>) type).getComponentType()) != null) && !(type instanceof GenericArrayType && (component = ((GenericArrayType) type).getGenericComponentType()) != null)) { @@ -844,7 +844,7 @@ public class TypeExtractor { } // check component - Type component = null; + Type component; if (type instanceof Class<?>) { component = ((Class<?>) type).getComponentType(); } else { @@ -1428,8 +1428,8 @@ public class TypeExtractor { if (!(t1 instanceof TypeVariable) || !(t2 instanceof 
TypeVariable)) { return false; } - return ((TypeVariable<?>) t1).getName().equals(((TypeVariable<?>)t2).getName()) - && ((TypeVariable<?>) t1).getGenericDeclaration().equals(((TypeVariable<?>)t2).getGenericDeclaration()); + return ((TypeVariable<?>) t1).getName().equals(((TypeVariable<?>) t2).getName()) + && ((TypeVariable<?>) t1).getGenericDeclaration().equals(((TypeVariable<?>) t2).getGenericDeclaration()); } private static TypeInformation<?> getTypeOfPojoField(TypeInformation<?> pojoInfo, Field field) { http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-java/src/test/java/org/apache/flink/api/java/record/ReduceWrappingFunctionTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/record/ReduceWrappingFunctionTest.java b/flink-java/src/test/java/org/apache/flink/api/java/record/ReduceWrappingFunctionTest.java index 2216217..89baa98 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/record/ReduceWrappingFunctionTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/record/ReduceWrappingFunctionTest.java @@ -18,15 +18,12 @@ package org.apache.flink.api.java.record; -import static org.junit.Assert.*; - import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; - import org.apache.commons.lang3.SerializationUtils; -import org.apache.flink.api.common.functions.FlatCombineFunction; +import org.apache.flink.api.common.functions.GroupCombineFunction; import org.apache.flink.api.common.functions.GroupReduceFunction; import org.apache.flink.api.common.functions.RichFunction; import org.apache.flink.api.common.operators.SingleInputSemanticProperties; @@ -43,6 +40,11 @@ import org.apache.flink.types.Record; import org.apache.flink.util.Collector; import org.junit.Test; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static 
org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + @SuppressWarnings({ "serial", "deprecation" }) public class ReduceWrappingFunctionTest { @@ -86,7 +88,7 @@ public class ReduceWrappingFunctionTest { target.clear(); // test combine - ((FlatCombineFunction<Record, Record>) reducer).combine(source, collector); + ((GroupCombineFunction<Record, Record>) reducer).combine(source, collector); assertEquals(2, target.size()); assertEquals(new IntValue(42), target.get(0).getField(0, IntValue.class)); assertEquals(new LongValue(11), target.get(0).getField(1, LongValue.class)); @@ -138,7 +140,7 @@ public class ReduceWrappingFunctionTest { target.clear(); // test combine - ((FlatCombineFunction<Record, Record>) reducer).combine(source, collector); + ((GroupCombineFunction<Record, Record>) reducer).combine(source, collector); assertEquals(2, target.size()); assertEquals(new IntValue(42), target.get(0).getField(0, IntValue.class)); assertEquals(new LongValue(11), target.get(0).getField(1, LongValue.class)); @@ -227,5 +229,5 @@ public class ReduceWrappingFunctionTest { methodCounter.incrementAndGet(); super.open(parameters); } - }; + } } http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupCombineDriver.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupCombineDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupCombineDriver.java index 7d87a6b..7b279ee 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupCombineDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupCombineDriver.java @@ -20,7 +20,7 @@ package org.apache.flink.runtime.operators; import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.functions.FlatCombineFunction; +import 
org.apache.flink.api.common.functions.GroupCombineFunction; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializerFactory; import org.apache.flink.runtime.util.NonReusingMutableToRegularIteratorWrapper; @@ -35,20 +35,20 @@ import org.slf4j.LoggerFactory; * Like @org.apache.flink.runtime.operators.GroupCombineDriver but without grouping and sorting. May emit partially * reduced results. * -* @see org.apache.flink.api.common.functions.FlatCombineFunction +* @see GroupCombineFunction */ -public class AllGroupCombineDriver<IN, OUT> implements PactDriver<FlatCombineFunction<IN, OUT>, OUT> { +public class AllGroupCombineDriver<IN, OUT> implements PactDriver<GroupCombineFunction<IN, OUT>, OUT> { private static final Logger LOG = LoggerFactory.getLogger(AllGroupCombineDriver.class); - private PactTaskContext<FlatCombineFunction<IN, OUT>, OUT> taskContext; + private PactTaskContext<GroupCombineFunction<IN, OUT>, OUT> taskContext; private boolean objectReuseEnabled = false; // ------------------------------------------------------------------------ @Override - public void setup(PactTaskContext<FlatCombineFunction<IN, OUT>, OUT> context) { + public void setup(PactTaskContext<GroupCombineFunction<IN, OUT>, OUT> context) { this.taskContext = context; } @@ -58,9 +58,9 @@ public class AllGroupCombineDriver<IN, OUT> implements PactDriver<FlatCombineFun } @Override - public Class<FlatCombineFunction<IN, OUT>> getStubType() { + public Class<GroupCombineFunction<IN, OUT>> getStubType() { @SuppressWarnings("unchecked") - final Class<FlatCombineFunction<IN, OUT>> clazz = (Class<FlatCombineFunction<IN, OUT>>) (Class<?>) FlatCombineFunction.class; + final Class<GroupCombineFunction<IN, OUT>> clazz = (Class<GroupCombineFunction<IN, OUT>>) (Class<?>) GroupCombineFunction.class; return clazz; } @@ -95,7 +95,7 @@ public class AllGroupCombineDriver<IN, OUT> implements PactDriver<FlatCombineFun TypeSerializer<IN> serializer = 
serializerFactory.getSerializer(); final MutableObjectIterator<IN> in = this.taskContext.getInput(0); - final FlatCombineFunction<IN, OUT> reducer = this.taskContext.getStub(); + final GroupCombineFunction<IN, OUT> reducer = this.taskContext.getStub(); final Collector<OUT> output = this.taskContext.getOutputCollector(); if (objectReuseEnabled) { http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupReduceDriver.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupReduceDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupReduceDriver.java index ad1afdb..a20fddf 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupReduceDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupReduceDriver.java @@ -23,7 +23,7 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.runtime.util.NonReusingMutableToRegularIteratorWrapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.flink.api.common.functions.FlatCombineFunction; +import org.apache.flink.api.common.functions.GroupCombineFunction; import org.apache.flink.api.common.functions.GroupReduceFunction; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.runtime.operators.util.TaskConfig; @@ -92,8 +92,8 @@ public class AllGroupReduceDriver<IT, OT> implements PactDriver<GroupReduceFunct switch (this.strategy) { case ALL_GROUP_REDUCE_COMBINE: - if (!(this.taskContext.getStub() instanceof FlatCombineFunction)) { - throw new Exception("Using combiner on a UDF that does not implement the combiner interface " + FlatCombineFunction.class.getName()); + if (!(this.taskContext.getStub() instanceof GroupCombineFunction)) { + throw new Exception("Using combiner on a UDF that does 
not implement the combiner interface " + GroupCombineFunction.class.getName()); } case ALL_GROUP_REDUCE: case ALL_GROUP_COMBINE: @@ -129,7 +129,7 @@ public class AllGroupReduceDriver<IT, OT> implements PactDriver<GroupReduceFunct final Collector<OT> output = this.taskContext.getOutputCollector(); reducer.reduce(inIter, output); } else if (strategy == DriverStrategy.ALL_GROUP_REDUCE_COMBINE || strategy == DriverStrategy.ALL_GROUP_COMBINE) { - @SuppressWarnings("unchecked") final FlatCombineFunction<IT, OT> combiner = (FlatCombineFunction<IT, OT>) this.taskContext.getStub(); + @SuppressWarnings("unchecked") final GroupCombineFunction<IT, OT> combiner = (GroupCombineFunction<IT, OT>) this.taskContext.getStub(); final Collector<OT> output = this.taskContext.getOutputCollector(); combiner.combine(inIter, output); } else { @@ -147,7 +147,7 @@ public class AllGroupReduceDriver<IT, OT> implements PactDriver<GroupReduceFunct final Collector<OT> output = this.taskContext.getOutputCollector(); reducer.reduce(inIter, output); } else if (strategy == DriverStrategy.ALL_GROUP_REDUCE_COMBINE || strategy == DriverStrategy.ALL_GROUP_COMBINE) { - @SuppressWarnings("unchecked") final FlatCombineFunction<IT, OT> combiner = (FlatCombineFunction<IT, OT>) this.taskContext.getStub(); + @SuppressWarnings("unchecked") final GroupCombineFunction<IT, OT> combiner = (GroupCombineFunction<IT, OT>) this.taskContext.getStub(); final Collector<OT> output = this.taskContext.getOutputCollector(); combiner.combine(inIter, output); } else { http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java index dacd436..493eb4f 100644 --- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java @@ -23,7 +23,7 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.runtime.util.NonReusingKeyGroupedIterator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.flink.api.common.functions.FlatCombineFunction; +import org.apache.flink.api.common.functions.GroupCombineFunction; import org.apache.flink.api.common.typeutils.TypeComparator; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializerFactory; @@ -45,7 +45,7 @@ import java.util.List; * the user supplied a RichGroupReduceFunction with a combine method. The combining is performed in memory with a * lazy approach which only combines elements which currently fit in the sorter. This may lead to a partial solution. * In the case of the RichGroupReduceFunction this partial result is transformed into a proper deterministic result. - * The CombineGroup uses the FlatCombineFunction interface which allows to combine values of type <IN> to any type + * The CombineGroup uses the GroupCombineFunction interface which allows to combine values of type <IN> to any type * of type <OUT>. In contrast, the RichGroupReduceFunction requires the combine method to have the same input and * output type to be able to reduce the elements after the combine from <IN> to <OUT>. * @@ -54,18 +54,18 @@ import java.util.List; * @param <IN> The data type consumed by the combiner. * @param <OUT> The data type produced by the combiner. 
*/ -public class GroupReduceCombineDriver<IN, OUT> implements PactDriver<FlatCombineFunction<IN, OUT>, OUT> { +public class GroupReduceCombineDriver<IN, OUT> implements PactDriver<GroupCombineFunction<IN, OUT>, OUT> { private static final Logger LOG = LoggerFactory.getLogger(GroupReduceCombineDriver.class); /** Fix length records with a length below this threshold will be in-place sorted, if possible. */ private static final int THRESHOLD_FOR_IN_PLACE_SORTING = 32; - private PactTaskContext<FlatCombineFunction<IN, OUT>, OUT> taskContext; + private PactTaskContext<GroupCombineFunction<IN, OUT>, OUT> taskContext; private InMemorySorter<IN> sorter; - private FlatCombineFunction<IN, OUT> combiner; + private GroupCombineFunction<IN, OUT> combiner; private TypeSerializer<IN> serializer; @@ -86,7 +86,7 @@ public class GroupReduceCombineDriver<IN, OUT> implements PactDriver<FlatCombine // ------------------------------------------------------------------------ @Override - public void setup(PactTaskContext<FlatCombineFunction<IN, OUT>, OUT> context) { + public void setup(PactTaskContext<GroupCombineFunction<IN, OUT>, OUT> context) { this.taskContext = context; this.running = true; } @@ -97,9 +97,9 @@ public class GroupReduceCombineDriver<IN, OUT> implements PactDriver<FlatCombine } @Override - public Class<FlatCombineFunction<IN, OUT>> getStubType() { + public Class<GroupCombineFunction<IN, OUT>> getStubType() { @SuppressWarnings("unchecked") - final Class<FlatCombineFunction<IN, OUT>> clazz = (Class<FlatCombineFunction<IN, OUT>>) (Class<?>) FlatCombineFunction.class; + final Class<GroupCombineFunction<IN, OUT>> clazz = (Class<GroupCombineFunction<IN, OUT>>) (Class<?>) GroupCombineFunction.class; return clazz; } @@ -188,7 +188,7 @@ public class GroupReduceCombineDriver<IN, OUT> implements PactDriver<FlatCombine final ReusingKeyGroupedIterator<IN> keyIter = new ReusingKeyGroupedIterator<IN>(sorter.getIterator(), this.serializer, this.groupingComparator); - final 
FlatCombineFunction<IN, OUT> combiner = this.combiner; + final GroupCombineFunction<IN, OUT> combiner = this.combiner; final Collector<OUT> output = this.output; // iterate over key groups @@ -203,7 +203,7 @@ public class GroupReduceCombineDriver<IN, OUT> implements PactDriver<FlatCombine final NonReusingKeyGroupedIterator<IN> keyIter = new NonReusingKeyGroupedIterator<IN>(sorter.getIterator(), this.groupingComparator); - final FlatCombineFunction<IN, OUT> combiner = this.combiner; + final GroupCombineFunction<IN, OUT> combiner = this.combiner; final Collector<OUT> output = this.output; // iterate over key groups http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java index ca110c2..71b8afc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java @@ -23,7 +23,7 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.accumulators.AccumulatorHelper; import org.apache.flink.api.common.distributions.DataDistribution; -import org.apache.flink.api.common.functions.FlatCombineFunction; +import org.apache.flink.api.common.functions.GroupCombineFunction; import org.apache.flink.api.common.functions.Function; import org.apache.flink.api.common.functions.Partitioner; import org.apache.flink.api.common.functions.util.FunctionUtils; @@ -525,7 +525,9 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i try { FunctionUtils.closeFunction(this.stub); } - catch (Throwable t) {} + catch (Throwable 
t) { + // do nothing + } } // if resettable driver invoke teardown @@ -1006,13 +1008,13 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i (e.getMessage() == null ? "." : ": " + e.getMessage()), e); } - if (!(localStub instanceof FlatCombineFunction)) { + if (!(localStub instanceof GroupCombineFunction)) { throw new IllegalStateException("Performing combining sort outside a reduce task!"); } @SuppressWarnings({ "rawtypes", "unchecked" }) CombiningUnilateralSortMerger<?> cSorter = new CombiningUnilateralSortMerger( - (FlatCombineFunction) localStub, getMemoryManager(), getIOManager(), this.inputIterators[inputNum], + (GroupCombineFunction) localStub, getMemoryManager(), getIOManager(), this.inputIterators[inputNum], this, this.inputSerializers[inputNum], getLocalStrategyComparator(inputNum), this.config.getRelativeMemoryInput(inputNum), this.config.getFilehandlesInput(inputNum), this.config.getSpillingThresholdInput(inputNum)); @@ -1467,7 +1469,9 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i for (int i = 0; i < tasks.size(); i++) { try { tasks.get(i).cancelTask(); - } catch (Throwable t) {} + } catch (Throwable t) { + // do nothing + } } } http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/SynchronousChainedCombineDriver.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/SynchronousChainedCombineDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/SynchronousChainedCombineDriver.java index 7e36b49..4116145 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/SynchronousChainedCombineDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/SynchronousChainedCombineDriver.java @@ -21,7 +21,7 @@ package 
org.apache.flink.runtime.operators.chaining; import java.io.IOException; import java.util.List; -import org.apache.flink.api.common.functions.FlatCombineFunction; +import org.apache.flink.api.common.functions.GroupCombineFunction; import org.apache.flink.api.common.functions.Function; import org.apache.flink.api.common.functions.util.FunctionUtils; import org.apache.flink.api.common.typeutils.TypeComparator; @@ -67,7 +67,7 @@ public class SynchronousChainedCombineDriver<IN, OUT> extends ChainedDriver<IN, private InMemorySorter<IN> sorter; - private FlatCombineFunction<IN, OUT> combiner; + private GroupCombineFunction<IN, OUT> combiner; private TypeSerializer<IN> serializer; @@ -90,8 +90,8 @@ public class SynchronousChainedCombineDriver<IN, OUT> extends ChainedDriver<IN, this.parent = parent; @SuppressWarnings("unchecked") - final FlatCombineFunction<IN, OUT> combiner = - RegularPactTask.instantiateUserCode(this.config, userCodeClassLoader, FlatCombineFunction.class); + final GroupCombineFunction<IN, OUT> combiner = + RegularPactTask.instantiateUserCode(this.config, userCodeClassLoader, GroupCombineFunction.class); this.combiner = combiner; FunctionUtils.setFunctionRuntimeContext(combiner, getUdfRuntimeContext()); } @@ -210,7 +210,7 @@ public class SynchronousChainedCombineDriver<IN, OUT> extends ChainedDriver<IN, // cache references on the stack - final FlatCombineFunction<IN, OUT> stub = this.combiner; + final GroupCombineFunction<IN, OUT> stub = this.combiner; final Collector<OUT> output = this.outputCollector; // run stub implementation @@ -226,7 +226,7 @@ public class SynchronousChainedCombineDriver<IN, OUT> extends ChainedDriver<IN, // cache references on the stack - final FlatCombineFunction<IN, OUT> stub = this.combiner; + final GroupCombineFunction<IN, OUT> stub = this.combiner; final Collector<OUT> output = this.outputCollector; // run stub implementation 
http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java index 9282fd4..8da9413 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java @@ -18,7 +18,7 @@ package org.apache.flink.runtime.operators.sort; -import org.apache.flink.api.common.functions.FlatCombineFunction; +import org.apache.flink.api.common.functions.GroupCombineFunction; import org.apache.flink.api.common.functions.util.FunctionUtils; import org.apache.flink.api.common.typeutils.TypeComparator; import org.apache.flink.api.common.typeutils.TypeSerializer; @@ -70,7 +70,7 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E> { */ private static final Logger LOG = LoggerFactory.getLogger(CombiningUnilateralSortMerger.class); - private final FlatCombineFunction<E, E> combineStub; // the user code stub that does the combining + private final GroupCombineFunction<E, E> combineStub; // the user code stub that does the combining private Configuration udfConfig; @@ -100,7 +100,7 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E> { * @throws MemoryAllocationException Thrown, if not enough memory can be obtained from the memory manager to * perform the sort. 
*/ - public CombiningUnilateralSortMerger(FlatCombineFunction<E, E> combineStub, MemoryManager memoryManager, IOManager ioManager, + public CombiningUnilateralSortMerger(GroupCombineFunction<E, E> combineStub, MemoryManager memoryManager, IOManager ioManager, MutableObjectIterator<E> input, AbstractInvokable parentTask, TypeSerializerFactory<E> serializerFactory, TypeComparator<E> comparator, double memoryFraction, int maxNumFileHandles, float startSpillingFraction) @@ -132,7 +132,7 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E> { * @throws MemoryAllocationException Thrown, if not enough memory can be obtained from the memory manager to * perform the sort. */ - public CombiningUnilateralSortMerger(FlatCombineFunction<E, E> combineStub, MemoryManager memoryManager, IOManager ioManager, + public CombiningUnilateralSortMerger(GroupCombineFunction<E, E> combineStub, MemoryManager memoryManager, IOManager ioManager, MutableObjectIterator<E> input, AbstractInvokable parentTask, TypeSerializerFactory<E> serializerFactory, TypeComparator<E> comparator, double memoryFraction, int numSortBuffers, int maxNumFileHandles, @@ -189,7 +189,7 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E> { // ------------------- In-Memory Cache ------------------------ final Queue<CircularElement<E>> cache = new ArrayDeque<CircularElement<E>>(); - CircularElement<E> element = null; + CircularElement<E> element; boolean cacheOnly = false; // fill cache @@ -253,7 +253,7 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E> { // ------------------- Spilling Phase ------------------------ - final FlatCombineFunction<E, E> combineStub = CombiningUnilateralSortMerger.this.combineStub; + final GroupCombineFunction<E, E> combineStub = CombiningUnilateralSortMerger.this.combineStub; // now that we are actually spilling, take the combiner, and open it try { @@ -463,7 +463,7 @@ public class 
CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E> { this.memManager.getPageSize()); final WriterCollector<E> collector = new WriterCollector<E>(output, this.serializer); - final FlatCombineFunction<E, E> combineStub = CombiningUnilateralSortMerger.this.combineStub; + final GroupCombineFunction<E, E> combineStub = CombiningUnilateralSortMerger.this.combineStub; // combine and write to disk try { @@ -573,7 +573,7 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E> { throw new TraversableOnceException(); } } - }; + } // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java index 4b92b71..5c3ecf3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java @@ -18,11 +18,6 @@ package org.apache.flink.runtime.blob; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - import java.io.File; import java.net.InetSocketAddress; import java.net.URISyntaxException; @@ -33,6 +28,11 @@ import java.util.List; import org.apache.flink.configuration.Configuration; import org.junit.Test; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + /** * This class contains unit tests for the {@link BlobCache}. 
*/ @@ -70,8 +70,8 @@ public class BlobCacheSuccessTest { blobCache = new BlobCache(serverAddress, new Configuration()); - for(int i = 0; i < blobKeys.size(); i++){ - blobCache.getURL(blobKeys.get(i)); + for (BlobKey blobKey : blobKeys) { + blobCache.getURL(blobKey); } // Now, shut down the BLOB server, the BLOBs must still be accessible through the cache. @@ -87,9 +87,7 @@ public class BlobCacheSuccessTest { // Verify the result assertEquals(blobKeys.size(), urls.length); - for (int i = 0; i < urls.length; ++i) { - - final URL url = urls[i]; + for (final URL url : urls) { assertNotNull(url); http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala index 1291181..2732112 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala @@ -641,7 +641,7 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) { * arbitrary output type. 
*/ def combineGroup[R: TypeInformation: ClassTag]( - combiner: FlatCombineFunction[T, R]): DataSet[R] = { + combiner: GroupCombineFunction[T, R]): DataSet[R] = { if (combiner == null) { throw new NullPointerException("Combine function must not be null.") } @@ -670,7 +670,7 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) { if (fun == null) { throw new NullPointerException("Combine function must not be null.") } - val combiner = new FlatCombineFunction[T, R] { + val combiner = new GroupCombineFunction[T, R] { val cleanFun = clean(fun) def combine(in: java.lang.Iterable[T], out: Collector[R]) { cleanFun(in.iterator().asScala, out) http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala index eca4563..d547ea4 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala @@ -22,7 +22,7 @@ import org.apache.flink.api.java.functions.{KeySelector, FirstReducer} import org.apache.flink.api.scala.operators.ScalaAggregateOperator import scala.collection.JavaConverters._ import org.apache.commons.lang3.Validate -import org.apache.flink.api.common.functions.{FlatCombineFunction, GroupReduceFunction, ReduceFunction, Partitioner} +import org.apache.flink.api.common.functions.{GroupCombineFunction, GroupReduceFunction, ReduceFunction, Partitioner} import org.apache.flink.api.common.operators.Order import org.apache.flink.api.java.aggregation.Aggregations import org.apache.flink.api.java.operators._ @@ -370,7 +370,7 @@ class GroupedDataSet[T: ClassTag]( def combineGroup[R: TypeInformation: ClassTag]( fun: (Iterator[T], Collector[R]) => Unit): DataSet[R] = { 
Validate.notNull(fun, "GroupCombine function must not be null.") - val combiner = new FlatCombineFunction[T, R] { + val combiner = new GroupCombineFunction[T, R] { val cleanFun = set.clean(fun) def combine(in: java.lang.Iterable[T], out: Collector[R]) { cleanFun(in.iterator().asScala, out) @@ -396,7 +396,7 @@ class GroupedDataSet[T: ClassTag]( * arbitrary output type. */ def combineGroup[R: TypeInformation: ClassTag]( - combiner: FlatCombineFunction[T, R]): DataSet[R] = { + combiner: GroupCombineFunction[T, R]): DataSet[R] = { Validate.notNull(combiner, "GroupCombine function must not be null.") wrap( new GroupCombineOperator[T, R](maybeCreateSortedGrouping(), http://git-wip-us.apache.org/repos/asf/flink/blob/033c69f9/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomRankCombiner.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomRankCombiner.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomRankCombiner.java index 6631f07..e2a160d 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomRankCombiner.java +++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomRankCombiner.java @@ -21,14 +21,14 @@ package org.apache.flink.test.iterative.nephele.customdanglingpagerank; import java.util.Iterator; import org.apache.flink.api.common.functions.AbstractRichFunction; -import org.apache.flink.api.common.functions.FlatCombineFunction; +import org.apache.flink.api.common.functions.GroupCombineFunction; import org.apache.flink.api.common.functions.GroupReduceFunction; import org.apache.flink.test.iterative.nephele.customdanglingpagerank.types.VertexWithRank; import org.apache.flink.util.Collector; public class CustomRankCombiner extends 
AbstractRichFunction implements GroupReduceFunction<VertexWithRank, VertexWithRank>, - FlatCombineFunction<VertexWithRank, VertexWithRank> + GroupCombineFunction<VertexWithRank, VertexWithRank> { private static final long serialVersionUID = 1L;
