[ https://issues.apache.org/jira/browse/FLINK-2237?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15140897#comment-15140897 ]

ASF GitHub Bot commented on FLINK-2237:
---------------------------------------

Github user ggevay commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1517#discussion_r52465416
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReduceHashTable.java ---
    @@ -0,0 +1,1014 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.operators.hash;
    +
    +import org.apache.flink.api.common.functions.ReduceFunction;
    +import org.apache.flink.api.common.typeutils.SameTypePairComparator;
    +import org.apache.flink.api.common.typeutils.TypeComparator;
    +import org.apache.flink.api.common.typeutils.TypePairComparator;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.MemorySegment;
    +import org.apache.flink.runtime.io.disk.RandomAccessInputView;
    +import org.apache.flink.runtime.memory.AbstractPagedOutputView;
    +import org.apache.flink.runtime.util.MathUtils;
    +import org.apache.flink.util.Collector;
    +import org.apache.flink.util.MutableObjectIterator;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.EOFException;
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.List;
    +
    +/**
    + * This hash table supports updating elements, and it also has processRecordWithReduce,
    + * which makes one reduce step with the given record.
    + *
    + * The memory is divided into three areas:
    + *  - Bucket area: it contains the bucket heads:
    + *    an 8 byte pointer to the first link of a linked list in the record area
    + *  - Record area: this contains the actual data in linked list elements. A linked list element starts
    + *    with an 8 byte pointer to the next element, and then the record follows.
    + *  - Staging area: This is a small, temporary storage area for writing updated records. This is needed,
    + *    because before serializing a record, there is no way to know in advance how large it will be.
    + *    Therefore, we can't serialize directly into the record area when we are doing an update, because
    + *    if it turns out to be larger than the old record, then it would overwrite some other record
    + *    that happens to be after the old one in memory. The solution is to serialize to the staging area first,
    + *    and then copy it to the place of the original if it has the same size, otherwise allocate a new linked
    + *    list element at the end of the record area, and mark the old one as abandoned. This creates "holes" in
    + *    the record area, so compactions are eventually needed.
    + *
    + *  Compaction happens by deleting everything in the bucket area, and then reinserting all elements.
    + *  The reinsertion happens by forgetting the structure (the linked lists) of the record area, reading it
    + *  sequentially, and inserting all non-abandoned records, starting from the beginning of the record area.
    + *  Note that insertions never overwrite a record that has not yet been read by the reinsertion sweep, because
    + *  both the insertions and the reads happen sequentially in the record area, and the insertions obviously
    + *  never overtake the reading sweep.
    + *
    + *  Note: we have to abandon the old linked list element even when the updated record has a smaller size
    + *  than the original, because otherwise we wouldn't know where the next record starts during a reinsertion
    + *  sweep.
    + *
    + *  The number of buckets depends on how large the records are. The serializer might be able to tell us this,
    + *  in which case we calculate the number of buckets upfront and never resize.
    + *  If the serializer doesn't know the size, then we start with a small number of buckets, and resize whenever
    + *  more elements have been inserted than there are buckets.
    + *
    + *  The number of memory segments given to the staging area is usually one, because it just needs to hold
    + *  one record.
    + *
    + * Note: For hashing, we need to use MathUtils.hash because of its avalanche property, so that
    + * changing only some high bits of the original value doesn't leave the lower bits of the hash unaffected.
    + * This is because when choosing the bucket for a record, we mask only the
    + * lower bits (see numBucketsMask). Lots of collisions would occur if, for example,
    + * the hashed values were bitsets in which many of the occurring values differ only in their higher bits.
    + */
    +
    +public class ReduceHashTable<T> extends AbstractMutableHashTable<T> {
    +
    +   private static final Logger LOG = LoggerFactory.getLogger(ReduceHashTable.class);
    +
    +   /** The minimum number of memory segments ReduceHashTable needs to be supplied with in order to work. */
    +   private static final int MIN_NUM_MEMORY_SEGMENTS = 3;
    +
    +   /** The last link in the linked lists will have this as next pointer. */
    +   private static final long END_OF_LIST = -1;
    +
    +   /**
    +    * The next pointer of a link will have this value if it is not part of the linked list.
    +    * (This can happen because the record couldn't be updated in-place due to a size change.)
    +    * Note: the record that is in the link should still be readable, so that it is possible to determine
    +    * the size of the place (see EntryIterator).
    +    * Note: the last record in the record area can't be abandoned. (EntryIterator makes use of this fact.)
    +    */
    +   private static final long ABANDONED_RECORD = -2;
    +
    +   /** This value means that prevElemPtr is "pointing to the bucket head", and not into the record segments. */
    +   private static final long INVALID_PREV_POINTER = -3;
    +
    +   private static final long RECORD_OFFSET_IN_LINK = 8;
    +
    +
    +   /** this is used by processRecordWithReduce */
    +   private final ReduceFunction<T> reducer;
    +
    +   /** emit() sends data to outputCollector */
    +   private final Collector<T> outputCollector;
    +
    +   private final boolean objectReuseEnabled;
    +
    +   /**
    +    * This initially contains all the memory we have, and then segments
    +    * are taken from it by bucketSegments, recordArea, and stagingSegments.
    +    */
    +   private final ArrayList<MemorySegment> freeMemorySegments;
    +
    +   private final int numAllMemorySegments;
    +
    +   private final int segmentSize;
    +
    +   /**
    +    * These will contain the bucket heads.
    +    * The bucket heads are pointers to the linked lists containing the actual records.
    +    */
    +   private MemorySegment[] bucketSegments;
    +
    +   private static final int bucketSize = 8, bucketSizeBits = 3;
    +
    +   private int numBuckets;
    +   private int numBucketsMask;
    +   private final int numBucketsPerSegment, numBucketsPerSegmentBits, numBucketsPerSegmentMask;
    +
    +   /**
    +    * The segments where the actual data is stored.
    +    */
    +   private final RecordArea recordArea;
    +
    +   /**
    +    * Segments for the staging area.
    +    * (It should contain at most one record at all times.)
    +    */
    +   private final ArrayList<MemorySegment> stagingSegments;
    +   private final RandomAccessInputView stagingSegmentsInView;
    +   private final StagingOutputView stagingSegmentsOutView;
    +
    +   private T reuse;
    +
    +   /** This is the internal prober that insertOrReplaceRecord and processRecordWithReduce use. */
    +   private final HashTableProber<T> prober;
    +
    +   /** The number of elements currently held by the table. */
    +   private long numElements = 0;
    +
    +   /** The number of bytes wasted by updates that couldn't overwrite the old record. */
    +   private long holes = 0;
    +
    +   /**
    +    * If the serializer knows the size of the records, then we can calculate the optimal number of buckets
    +    * upfront, so we don't need resizes.
    +    */
    +   private boolean enableResize;
    +
    +
    +   /**
    +    * This constructor is for the case when the caller will only use those operations that are also
    +    * present on CompactingHashTable.
    +    */
    +   public ReduceHashTable(TypeSerializer<T> serializer, TypeComparator<T> comparator, List<MemorySegment> memory) {
    +           this(serializer, comparator, memory, null, null, false);
    +   }
    +
    +   public ReduceHashTable(TypeSerializer<T> serializer, TypeComparator<T> comparator, List<MemorySegment> memory,
    +                          ReduceFunction<T> reducer, Collector<T> outputCollector, boolean objectReuseEnabled) {
    +           super(serializer, comparator);
    +           this.reducer = reducer;
    +           this.numAllMemorySegments = memory.size();
    +           this.freeMemorySegments = new ArrayList<>(memory);
    +           this.outputCollector = outputCollector;
    +           this.objectReuseEnabled = objectReuseEnabled;
    +
    +           // some sanity checks first
    +           if (freeMemorySegments.size() < MIN_NUM_MEMORY_SEGMENTS) {
    +                   throw new IllegalArgumentException("Too few memory segments provided. ReduceHashTable needs at least " +
    +                           MIN_NUM_MEMORY_SEGMENTS + " memory segments.");
    +           }
    +
    +           // Get the size of the first memory segment and record it. All further buffers must have the same size.
    +           // the size must also be a power of 2
    +           segmentSize = freeMemorySegments.get(0).size();
    +           if ( (segmentSize & segmentSize - 1) != 0) {
    +                   throw new IllegalArgumentException("Hash Table requires buffers whose size is a power of 2.");
    +           }
    +
    +           this.numBucketsPerSegment = segmentSize / bucketSize;
    +           this.numBucketsPerSegmentBits = MathUtils.log2strict(this.numBucketsPerSegment);
    +           this.numBucketsPerSegmentMask = (1 << this.numBucketsPerSegmentBits) - 1;
    +
    +           recordArea = new RecordArea(segmentSize);
    +
    +           stagingSegments = new ArrayList<>();
    +           stagingSegmentsInView = new RandomAccessInputView(stagingSegments, segmentSize);
    +           stagingSegmentsOutView = new StagingOutputView(stagingSegments, segmentSize);
    +
    +           prober = new HashTableProber<>(buildSideComparator, new SameTypePairComparator<>(buildSideComparator));
    +
    +           enableResize = buildSideSerializer.getLength() == -1;
    +   }
    +
    +   private void open(int numBucketSegments) {
    +           synchronized (stateLock) {
    +                   if (!closed) {
    +                           throw new IllegalStateException("currently not closed.");
    +                   }
    +                   closed = false;
    +           }
    +
    +           allocateBucketSegments(numBucketSegments);
    +
    +           stagingSegments.add(allocateSegment());
    --- End diff --
    
    This shouldn't happen, because the size of the bucket area is determined to never encompass all the segments we have. (Note that at the beginning of `open`, all segments should be free.)
    
    But nevertheless I'll add a check and throw a "This shouldn't happen" RuntimeException.
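    
    For what it's worth, a minimal sketch of such a check (hypothetical code, not the actual PR change; it assumes that allocateSegment() returns null when no free segment is left):
    
        MemorySegment segment = allocateSegment();
        if (segment == null) {
            // All segments should be free at the beginning of open(), and the bucket area
            // never takes all of them, so failing to allocate here would indicate a bug.
            throw new RuntimeException("Bug in ReduceHashTable: allocateSegment() failed in open(). " +
                "This shouldn't happen.");
        }
        stagingSegments.add(segment);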

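    As an aside on the Javadoc in the diff above: the avalanche requirement on MathUtils.hash exists because the bucket is chosen by masking only the low bits of the hash. A simplified sketch of that lookup, reusing the field names from the diff (the actual lookup code is not part of this hunk, so the exact expressions here are assumptions):
    
        // Choose the bucket for a record and read the bucket head (an 8 byte pointer
        // to the first link of the bucket's linked list in the record area).
        int hash = MathUtils.hash(buildSideComparator.hash(record));
        int bucket = hash & numBucketsMask;                                       // low bits select the bucket
        int bucketSegmentIndex = bucket >>> numBucketsPerSegmentBits;             // which bucket segment
        int bucketOffset = (bucket & numBucketsPerSegmentMask) << bucketSizeBits; // byte offset inside the segment
        long firstPointer = bucketSegments[bucketSegmentIndex].getLong(bucketOffset);
    
    Without an avalanching hash, values that differ only in their high bits would share the same low bits and therefore land in the same bucket.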

> Add hash-based Aggregation
> --------------------------
>
>                 Key: FLINK-2237
>                 URL: https://issues.apache.org/jira/browse/FLINK-2237
>             Project: Flink
>          Issue Type: New Feature
>            Reporter: Rafiullah Momand
>            Assignee: Gabor Gevay
>            Priority: Minor
>
> Aggregation functions at the moment are implemented in a sort-based way.
> How can we implement hash based Aggregation for Flink?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
