[ https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15310421#comment-15310421 ]

ASF GitHub Bot commented on FLINK-3477:
---------------------------------------

Github user ggevay commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1517#discussion_r65376284
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReduceHashTable.java ---
    @@ -0,0 +1,1048 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.operators.hash;
    +
    +import org.apache.flink.api.common.functions.ReduceFunction;
    +import org.apache.flink.api.common.typeutils.SameTypePairComparator;
    +import org.apache.flink.api.common.typeutils.TypeComparator;
    +import org.apache.flink.api.common.typeutils.TypePairComparator;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.MemorySegment;
    +import org.apache.flink.runtime.io.disk.RandomAccessInputView;
    +import org.apache.flink.runtime.memory.AbstractPagedOutputView;
    +import org.apache.flink.util.MathUtils;
    +import org.apache.flink.util.Collector;
    +import org.apache.flink.util.MutableObjectIterator;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.EOFException;
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.List;
    +
    +/**
    + * This hash table supports updating elements, and it also has processRecordWithReduce,
    + * which performs one reduce step with the given record.
    + *
    + * The memory is divided into three areas:
    + *  - Bucket area: it contains the bucket heads:
    + *    an 8 byte pointer to the first link of a linked list in the record area
    + *  - Record area: this contains the actual data in linked list elements. A linked list element starts
    + *    with an 8 byte pointer to the next element, and then the record follows.
    + *  - Staging area: this is a small, temporary storage area for writing updated records. This is needed,
    + *    because before serializing a record, there is no way to know in advance how large it will be.
    + *    Therefore, we can't serialize directly into the record area when we are doing an update, because
    + *    if it turns out to be larger than the old record, then it would overwrite some other record
    + *    that happens to be after the old one in memory. The solution is to serialize to the staging area first,
    + *    and then copy it to the place of the original if it has the same size, otherwise allocate a new linked
    + *    list element at the end of the record area, and mark the old one as abandoned. This creates "holes" in
    + *    the record area, so compactions are eventually needed.
    + *
    + *  Compaction happens by deleting everything in the bucket area, and then reinserting all elements.
    + *  The reinsertion happens by forgetting the structure (the linked lists) of the record area, reading it
    + *  sequentially, and inserting all non-abandoned records, starting from the beginning of the record area.
    + *  Note that insertions never overwrite a record that has not been read by the reinsertion sweep, because
    + *  both the insertions and the readings happen sequentially in the record area, and the insertions obviously
    + *  never overtake the reading sweep.
    + *
    + *  Note: we have to abandon the old linked list element even when the updated record has a smaller size
    + *  than the original, because otherwise we wouldn't know where the next record starts during a reinsertion
    + *  sweep.
    + *
    + *  The number of buckets depends on how large the records are. The serializer might be able to tell us this,
    + *  in which case we calculate the number of buckets upfront and won't do resizes.
    + *  If the serializer doesn't know the size, then we start with a small number of buckets, and do resizes as more
    + *  elements are inserted than the number of buckets.
    + *
    + *  The number of memory segments given to the staging area is usually one, because it just needs to hold
    + *  one record.
    + *
    + * Note: For hashing, we need to use MathUtils.hash because of its avalanche property, so that
    + * changing only some of the high bits of the original value doesn't leave the lower bits of the hash unaffected.
    + * This is because when choosing the bucket for a record, we mask only the
    + * lower bits (see numBucketsMask). Lots of collisions would occur when, for example,
    + * the original value that is hashed is some bitset where many of the occurring values
    + * differ only in the higher bits.
    + */
    +
    +public class ReduceHashTable<T> extends AbstractMutableHashTable<T> {
    +
    +   private static final Logger LOG = LoggerFactory.getLogger(ReduceHashTable.class);
    +
    +   /** The minimum number of memory segments ReduceHashTable needs to be supplied with in order to work. */
    +   private static final int MIN_NUM_MEMORY_SEGMENTS = 3;
    +
    +   // Note: the following two constants can't be negative, because negative values are reserved for storing the
    +   // negated size of the record, when it is abandoned (not part of any linked list).
    +
    +   /** The last link in the linked lists will have this as next pointer. */
    +   private static final long END_OF_LIST = Long.MAX_VALUE;
    +
    +   /** This value means that prevElemPtr is "pointing to the bucket head", and not into the record segments. */
    +   private static final long INVALID_PREV_POINTER = Long.MAX_VALUE - 1;
    +
    +
    +   private static final long RECORD_OFFSET_IN_LINK = 8;
    +
    +
    +   /** this is used by processRecordWithReduce */
    +   private final ReduceFunction<T> reducer;
    +
    +   /** emit() sends data to outputCollector */
    +   private final Collector<T> outputCollector;
    +
    +   private final boolean objectReuseEnabled;
    +
    +   /**
    +    * This initially contains all the memory we have, and then segments
    +    * are taken from it by bucketSegments, recordArea, and stagingSegments.
    +    */
    +   private final ArrayList<MemorySegment> freeMemorySegments;
    +
    +   private final int numAllMemorySegments;
    +
    +   private final int segmentSize;
    +
    +   /**
    +    * These will contain the bucket heads.
    +    * The bucket heads are pointers to the linked lists containing the actual records.
    +    */
    +   private MemorySegment[] bucketSegments;
    +
    +   private static final int bucketSize = 8, bucketSizeBits = 3;
    +
    +   private int numBuckets;
    +   private int numBucketsMask;
    +   private final int numBucketsPerSegment, numBucketsPerSegmentBits, numBucketsPerSegmentMask;
    +
    +   /**
    +    * The segments where the actual data is stored.
    +    */
    +   private final RecordArea recordArea;
    +
    +   /**
    +    * Segments for the staging area.
    +    * (It should contain at most one record at all times.)
    +    */
    +   private final ArrayList<MemorySegment> stagingSegments;
    +   private final RandomAccessInputView stagingSegmentsInView;
    +   private final StagingOutputView stagingSegmentsOutView;
    +
    +   private T reuse;
    +
    +   /** This is the internal prober that insertOrReplaceRecord and processRecordWithReduce use. */
    +   private final HashTableProber<T> prober;
    +
    +   /** The number of elements currently held by the table. */
    +   private long numElements = 0;
    +
    +   /** The number of bytes wasted by updates that couldn't overwrite the old record due to size change. */
    +   private long holes = 0;
    +
    +   /**
    +    * If the serializer knows the size of the records, then we can calculate the optimal number of buckets
    +    * upfront, so we don't need resizes.
    +    */
    +   private boolean enableResize;
    +
    +
    +   /**
    +    * This constructor is for the case when the caller will only use those operations that are also
    +    * present on CompactingHashTable.
    +    */
    +   public ReduceHashTable(TypeSerializer<T> serializer, TypeComparator<T> comparator, List<MemorySegment> memory) {
    +           this(serializer, comparator, memory, null, null, false);
    +   }
    +
    +   public ReduceHashTable(TypeSerializer<T> serializer, TypeComparator<T> comparator, List<MemorySegment> memory,
    --- End diff --
    
    9f82815b588b7d2e7cd239679e98520c45356bf9
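
The Javadoc in the diff above describes the update path: serialize into the staging area first (since the record's size is not known in advance), then either overwrite the old record in place when the sizes match, or append the updated record and abandon the old element, leaving a "hole" that a later compaction sweep reclaims. The following is a minimal sketch of that flow under simplifying assumptions, not the PR's code: the managed MemorySegments are replaced by a plain list of byte arrays, the linked-list pointers are omitted, and all names (StagingUpdateSketch, Element, insertOrReplace, compact) are made up for illustration.

{code}
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/**
 * Toy model of the update path: serialize into a staging buffer first, then either
 * overwrite in place (same size) or append and abandon the old element (different
 * size), which creates "holes" that compaction reclaims with a sequential sweep.
 */
class StagingUpdateSketch<T> {

    /** One "linked list element" of the record area (the next-pointer is omitted here). */
    private static final class Element {
        final int key;
        final byte[] bytes;
        boolean abandoned;
        Element(int key, byte[] bytes) { this.key = key; this.bytes = bytes; }
    }

    private final Function<T, byte[]> serializer;                      // stands in for the TypeSerializer
    private final ArrayList<Element> recordArea = new ArrayList<>();   // only ever appended to
    private final Map<Integer, Integer> bucketHeads = new HashMap<>(); // key -> index of the live element
    private long holes = 0;                                            // bytes wasted by abandoned elements

    StagingUpdateSketch(Function<T, byte[]> serializer) {
        this.serializer = serializer;
    }

    void insertOrReplace(int key, T record) {
        // "Staging area" step: serialize first, because the size is not known in advance.
        byte[] staged = serializer.apply(record);

        Integer oldIndex = bucketHeads.get(key);
        if (oldIndex == null) {
            append(key, staged);                                // no match: new element at the end
        } else if (recordArea.get(oldIndex).bytes.length == staged.length) {
            recordArea.set(oldIndex, new Element(key, staged)); // same size: overwrite in place
        } else {
            Element old = recordArea.get(oldIndex);             // different size: abandon the old
            old.abandoned = true;                               // element, which leaves a "hole",
            holes += old.bytes.length;
            append(key, staged);                                // and append the updated record
        }
    }

    /** Compaction: drop the table structure, then reinsert all non-abandoned records sequentially. */
    void compact() {
        ArrayList<Element> oldArea = new ArrayList<>(recordArea);
        recordArea.clear();
        bucketHeads.clear();
        holes = 0;
        for (Element e : oldArea) {
            if (!e.abandoned) {
                append(e.key, e.bytes);
            }
        }
    }

    private void append(int key, byte[] bytes) {
        recordArea.add(new Element(key, bytes));
        bucketHeads.put(key, recordArea.size() - 1);
    }
}
{code}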

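The hashing note in the same Javadoc (an avalanching hash followed by masking the lower bits, see numBucketsMask) amounts to roughly the sketch below. It assumes the number of buckets is a power of two, so the mask is numBuckets - 1; the avalanche step here is a MurmurHash3-style finalizer used only as a stand-in for the MathUtils.hash call the Javadoc mentions, and the class name is made up.

{code}
/**
 * Sketch of the bucket choice: avalanche the raw hash, then mask the lower bits.
 * Assumes numBuckets is a power of two, so numBucketsMask == numBuckets - 1.
 */
class BucketChoiceSketch {

    static int avalanche(int h) {
        // MurmurHash3 32-bit finalizer: flipping any input bit also affects the low bits.
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h;
    }

    static int bucket(int rawHash, int numBucketsMask) {
        // Without the avalanche step, keys that differ only in their high bits would all
        // land in the same bucket, because only the low bits survive the mask.
        return avalanche(rawHash) & numBucketsMask;
    }
}
{code}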

> Add hash-based combine strategy for ReduceFunction
> --------------------------------------------------
>
>                 Key: FLINK-3477
>                 URL: https://issues.apache.org/jira/browse/FLINK-3477
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Local Runtime
>            Reporter: Fabian Hueske
>            Assignee: Gabor Gevay
>
> This issue is about adding a hash-based combine strategy for ReduceFunctions.
> The interface of the {{reduce()}} method is as follows:
> {code}
> public T reduce(T v1, T v2)
> {code}
> Input type and output type are identical and the function returns only a 
> single value. A ReduceFunction is incrementally applied to compute a final 
> aggregated value. This allows holding the pre-aggregated value in a hash table 
> and updating it with each function call.
> The hash-based strategy requires a special implementation of an in-memory hash 
> table. The hash table should support in-place updates of elements (if the 
> updated value has the same binary size as the old value) but also appending 
> updates with invalidation of the old value (if the binary length of the new 
> value differs). The hash table needs to be able to evict and emit all elements 
> if it runs out of memory.
> We should also add {{HASH}} and {{SORT}} compiler hints to 
> {{DataSet.reduce()}} and {{Grouping.reduce()}} to allow users to pick the 
> execution strategy.
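
To make the combining idea above concrete, here is a minimal sketch under simplifying assumptions: the managed-memory hash table is replaced by a plain HashMap, key extraction and output collection are plain functional interfaces, and running out of memory is modeled by an entry limit. Only ReduceFunction is Flink's actual interface; the class and method names (HashCombineSketch, process, flush) are made up.

{code}
import org.apache.flink.api.common.functions.ReduceFunction;

import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;

/**
 * Toy model of a hash-based combiner: the pre-aggregated value per key is held in a
 * hash table and updated with one reduce() step per incoming record; when the table
 * is "full", all elements are evicted and emitted.
 */
class HashCombineSketch<K, T> {

    private final ReduceFunction<T> reducer;
    private final Function<T, K> keyExtractor; // stands in for the comparator's key handling
    private final Consumer<T> out;             // stands in for the output Collector
    private final int maxEntries;              // stands in for running out of managed memory
    private final Map<K, T> table = new HashMap<>();

    HashCombineSketch(ReduceFunction<T> reducer, Function<T, K> keyExtractor,
                      Consumer<T> out, int maxEntries) {
        this.reducer = reducer;
        this.keyExtractor = keyExtractor;
        this.out = out;
        this.maxEntries = maxEntries;
    }

    void process(T record) throws Exception {
        K key = keyExtractor.apply(record);
        T previous = table.get(key);
        if (previous == null) {
            if (table.size() >= maxEntries) {
                flush();                                      // evict and emit everything when full
            }
            table.put(key, record);                           // first value seen for this key
        } else {
            table.put(key, reducer.reduce(previous, record)); // one incremental reduce step
        }
    }

    /** Emits all pre-aggregated values (e.g. at the end of input) and clears the table. */
    void flush() {
        table.values().forEach(out);
        table.clear();
    }
}
{code}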



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
