[ https://issues.apache.org/jira/browse/FLINK-2237?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15140897#comment-15140897 ]
ASF GitHub Bot commented on FLINK-2237:
---------------------------------------
Github user ggevay commented on a diff in the pull request:
https://github.com/apache/flink/pull/1517#discussion_r52465416
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReduceHashTable.java ---
@@ -0,0 +1,1014 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.hash;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.SameTypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.RandomAccessInputView;
+import org.apache.flink.runtime.memory.AbstractPagedOutputView;
+import org.apache.flink.runtime.util.MathUtils;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * This hash table supports updating elements, and it also has processRecordWithReduce,
+ * which performs one reduce step with the given record.
+ *
+ * The memory is divided into three areas:
+ *  - Bucket area: contains the bucket heads: an 8 byte pointer to the first link of a linked list in the record area
+ *  - Record area: this contains the actual data in linked list elements. A linked list element starts
+ *    with an 8 byte pointer to the next element, and then the record follows.
+ *  - Staging area: This is a small, temporary storage area for writing updated records. This is needed,
+ *    because before serializing a record, there is no way to know in advance how large it will be.
+ *    Therefore, we can't serialize directly into the record area when we are doing an update, because
+ *    if it turns out to be larger than the old record, then it would overwrite some other record
+ *    that happens to be after the old one in memory. The solution is to serialize to the staging area first,
+ *    and then copy it to the place of the original if it has the same size, otherwise allocate a new linked
+ *    list element at the end of the record area, and mark the old one as abandoned. This creates "holes" in
+ *    the record area, so compactions are eventually needed.
+ *
+ * Compaction happens by deleting everything in the bucket area, and then reinserting all elements.
+ * The reinsertion happens by forgetting the structure (the linked lists) of the record area, reading it
+ * sequentially, and inserting all non-abandoned records, starting from the beginning of the record area.
+ * Note that insertions never overwrite a record that has not been read by the reinsertion sweep, because
+ * both the insertions and the reading happen sequentially in the record area, and the insertions obviously
+ * never overtake the reading sweep.
+ *
+ * Note: we have to abandon the old linked list element even when the updated record has a smaller size
+ * than the original, because otherwise we wouldn't know where the next record starts during a reinsertion
+ * sweep.
+ *
+ * The number of buckets depends on how large the records are. The serializer might be able to tell us this,
+ * in which case we calculate the number of buckets upfront and won't do resizes.
+ * If the serializer doesn't know the size, then we start with a small number of buckets, and resize when
+ * more elements have been inserted than the number of buckets.
+ *
+ * The number of memory segments given to the staging area is usually one, because it just needs to hold
+ * one record.
+ *
+ * Note: For hashing, we need to use MathUtils.hash because of its avalanche property, so that
+ * changing only some high bits of the original value doesn't leave the lower bits of the hash unaffected.
+ * This is because when choosing the bucket for a record, we mask only the lower bits (see numBucketsMask).
+ * Lots of collisions would occur if, for example, the hashed values were bitsets that differ only in their
+ * higher bits.
+ */
+
+public class ReduceHashTable<T> extends AbstractMutableHashTable<T> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ReduceHashTable.class);
+
+ /** The minimum number of memory segments ReduceHashTable needs to be supplied with in order to work. */
+ private static final int MIN_NUM_MEMORY_SEGMENTS = 3;
+
+ /** The last link in the linked lists will have this as next pointer. */
+ private static final long END_OF_LIST = -1;
+
+ /**
+  * The next pointer of a link will have this value if it is not part of the linked list.
+  * (This can happen because the record couldn't be updated in-place, due to a size change.)
+  * Note: the record that is in the link should still be readable, so that the size of the place
+  * can be determined (see EntryIterator).
+  * Note: the last record in the record area can't be abandoned. (EntryIterator makes use of this fact.)
+  */
+ private static final long ABANDONED_RECORD = -2;
+
+ /** This value means that prevElemPtr is "pointing to the bucket head", and not into the record segments. */
+ private static final long INVALID_PREV_POINTER = -3;
+
+ private static final long RECORD_OFFSET_IN_LINK = 8;
+
+
+ /** this is used by processRecordWithReduce */
+ private final ReduceFunction<T> reducer;
+
+ /** emit() sends data to outputCollector */
+ private final Collector<T> outputCollector;
+
+ private final boolean objectReuseEnabled;
+
+ /**
+ * This initially contains all the memory we have, and then segments
+ * are taken from it by bucketSegments, recordArea, and stagingSegments.
+ */
+ private final ArrayList<MemorySegment> freeMemorySegments;
+
+ private final int numAllMemorySegments;
+
+ private final int segmentSize;
+
+ /**
+ * These will contain the bucket heads.
+ * The bucket heads are pointers to the linked lists containing the actual records.
+ */
+ private MemorySegment[] bucketSegments;
+
+ private static final int bucketSize = 8, bucketSizeBits = 3;
+
+ private int numBuckets;
+ private int numBucketsMask;
+ private final int numBucketsPerSegment, numBucketsPerSegmentBits, numBucketsPerSegmentMask;
+
+ /**
+ * The segments where the actual data is stored.
+ */
+ private final RecordArea recordArea;
+
+ /**
+ * Segments for the staging area.
+ * (It should contain at most one record at all times.)
+ */
+ private final ArrayList<MemorySegment> stagingSegments;
+ private final RandomAccessInputView stagingSegmentsInView;
+ private final StagingOutputView stagingSegmentsOutView;
+
+ private T reuse;
+
+ /** This is the internal prober that insertOrReplaceRecord and processRecordWithReduce use. */
+ private final HashTableProber<T> prober;
+
+ /** The number of elements currently held by the table. */
+ private long numElements = 0;
+
+ /** The number of bytes wasted by updates that couldn't overwrite the old record. */
+ private long holes = 0;
+
+ /**
+ * If the serializer knows the size of the records, then we can calculate the optimal number of buckets
+ * upfront, so we don't need resizes.
+ */
+ private boolean enableResize;
+
+
+ /**
+ * This constructor is for the case when the user will only call those operations that are also
+ * present on CompactingHashTable.
+ */
+ public ReduceHashTable(TypeSerializer<T> serializer, TypeComparator<T> comparator, List<MemorySegment> memory) {
+ this(serializer, comparator, memory, null, null, false);
+ }
+
+ public ReduceHashTable(TypeSerializer<T> serializer, TypeComparator<T> comparator, List<MemorySegment> memory,
+ ReduceFunction<T> reducer, Collector<T> outputCollector, boolean objectReuseEnabled) {
+ super(serializer, comparator);
+ this.reducer = reducer;
+ this.numAllMemorySegments = memory.size();
+ this.freeMemorySegments = new ArrayList<>(memory);
+ this.outputCollector = outputCollector;
+ this.objectReuseEnabled = objectReuseEnabled;
+
+ // some sanity checks first
+ if (freeMemorySegments.size() < MIN_NUM_MEMORY_SEGMENTS) {
+ throw new IllegalArgumentException("Too few memory
segments provided. ReduceHashTable needs at least " +
+ MIN_NUM_MEMORY_SEGMENTS + " memory segments.");
+ }
+
+ // Get the size of the first memory segment and record it. All further buffers must have the same size.
+ // The size must also be a power of 2.
+ segmentSize = freeMemorySegments.get(0).size();
+ if ((segmentSize & (segmentSize - 1)) != 0) {
+ throw new IllegalArgumentException("Hash Table requires buffers whose size is a power of 2.");
+ }
+
+ this.numBucketsPerSegment = segmentSize / bucketSize;
+ this.numBucketsPerSegmentBits = MathUtils.log2strict(this.numBucketsPerSegment);
+ this.numBucketsPerSegmentMask = (1 << this.numBucketsPerSegmentBits) - 1;
+
+ recordArea = new RecordArea(segmentSize);
+
+ stagingSegments = new ArrayList<>();
+ stagingSegmentsInView = new RandomAccessInputView(stagingSegments, segmentSize);
+ stagingSegmentsOutView = new StagingOutputView(stagingSegments, segmentSize);
+
+ prober = new HashTableProber<>(buildSideComparator, new SameTypePairComparator<>(buildSideComparator));
+
+ enableResize = buildSideSerializer.getLength() == -1;
+ }
+
+ private void open(int numBucketSegments) {
+ synchronized (stateLock) {
+ if (!closed) {
+ throw new IllegalStateException("currently not closed.");
+ }
+ closed = false;
+ }
+
+ allocateBucketSegments(numBucketSegments);
+
+ stagingSegments.add(allocateSegment());
--- End diff ---
This shouldn't happen, because the size of the bucket area is determined such that it never
encompasses all the segments we have. (Note that at the beginning of `open`, all segments
should be free.)
Nevertheless, I'll add a check and throw a "This shouldn't happen" RuntimeException.
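
As a side note for readers following the design discussion: below is a minimal, self-contained
sketch (not part of the patch) of the update strategy that the class comment above describes,
i.e. serialize the updated record to a staging buffer first, overwrite in place only when the
serialized size is unchanged, and otherwise append to the end of the record area and abandon the
old slot, leaving a "hole" for a later compaction. It is a toy model over a plain byte array
rather than Flink's MemorySegments; the class name StagingUpdateSketch and its fields are made up
for illustration.

// Toy model of the "serialize to staging first" update strategy; not the real
// MemorySegment-based code from the patch.
public final class StagingUpdateSketch {

    private final byte[] recordArea = new byte[1 << 16];
    private int recordAreaEnd = 0;  // append position at the end of the record area
    private long holes = 0;         // bytes wasted by records that had to be abandoned

    /** Appends a record and returns its offset. */
    public int insert(byte[] serialized) {
        int offset = recordAreaEnd;
        System.arraycopy(serialized, 0, recordArea, offset, serialized.length);
        recordAreaEnd += serialized.length;
        return offset;
    }

    /**
     * Replaces the record at oldOffset/oldLength with the already-staged bytes.
     * Returns the offset where the updated record now lives.
     */
    public int update(int oldOffset, int oldLength, byte[] staged) {
        if (staged.length == oldLength) {
            // Same size: safe to overwrite in place, no hole is created.
            System.arraycopy(staged, 0, recordArea, oldOffset, oldLength);
            return oldOffset;
        } else {
            // Different size: append at the end and abandon the old slot.
            // (The real table also marks the old link's next pointer as abandoned
            // so that the reinsertion sweep can skip it.)
            holes += oldLength;
            return insert(staged);
        }
    }

    public static void main(String[] args) {
        StagingUpdateSketch t = new StagingUpdateSketch();
        int off = t.insert(new byte[] {1, 2, 3, 4});
        off = t.update(off, 4, new byte[] {9, 9, 9, 9});       // same size: stays at offset 0
        off = t.update(off, 4, new byte[] {7, 7, 7, 7, 7, 7}); // larger: moves, leaves a 4-byte hole
        System.out.println("offset=" + off + ", holes=" + t.holes + ", end=" + t.recordAreaEnd);
    }
}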
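
In the same spirit, here is a small sketch of the bucket-selection point in the class comment
(only the lower bits of the hash are masked, hence the need for an avalanching hash). The
avalanche method is the MurmurHash3 32-bit finalizer, used purely as a stand-in for the
MathUtils.hash call mentioned there; BucketChoiceSketch and the example keys are hypothetical.

// Illustrates why an avalanching hash is needed when the bucket is chosen by
// masking only the lower bits of the hash. Not part of the patch.
public final class BucketChoiceSketch {

    // MurmurHash3 32-bit finalizer: a stand-in for the avalanching hash.
    static int avalanche(int h) {
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h;
    }

    public static void main(String[] args) {
        final int numBuckets = 1 << 10;            // always a power of two
        final int numBucketsMask = numBuckets - 1; // so the low bits select the bucket

        // Two keys that differ only in a high bit:
        int a = 0x00000001;
        int b = 0x40000001;

        // Masking the raw values maps both keys to bucket 1 (a collision) ...
        System.out.println((a & numBucketsMask) + " " + (b & numBucketsMask));

        // ... while avalanching first spreads the high-bit difference into the
        // low bits, so the two keys almost certainly land in different buckets.
        System.out.println((avalanche(a) & numBucketsMask) + " " + (avalanche(b) & numBucketsMask));
    }
}
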
> Add hash-based Aggregation
> --------------------------
>
> Key: FLINK-2237
> URL: https://issues.apache.org/jira/browse/FLINK-2237
> Project: Flink
> Issue Type: New Feature
> Reporter: Rafiullah Momand
> Assignee: Gabor Gevay
> Priority: Minor
>
> Aggregation functions at the moment are implemented in a sort-based way.
> How can we implement hash-based Aggregation for Flink?
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)