[ https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15310391#comment-15310391 ]
ASF GitHub Bot commented on FLINK-3477: --------------------------------------- Github user ggevay commented on a diff in the pull request: https://github.com/apache/flink/pull/1517#discussion_r65371616 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/MemoryHashTableTestCommon.java --- @@ -0,0 +1,666 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.operators.hash; + +import java.io.IOException; +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; + +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypePairComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.flink.runtime.operators.testutils.UniformStringPairGenerator; +import org.apache.flink.runtime.operators.testutils.types.IntList; +import org.apache.flink.runtime.operators.testutils.types.IntListComparator; +import org.apache.flink.runtime.operators.testutils.types.IntListPairComparator; +import org.apache.flink.runtime.operators.testutils.types.IntListSerializer; +import org.apache.flink.runtime.operators.testutils.types.IntPair; +import org.apache.flink.runtime.operators.testutils.types.IntPairComparator; +import org.apache.flink.runtime.operators.testutils.types.IntPairListPairComparator; +import org.apache.flink.runtime.operators.testutils.types.IntPairPairComparator; +import org.apache.flink.runtime.operators.testutils.types.IntPairSerializer; +import org.apache.flink.runtime.operators.testutils.types.StringPair; +import org.apache.flink.runtime.operators.testutils.types.StringPairComparator; +import org.apache.flink.runtime.operators.testutils.types.StringPairPairComparator; +import org.apache.flink.runtime.operators.testutils.types.StringPairSerializer; +import org.apache.flink.util.MutableObjectIterator; + +import org.junit.Test; + +import org.powermock.reflect.Whitebox; + +import static org.junit.Assert.*; + + +public class MemoryHashTableTestCommon { --- End diff -- Good idea! 5c27d4717bae1e6fe27e489806d0ebf22316fe85 > Add hash-based combine strategy for ReduceFunction > -------------------------------------------------- > > Key: FLINK-3477 > URL: https://issues.apache.org/jira/browse/FLINK-3477 > Project: Flink > Issue Type: Sub-task > Components: Local Runtime > Reporter: Fabian Hueske > Assignee: Gabor Gevay > > This issue is about adding a hash-based combine strategy for ReduceFunctions. > The interface of the {{reduce()}} method is as follows: > {code} > public T reduce(T v1, T v2) > {code} > Input type and output type are identical and the function returns only a > single value. A Reduce function is incrementally applied to compute a final > aggregated value. This allows to hold the preaggregated value in a hash-table > and update it with each function call. > The hash-based strategy requires special implementation of an in-memory hash > table. The hash table should support in place updates of elements (if the > updated value has the same size as the new value) but also appending updates > with invalidation of the old value (if the binary length of the new value > differs). The hash table needs to be able to evict and emit all elements if > it runs out-of-memory. > We should also add {{HASH}} and {{SORT}} compiler hints to > {{DataSet.reduce()}} and {{Grouping.reduce()}} to allow users to pick the > execution strategy. -- This message was sent by Atlassian JIRA (v6.3.4#6332)