[ https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15301097#comment-15301097 ]
ASF GitHub Bot commented on FLINK-3477:
---------------------------------------
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/1517#discussion_r64669861
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReduceHashTable.java ---
@@ -0,0 +1,1048 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.hash;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.SameTypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.RandomAccessInputView;
+import org.apache.flink.runtime.memory.AbstractPagedOutputView;
+import org.apache.flink.util.MathUtils;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * This hash table supports updating elements, and it also has processRecordWithReduce,
+ * which performs one reduce step with the given record.
+ *
+ * The memory is divided into three areas:
+ *  - Bucket area: contains the bucket heads:
+ *    an 8 byte pointer to the first link of a linked list in the record area
+ *  - Record area: contains the actual data in linked list elements. A linked list element starts
+ *    with an 8 byte pointer to the next element, and then the record follows.
+ *  - Staging area: a small, temporary storage area for writing updated records. This is needed
+ *    because, before serializing a record, there is no way to know in advance how large it will be.
+ *    Therefore, we can't serialize directly into the record area when we are doing an update, because
+ *    if the new record turns out to be larger than the old one, it would overwrite some other record
+ *    that happens to be after the old one in memory. The solution is to serialize into the staging area
+ *    first, and then copy it to the place of the original if it has the same size; otherwise we allocate
+ *    a new linked list element at the end of the record area and mark the old one as abandoned. This
+ *    creates "holes" in the record area, so compactions are eventually needed.
+ *
+ * Compaction happens by deleting everything in the bucket area and then reinserting all elements.
+ * The reinsertion forgets the structure (the linked lists) of the record area, reads it sequentially,
+ * and inserts all non-abandoned records, starting from the beginning of the record area.
+ * Note that insertions never overwrite a record that has not yet been read by the reinsertion sweep,
+ * because both the insertions and the reads happen sequentially in the record area, and the insertions
+ * obviously never overtake the reading sweep.
+ *
+ * Note: we have to abandon the old linked list element even when the updated record has a smaller size
+ * than the original, because otherwise we wouldn't know where the next record starts during a reinsertion
+ * sweep.
+ *
+ * The number of buckets depends on how large the records are. The serializer might be able to tell us
+ * this, in which case we calculate the number of buckets upfront and don't do resizes.
+ * If the serializer doesn't know the size, then we start with a small number of buckets, and resize when
+ * more elements have been inserted than the number of buckets.
+ *
+ * The number of memory segments given to the staging area is usually one, because it just needs to hold
+ * one record.
+ *
+ * Note: For hashing, we need to use MathUtils.hash because of its avalanche property: changing only
+ * some high bits of the original value shouldn't leave the lower bits of the hash unaffected.
+ * This matters because when choosing the bucket for a record, we mask only the lower bits
+ * (see numBucketsMask). Lots of collisions would occur when, for example, the values being hashed
+ * are bitsets in which the values that actually occur differ only in the higher bits.
+ */
+
+public class ReduceHashTable<T> extends AbstractMutableHashTable<T> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ReduceHashTable.class);
+
+ /** The minimum number of memory segments ReduceHashTable needs to be supplied with in order to work. */
+ private static final int MIN_NUM_MEMORY_SEGMENTS = 3;
+
+ // Note: the following two constants can't be negative, because negative values are reserved for storing the
+ // negated size of the record, when it is abandoned (not part of any linked list).
+
+ /** The last link in the linked lists will have this as next pointer. */
+ private static final long END_OF_LIST = Long.MAX_VALUE;
+
+ /** This value means that prevElemPtr is "pointing to the bucket head", and not into the record segments. */
+ private static final long INVALID_PREV_POINTER = Long.MAX_VALUE - 1;
+
+
+ private static final long RECORD_OFFSET_IN_LINK = 8;
+
+
+ /** this is used by processRecordWithReduce */
+ private final ReduceFunction<T> reducer;
+
+ /** emit() sends data to outputCollector */
+ private final Collector<T> outputCollector;
+
+ private final boolean objectReuseEnabled;
+
+ /**
+ * This initially contains all the memory we have, and then segments
+ * are taken from it by bucketSegments, recordArea, and stagingSegments.
+ */
+ private final ArrayList<MemorySegment> freeMemorySegments;
+
+ private final int numAllMemorySegments;
+
+ private final int segmentSize;
+
+ /**
+ * These will contain the bucket heads.
+ * The bucket heads are pointers to the linked lists containing the actual records.
+ */
+ private MemorySegment[] bucketSegments;
+
+ private static final int bucketSize = 8, bucketSizeBits = 3;
+
+ private int numBuckets;
+ private int numBucketsMask;
+ private final int numBucketsPerSegment, numBucketsPerSegmentBits, numBucketsPerSegmentMask;
+
+ /**
+ * The segments where the actual data is stored.
+ */
+ private final RecordArea recordArea;
+
+ /**
+ * Segments for the staging area.
+ * (It should contain at most one record at all times.)
+ */
+ private final ArrayList<MemorySegment> stagingSegments;
+ private final RandomAccessInputView stagingSegmentsInView;
+ private final StagingOutputView stagingSegmentsOutView;
+
+ private T reuse;
+
+ /** This is the internal prober that insertOrReplaceRecord and processRecordWithReduce use. */
+ private final HashTableProber<T> prober;
+
+ /** The number of elements currently held by the table. */
+ private long numElements = 0;
+
+ /** The number of bytes wasted by updates that couldn't overwrite the old record due to size change. */
+ private long holes = 0;
+
+ /**
+ * If the serializer knows the size of the records, then we can calculate the optimal number of buckets
+ * upfront, so we don't need resizes.
+ */
+ private boolean enableResize;
+
+
+ /**
+ * This constructor is for the case when the caller will only use those operations that are also
+ * present on CompactingHashTable.
+ */
+ public ReduceHashTable(TypeSerializer<T> serializer, TypeComparator<T> comparator, List<MemorySegment> memory) {
+ this(serializer, comparator, memory, null, null, false);
+ }
+
+ public ReduceHashTable(TypeSerializer<T> serializer, TypeComparator<T> comparator, List<MemorySegment> memory,
--- End diff --
Can we remove the `ReduceFunction` and `Collector` from this class and move the corresponding logic into the driver?
It would be good if this table could also be used by a `CombineFunction` (not only a `ReduceFunction`).
So, I would remove the following methods:
- `processRecordWithReduce()`: can be implemented by the driver using the Prober methods `getMatchFor()` and `updateMatch()`
- `emit()`: can be implemented by the driver using `getEntryIterator()`
- `emitAndReset()`: same as `emit()`, but we need an additional `reset()` method
I would also rename the table if it becomes less specialized, maybe to `InPlaceMutableHashTable`, or do you have a better idea, @ggevay?
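For illustration only, a driver-side sketch of that approach. The prober methods `getMatchFor()` and `updateMatch()` are the ones named above; the surrounding fields, the use of `insertOrReplaceRecord()`, and the exact signatures are assumptions and may differ from the PR:
{code}
// Rough sketch, not the actual PR code. The fields `table`, `prober`, `reducer`,
// and `reuse` are assumed to be set up by the driver.
private void processRecord(T record) throws Exception {
	T match = prober.getMatchFor(record, reuse);   // current pre-aggregate for this record's key, or null
	if (match == null) {
		table.insertOrReplaceRecord(record);       // no value for this key yet, so this simply inserts
	} else {
		T reduced = reducer.reduce(match, record); // one reduce step, as processRecordWithReduce does now
		prober.updateMatch(reduced);               // written in place if the size is unchanged, otherwise appended
	}
}
{code}
`emit()` would then become a loop over `getEntryIterator()` in the driver, forwarding each entry to the output collector.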
> Add hash-based combine strategy for ReduceFunction
> --------------------------------------------------
>
> Key: FLINK-3477
> URL: https://issues.apache.org/jira/browse/FLINK-3477
> Project: Flink
> Issue Type: Sub-task
> Components: Local Runtime
> Reporter: Fabian Hueske
> Assignee: Gabor Gevay
>
> This issue is about adding a hash-based combine strategy for ReduceFunctions.
> The interface of the {{reduce()}} method is as follows:
> {code}
> public T reduce(T v1, T v2)
> {code}
> Input type and output type are identical and the function returns only a
> single value. A Reduce function is incrementally applied to compute a final
> aggregated value. This allows us to hold the preaggregated value in a hash
> table and update it with each function call.
> The hash-based strategy requires a special implementation of an in-memory hash
> table. The hash table should support in-place updates of elements (if the new
> value has the same binary size as the old value) but also appending updates
> with invalidation of the old value (if the binary length of the new value
> differs). The hash table needs to be able to evict and emit all elements if
> it runs out of memory.
> We should also add {{HASH}} and {{SORT}} compiler hints to
> {{DataSet.reduce()}} and {{Grouping.reduce()}} to allow users to pick the
> execution strategy.
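To make the strategy described in the issue concrete, here is a conceptual sketch of hash-based combining using plain Java collections (not the actual Flink hash table); `HashCombineSketch`, `combine`, and `keyExtractor` are hypothetical names, while `ReduceFunction.reduce(v1, v2)` is the interface quoted above:
{code}
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

import org.apache.flink.api.common.functions.ReduceFunction;

public class HashCombineSketch {

	// One preaggregated value per key, updated incrementally with each reduce() call.
	public static <K, T> Map<K, T> combine(Iterable<T> input, Function<T, K> keyExtractor,
			ReduceFunction<T> reducer) throws Exception {
		Map<K, T> preAggregates = new HashMap<>();
		for (T record : input) {
			K key = keyExtractor.apply(record);
			T previous = preAggregates.get(key);
			preAggregates.put(key, previous == null ? record : reducer.reduce(previous, record));
		}
		// The real in-memory table must additionally support in-place updates (same binary size),
		// appending updates with invalidation of the old value (different size), and evicting and
		// emitting all elements when it runs out of memory.
		return preAggregates;
	}
}
{code}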
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)