[ https://issues.apache.org/jira/browse/FLINK-2237?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15140897#comment-15140897 ]
ASF GitHub Bot commented on FLINK-2237:
---------------------------------------

Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r52465416

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReduceHashTable.java ---
@@ -0,0 +1,1014 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.hash;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.SameTypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.RandomAccessInputView;
+import org.apache.flink.runtime.memory.AbstractPagedOutputView;
+import org.apache.flink.runtime.util.MathUtils;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * This hash table supports updating elements, and it also has processRecordWithReduce,
+ * which performs one reduce step with the given record.
+ *
+ * The memory is divided into three areas:
+ *  - Bucket area: this contains the bucket heads:
+ *    an 8 byte pointer to the first link of a linked list in the record area
+ *  - Record area: this contains the actual data in linked list elements. A linked list element starts
+ *    with an 8 byte pointer to the next element, and then the record follows.
+ *  - Staging area: This is a small, temporary storage area for writing updated records. This is needed,
+ *    because before serializing a record, there is no way to know in advance how large it will be.
+ *    Therefore, we can't serialize directly into the record area when we are doing an update, because
+ *    if the new record turns out to be larger than the old one, it would overwrite some other record
+ *    that happens to be after the old one in memory. The solution is to serialize to the staging area first,
+ *    and then copy it to the place of the original if it has the same size, otherwise allocate a new linked
+ *    list element at the end of the record area, and mark the old one as abandoned. This creates "holes" in
+ *    the record area, so compactions are eventually needed.
+ *
+ * Compaction happens by deleting everything in the bucket area, and then reinserting all elements.
+ * The reinsertion happens by forgetting the structure (the linked lists) of the record area, reading it
+ * sequentially, and inserting all non-abandoned records, starting from the beginning of the record area.
+ * Note that insertions never overwrite a record that has not yet been read by the reinsertion sweep, because
+ * both the insertions and the reads happen sequentially in the record area, and the insertions obviously
+ * never overtake the reading sweep.
+ *
+ * Note: we have to abandon the old linked list element even when the updated record has a smaller size
+ * than the original, because otherwise we wouldn't know where the next record starts during a reinsertion
+ * sweep.
+ *
+ * The number of buckets depends on how large the records are. The serializer might be able to tell us this,
+ * in which case we calculate the number of buckets upfront and never need to resize.
+ * If the serializer doesn't know the size, then we start with a small number of buckets, and resize when
+ * more elements have been inserted than there are buckets.
+ *
+ * The number of memory segments given to the staging area is usually one, because it just needs to hold
+ * one record.
+ *
+ * Note: For hashing, we need to use MathUtils.hash because of its avalanche property, so that
+ * changing only some high bits of the original value doesn't leave the lower bits of the hash unaffected.
+ * This is because when choosing the bucket for a record, we mask only the
+ * lower bits (see numBucketsMask). Otherwise, lots of collisions would occur when, for example,
+ * the hashed values are bitsets in which many distinct values differ only in their higher bits.
+ */
+
+public class ReduceHashTable<T> extends AbstractMutableHashTable<T> {
+
+	private static final Logger LOG = LoggerFactory.getLogger(ReduceHashTable.class);
+
+	/** The minimum number of memory segments ReduceHashTable needs to be supplied with in order to work. */
+	private static final int MIN_NUM_MEMORY_SEGMENTS = 3;
+
+	/** The last link in the linked lists will have this as next pointer. */
+	private static final long END_OF_LIST = -1;
+
+	/**
+	 * The next pointer of a link will have this value, if it is not part of the linked list.
+	 * (This can happen because the record couldn't be updated in-place due to a size change.)
+	 * Note: the record that is in the link should still be readable, so that the size of the
+	 * place can be determined (see EntryIterator).
+	 * Note: the last record in the record area can't be abandoned. (EntryIterator makes use of this fact.)
+	 */
+	private static final long ABANDONED_RECORD = -2;
+
+	/** This value means that prevElemPtr is "pointing to the bucket head", and not into the record segments. */
+	private static final long INVALID_PREV_POINTER = -3;
+
+	private static final long RECORD_OFFSET_IN_LINK = 8;
+
+
+	/** this is used by processRecordWithReduce */
+	private final ReduceFunction<T> reducer;
+
+	/** emit() sends data to outputCollector */
+	private final Collector<T> outputCollector;
+
+	private final boolean objectReuseEnabled;
+
+	/**
+	 * This initially contains all the memory we have, and then segments
+	 * are taken from it by bucketSegments, recordArea, and stagingSegments.
+	 */
+	private final ArrayList<MemorySegment> freeMemorySegments;
+
+	private final int numAllMemorySegments;
+
+	private final int segmentSize;
+
+	/**
+	 * These will contain the bucket heads.
+	 * The bucket heads are pointers to the linked lists containing the actual records.
+	 */
+	private MemorySegment[] bucketSegments;
+
+	private static final int bucketSize = 8, bucketSizeBits = 3;
+
+	private int numBuckets;
+	private int numBucketsMask;
+	private final int numBucketsPerSegment, numBucketsPerSegmentBits, numBucketsPerSegmentMask;
+
+	/**
+	 * The segments where the actual data is stored.
+	 */
+	private final RecordArea recordArea;
+
+	/**
+	 * Segments for the staging area.
+	 * (It should contain at most one record at all times.)
+	 */
+	private final ArrayList<MemorySegment> stagingSegments;
+	private final RandomAccessInputView stagingSegmentsInView;
+	private final StagingOutputView stagingSegmentsOutView;
+
+	private T reuse;
+
+	/** This is the internal prober that insertOrReplaceRecord and processRecordWithReduce use. */
+	private final HashTableProber<T> prober;
+
+	/** The number of elements currently held by the table. */
+	private long numElements = 0;
+
+	/** The number of bytes wasted by updates that couldn't overwrite the old record. */
+	private long holes = 0;
+
+	/**
+	 * If the serializer knows the size of the records, then we can calculate the optimal number of buckets
+	 * upfront, so we don't need resizes.
+	 */
+	private boolean enableResize;
+
+
+	/**
+	 * This constructor is for the case when the caller will only call those operations that are also
+	 * present on CompactingHashTable.
+	 */
+	public ReduceHashTable(TypeSerializer<T> serializer, TypeComparator<T> comparator, List<MemorySegment> memory) {
+		this(serializer, comparator, memory, null, null, false);
+	}
+
+	public ReduceHashTable(TypeSerializer<T> serializer, TypeComparator<T> comparator, List<MemorySegment> memory,
+							ReduceFunction<T> reducer, Collector<T> outputCollector, boolean objectReuseEnabled) {
+		super(serializer, comparator);
+		this.reducer = reducer;
+		this.numAllMemorySegments = memory.size();
+		this.freeMemorySegments = new ArrayList<>(memory);
+		this.outputCollector = outputCollector;
+		this.objectReuseEnabled = objectReuseEnabled;
+
+		// some sanity checks first
+		if (freeMemorySegments.size() < MIN_NUM_MEMORY_SEGMENTS) {
+			throw new IllegalArgumentException("Too few memory segments provided. ReduceHashTable needs at least " +
+				MIN_NUM_MEMORY_SEGMENTS + " memory segments.");
+		}
+
+		// Get the size of the first memory segment and record it. All further buffers must have the same size.
+		// The size must also be a power of 2.
+		segmentSize = freeMemorySegments.get(0).size();
+		if ( (segmentSize & segmentSize - 1) != 0) {
+			throw new IllegalArgumentException("Hash Table requires buffers whose size is a power of 2.");
+		}
+
+		this.numBucketsPerSegment = segmentSize / bucketSize;
+		this.numBucketsPerSegmentBits = MathUtils.log2strict(this.numBucketsPerSegment);
+		this.numBucketsPerSegmentMask = (1 << this.numBucketsPerSegmentBits) - 1;
+
+		recordArea = new RecordArea(segmentSize);
+
+		stagingSegments = new ArrayList<>();
+		stagingSegmentsInView = new RandomAccessInputView(stagingSegments, segmentSize);
+		stagingSegmentsOutView = new StagingOutputView(stagingSegments, segmentSize);
+
+		prober = new HashTableProber<>(buildSideComparator, new SameTypePairComparator<>(buildSideComparator));
+
+		enableResize = buildSideSerializer.getLength() == -1;
+	}
+
+	private void open(int numBucketSegments) {
+		synchronized (stateLock) {
+			if (!closed) {
+				throw new IllegalStateException("currently not closed.");
+			}
+			closed = false;
+		}
+
+		allocateBucketSegments(numBucketSegments);
+
+		stagingSegments.add(allocateSegment());
--- End diff --

This shouldn't happen, because the size of the bucket area is determined to never encompass all the segments we have. (Note that at the beginning of `open`, all segments should be free.) But nevertheless I'll add a check and throw a "This shouldn't happen" RuntimeException.
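To make the avalanche note in the class comment above concrete, here is a small, self-contained sketch of the collision problem it describes. The mix function below is just a stand-in with the avalanche property (the patch itself relies on MathUtils for this, as its comment says), and the bucket count is made up for illustration:

    public class BucketIndexSketch {

        // Stand-in avalanche mixer (the MurmurHash3 finalizer), used here only so the
        // sketch compiles on its own; it is not the hash function the patch uses.
        static int mix(int h) {
            h ^= h >>> 16;
            h *= 0x85ebca6b;
            h ^= h >>> 13;
            h *= 0xc2b2ae35;
            h ^= h >>> 16;
            return h;
        }

        public static void main(String[] args) {
            int numBuckets = 1 << 12;            // always a power of two
            int numBucketsMask = numBuckets - 1; // masking with this == modulo numBuckets

            // Two values that differ only in their high bits:
            int a = 0x10000007;
            int b = 0x70000007;

            // Without mixing, the mask sees only the (identical) low bits,
            // so both values land in bucket 7:
            System.out.println((a & numBucketsMask) + " / " + (b & numBucketsMask));

            // With an avalanche mix, the high-bit difference also changes the low bits,
            // so the two values almost certainly land in different buckets:
            System.out.println((mix(a) & numBucketsMask) + " / " + (mix(b) & numBucketsMask));
        }
    }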
> Add hash-based Aggregation
> --------------------------
>
>                 Key: FLINK-2237
>                 URL: https://issues.apache.org/jira/browse/FLINK-2237
>             Project: Flink
>          Issue Type: New Feature
>            Reporter: Rafiullah Momand
>            Assignee: Gabor Gevay
>            Priority: Minor
>
> Aggregation functions at the moment are implemented in a sort-based way.
> How can we implement hash based Aggregation for Flink?
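As a rough illustration of the difference the ticket asks about: a sort-based aggregation first sorts the records by key and then reduces runs of equal keys, whereas a hash-based one reduces each incoming record into a hash table right away. A toy sketch of the latter with plain JDK collections (not Flink's managed-memory code, just the idea behind a per-record reduce step):

    import java.util.HashMap;
    import java.util.Map;

    public class HashAggregationToy {
        public static void main(String[] args) {
            Map<String, Integer> table = new HashMap<>();
            String[] keys = {"a", "b", "a", "c", "b", "a"};
            for (String key : keys) {
                // one reduce step per record: merge the new value into the existing aggregate, if any
                table.merge(key, 1, Integer::sum);
            }
            System.out.println(table); // e.g. {a=3, b=2, c=1} (iteration order not guaranteed)
        }
    }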
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)