[
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15301070#comment-15301070
]
ASF GitHub Bot commented on FLINK-3477:
---------------------------------------
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/1517#discussion_r64668630
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedReduceCombineDriver.java
---
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.runtime.operators.chaining;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.BatchTask;
+import org.apache.flink.runtime.operators.DriverStrategy;
+import org.apache.flink.runtime.operators.hash.ReduceHashTable;
+import org.apache.flink.runtime.operators.sort.FixedLengthRecordSorter;
+import org.apache.flink.runtime.operators.sort.InMemorySorter;
+import org.apache.flink.runtime.operators.sort.NormalizedKeySorter;
+import org.apache.flink.runtime.operators.sort.QuickSort;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Chained version of ReduceCombineDriver.
+ */
+public class ChainedReduceCombineDriver<T> extends ChainedDriver<T, T> {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(ChainedReduceCombineDriver.class);
+
+ /** Fix length records with a length below this threshold will be
in-place sorted, if possible. */
+ private static final int THRESHOLD_FOR_IN_PLACE_SORTING = 32;
+
+
+ private AbstractInvokable parent;
+
+ private TypeSerializer<T> serializer;
+
+ private TypeComparator<T> comparator;
+
+ private ReduceFunction<T> reducer;
+
+ private DriverStrategy strategy;
+
+ private InMemorySorter<T> sorter;
+
+ private QuickSort sortAlgo = new QuickSort();
+
+ private ReduceHashTable<T> table;
+
+ private List<MemorySegment> memory;
+
+ private volatile boolean canceled;
+
+ //
------------------------------------------------------------------------
+
+ @Override
+ public Function getStub() {
+ return reducer;
+ }
+
+ @Override
+ public String getTaskName() {
+ return taskName;
+ }
+
+ @Override
+ public void setup(AbstractInvokable parent) {
+ this.parent = parent;
+ canceled = false;
+
+ strategy = config.getDriverStrategy();
+
+ reducer = BatchTask.instantiateUserCode(config,
userCodeClassLoader, ReduceFunction.class);
+ FunctionUtils.setFunctionRuntimeContext(reducer,
getUdfRuntimeContext());
+ }
+
+ @Override
+ public void openTask() throws Exception {
+ // open the stub first
+ final Configuration stubConfig = config.getStubParameters();
+ BatchTask.openUserCode(reducer, stubConfig);
+
+ // instantiate the serializer / comparator
+ serializer = config.<T>getInputSerializer(0,
userCodeClassLoader).getSerializer();
+ comparator = config.<T>getDriverComparator(0,
userCodeClassLoader).createComparator();
+
+ MemoryManager memManager =
parent.getEnvironment().getMemoryManager();
+ final int numMemoryPages =
memManager.computeNumberOfPages(config.getRelativeMemoryDriver());
+ memory = memManager.allocatePages(parent, numMemoryPages);
+
+ LOG.debug("ChainedReduceCombineDriver object reuse: " +
(objectReuseEnabled ? "ENABLED" : "DISABLED") + ".");
+
+ switch (strategy) {
+ case SORTED_PARTIAL_REDUCE:
+ // instantiate a fix-length in-place sorter, if
possible, otherwise the out-of-place sorter
+ if
(this.comparator.supportsSerializationWithKeyNormalization() &&
--- End diff --
You can remove `this` from the copied code. It will be more consistent within
this class.
> Add hash-based combine strategy for ReduceFunction
> --------------------------------------------------
>
> Key: FLINK-3477
> URL: https://issues.apache.org/jira/browse/FLINK-3477
> Project: Flink
> Issue Type: Sub-task
> Components: Local Runtime
> Reporter: Fabian Hueske
> Assignee: Gabor Gevay
>
> This issue is about adding a hash-based combine strategy for ReduceFunctions.
> The interface of the {{reduce()}} method is as follows:
> {code}
> public T reduce(T v1, T v2)
> {code}
> Input type and output type are identical and the function returns only a
> single value. A Reduce function is incrementally applied to compute a final
> aggregated value. This allows to hold the preaggregated value in a hash-table
> and update it with each function call.
> The hash-based strategy requires a special implementation of an in-memory hash
> table. The hash table should support in-place updates of elements (if the
> new value has the same size as the old value) but also appending updates
> with invalidation of the old value (if the binary length of the new value
> differs). The hash table needs to be able to evict and emit all elements if
> it runs out-of-memory.
> We should also add {{HASH}} and {{SORT}} compiler hints to
> {{DataSet.reduce()}} and {{Grouping.reduce()}} to allow users to pick the
> execution strategy.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)