[
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15301126#comment-15301126
]
ASF GitHub Bot commented on FLINK-3477:
---------------------------------------
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/1517#discussion_r64671483
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReduceHashTable.java ---
@@ -0,0 +1,1048 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.hash;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.SameTypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.RandomAccessInputView;
+import org.apache.flink.runtime.memory.AbstractPagedOutputView;
+import org.apache.flink.util.MathUtils;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * This hash table supports updating elements, and it also has processRecordWithReduce,
+ * which makes one reduce step with the given record.
+ *
+ * The memory is divided into three areas:
+ *  - Bucket area: contains the bucket heads:
+ *    an 8 byte pointer to the first link of a linked list in the record area
+ *  - Record area: this contains the actual data in linked list elements. A linked list element starts
+ *    with an 8 byte pointer to the next element, and then the record follows.
+ *  - Staging area: This is a small, temporary storage area for writing updated records. This is needed,
+ *    because before serializing a record, there is no way to know in advance how large it will be.
+ *    Therefore, we can't serialize directly into the record area when we are doing an update, because
+ *    if it turns out to be larger than the old record, then it would overwrite some other record
+ *    that happens to be after the old one in memory. The solution is to serialize to the staging area first,
+ *    and then copy it to the place of the original if it has the same size, otherwise allocate a new linked
+ *    list element at the end of the record area, and mark the old one as abandoned. This creates "holes" in
+ *    the record area, so compactions are eventually needed.
+ *
+ * Compaction happens by deleting everything in the bucket area, and then reinserting all elements.
+ * The reinsertion happens by forgetting the structure (the linked lists) of the record area, reading it
+ * sequentially, and inserting all non-abandoned records, starting from the beginning of the record area.
+ * Note that insertions never overwrite a record that has not yet been read by the reinsertion sweep, because
+ * both the insertions and the readings happen sequentially in the record area, and the insertions obviously
+ * never overtake the reading sweep.
+ *
+ * Note: we have to abandon the old linked list element even when the updated record has a smaller size
+ * than the original, because otherwise we wouldn't know where the next record starts during a reinsertion
+ * sweep.
+ *
+ * The number of buckets depends on how large the records are. The serializer might be able to tell us this,
+ * so in this case we calculate the number of buckets upfront and don't do resizes.
+ * If the serializer doesn't know the size, then we start with a small number of buckets, and do resizes when
+ * more elements have been inserted than there are buckets.
+ *
+ * The number of memory segments given to the staging area is usually one, because it just needs to hold
+ * one record.
+ *
+ * Note: For hashing, we need to use MathUtils.hash because of its avalanche property: changing only some
+ * high bits of the original value must not leave the lower bits of the hash unaffected.
+ * This is because when choosing the bucket for a record, we mask only the lower bits (see numBucketsMask).
+ * Lots of collisions would occur when, for example, the hashed values are bitsets that differ only in
+ * their higher bits.
+ */
+
+public class ReduceHashTable<T> extends AbstractMutableHashTable<T> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ReduceHashTable.class);
+
+ /** The minimum number of memory segments ReduceHashTable needs to be supplied with in order to work. */
+ private static final int MIN_NUM_MEMORY_SEGMENTS = 3;
+
+ // Note: the following two constants can't be negative, because negative values are reserved for storing the
+ // negated size of the record, when it is abandoned (not part of any linked list).
+
+ /** The last link in the linked lists will have this as next pointer. */
+ private static final long END_OF_LIST = Long.MAX_VALUE;
+
+ /** This value means that prevElemPtr is "pointing to the bucket head", and not into the record segments. */
+ private static final long INVALID_PREV_POINTER = Long.MAX_VALUE - 1;
+
+
+ private static final long RECORD_OFFSET_IN_LINK = 8;
+
+
+ /** this is used by processRecordWithReduce */
+ private final ReduceFunction<T> reducer;
+
+ /** emit() sends data to outputCollector */
+ private final Collector<T> outputCollector;
+
+ private final boolean objectReuseEnabled;
+
+ /**
+ * This initially contains all the memory we have, and then segments
+ * are taken from it by bucketSegments, recordArea, and stagingSegments.
+ */
+ private final ArrayList<MemorySegment> freeMemorySegments;
+
+ private final int numAllMemorySegments;
+
+ private final int segmentSize;
+
+ /**
+ * These will contain the bucket heads.
+ * The bucket heads are pointers to the linked lists containing the actual records.
+ */
+ private MemorySegment[] bucketSegments;
+
+ private static final int bucketSize = 8, bucketSizeBits = 3;
+
+ private int numBuckets;
+ private int numBucketsMask;
+ private final int numBucketsPerSegment, numBucketsPerSegmentBits, numBucketsPerSegmentMask;
+
+ /**
+ * The segments where the actual data is stored.
+ */
+ private final RecordArea recordArea;
+
+ /**
+ * Segments for the staging area.
+ * (It should contain at most one record at all times.)
+ */
+ private final ArrayList<MemorySegment> stagingSegments;
+ private final RandomAccessInputView stagingSegmentsInView;
+ private final StagingOutputView stagingSegmentsOutView;
+
+ private T reuse;
+
+ /** This is the internal prober that insertOrReplaceRecord and processRecordWithReduce use. */
+ private final HashTableProber<T> prober;
+
+ /** The number of elements currently held by the table. */
+ private long numElements = 0;
+
+ /** The number of bytes wasted by updates that couldn't overwrite the old record due to size change. */
+ private long holes = 0;
+
+ /**
+ * If the serializer knows the size of the records, then we can calculate the optimal number of buckets
+ * upfront, so we don't need resizes.
+ */
+ private boolean enableResize;
+
+
+ /**
+ * This constructor is for the case when we will only call those operations that are also
+ * present on CompactingHashTable.
+ */
+ public ReduceHashTable(TypeSerializer<T> serializer, TypeComparator<T> comparator, List<MemorySegment> memory) {
+ this(serializer, comparator, memory, null, null, false);
+ }
+
+ public ReduceHashTable(TypeSerializer<T> serializer, TypeComparator<T> comparator, List<MemorySegment> memory,
+ ReduceFunction<T> reducer, Collector<T> outputCollector, boolean objectReuseEnabled) {
+ super(serializer, comparator);
+ this.reducer = reducer;
+ this.numAllMemorySegments = memory.size();
+ this.freeMemorySegments = new ArrayList<>(memory);
+ this.outputCollector = outputCollector;
+ this.objectReuseEnabled = objectReuseEnabled;
+
+ // some sanity checks first
+ if (freeMemorySegments.size() < MIN_NUM_MEMORY_SEGMENTS) {
+ throw new IllegalArgumentException("Too few memory
segments provided. ReduceHashTable needs at least " +
+ MIN_NUM_MEMORY_SEGMENTS + " memory segments.");
+ }
+
+ // Get the size of the first memory segment and record it. All further buffers must have the same size.
+ // The size must also be a power of 2.
+ segmentSize = freeMemorySegments.get(0).size();
+ if ( (segmentSize & segmentSize - 1) != 0) {
+ throw new IllegalArgumentException("Hash Table requires
buffers whose size is a power of 2.");
+ }
+
+ this.numBucketsPerSegment = segmentSize / bucketSize;
+ this.numBucketsPerSegmentBits = MathUtils.log2strict(this.numBucketsPerSegment);
+ this.numBucketsPerSegmentMask = (1 << this.numBucketsPerSegmentBits) - 1;
+
+ recordArea = new RecordArea(segmentSize);
+
+ stagingSegments = new ArrayList<>();
+ stagingSegmentsInView = new RandomAccessInputView(stagingSegments, segmentSize, false);
+ stagingSegmentsOutView = new StagingOutputView(stagingSegments, segmentSize);
+
+ prober = new HashTableProber<>(buildSideComparator, new SameTypePairComparator<>(buildSideComparator));
+
+ enableResize = buildSideSerializer.getLength() == -1;
+ }
+
+ /**
+ * Gets the total capacity of this hash table, in bytes.
+ *
+ * @return The hash table's total capacity.
+ */
+ public long getCapacity() {
+ return numAllMemorySegments * segmentSize;
+ }
+
+ /**
+ * Gets the number of bytes currently occupied in this hash table.
+ *
+ * @return The number of bytes occupied.
+ */
+ public long getOccupancy() {
+ return numAllMemorySegments * segmentSize - freeMemorySegments.size() * segmentSize;
+ }
+
+ private void open(int numBucketSegments) {
+ synchronized (stateLock) {
+ if (!closed) {
+ throw new IllegalStateException("currently not
closed.");
+ }
+ closed = false;
+ }
+
+ allocateBucketSegments(numBucketSegments);
+
+ stagingSegments.add(forcedAllocateSegment());
+
+ reuse = buildSideSerializer.createInstance();
+ }
+
+ /**
+ * Initializes the hash table.
+ */
+ @Override
+ public void open() {
+ open(calcInitialNumBucketSegments());
+ }
+
+ @Override
+ public void close() {
+ // make sure that we close only once
+ synchronized (stateLock) {
+ if (closed) {
+ return;
+ }
+ closed = true;
+ }
+
+ LOG.debug("Closing ReduceHashTable and releasing resources.");
+
+ releaseBucketSegments();
+
+ recordArea.giveBackSegments();
+
+ freeMemorySegments.addAll(stagingSegments);
+ stagingSegments.clear();
+
+ numElements = 0;
+ holes = 0;
+ }
+
+ @Override
+ public void abort() {
+ LOG.debug("Aborting ReduceHashTable.");
+ close();
+ }
+
+ @Override
+ public List<MemorySegment> getFreeMemory() {
+ if (!this.closed) {
+ throw new IllegalStateException("Cannot return memory
while ReduceHashTable is open.");
+ }
+
+ return freeMemorySegments;
+ }
+
+ private int calcInitialNumBucketSegments() {
+ int recordLength = buildSideSerializer.getLength();
+ double fraction;
+ if (recordLength == -1) {
+ // It seems that resizing is quite efficient, so we can err here on the side of too few bucket segments.
+ // Even with small records, we lose only ~15% speed.
+ fraction = 0.1;
+ } else {
+ fraction = 8.0 / (16 + recordLength);
+ // note: enableResize is false in this case, so no resizing will happen
+ }
+
+ int ret = Math.max(1, MathUtils.roundDownToPowerOf2((int)(numAllMemorySegments * fraction)));
+
+ // We can't handle more than Integer.MAX_VALUE buckets (e.g., because hash functions return int)
+ if ((long)ret * numBucketsPerSegment > Integer.MAX_VALUE) {
+ ret = MathUtils.roundDownToPowerOf2(Integer.MAX_VALUE / numBucketsPerSegment);
+ }
+ return ret;
+ }
+
+ private void allocateBucketSegments(int numBucketSegments) {
+ if (numBucketSegments < 1) {
+ throw new RuntimeException("Bug in ReduceHashTable");
+ }
+
+ bucketSegments = new MemorySegment[numBucketSegments];
+ for (int i = 0; i < bucketSegments.length; i++) {
+ bucketSegments[i] = forcedAllocateSegment();
+ // Init all pointers in all buckets to END_OF_LIST
+ for (int j = 0; j < numBucketsPerSegment; j++) {
+ bucketSegments[i].putLong(j << bucketSizeBits, END_OF_LIST);
+ }
+ }
+ numBuckets = numBucketSegments * numBucketsPerSegment;
+ numBucketsMask = (1 << MathUtils.log2strict(numBuckets)) - 1;
+ }
+
+ private void releaseBucketSegments() {
+ freeMemorySegments.addAll(Arrays.asList(bucketSegments));
+ bucketSegments = null;
+ }
+
+ private MemorySegment allocateSegment() {
+ int s = freeMemorySegments.size();
+ if (s > 0) {
+ return freeMemorySegments.remove(s - 1);
+ } else {
+ return null;
+ }
+ }
+
+ private MemorySegment forcedAllocateSegment() {
+ MemorySegment segment = allocateSegment();
+ if (segment == null) {
+ throw new RuntimeException("Bug in ReduceHashTable: A
free segment should have been available.");
+ }
+ return segment;
+ }
+
+ /**
+ * Searches the hash table for the record with matching key, and updates it (making one reduce step) if found,
+ * otherwise inserts a new entry.
+ *
+ * (If there are multiple entries with the same key, then it will update one of them.)
+ *
+ * @param record The record to be processed.
+ */
+ public void processRecordWithReduce(T record) throws Exception {
+ if (closed) {
+ return;
+ }
+
+ T match = prober.getMatchFor(record, reuse);
+ if (match == null) {
+ prober.insertAfterNoMatch(record);
+ } else {
+ // do the reduce step
+ T res = reducer.reduce(match, record);
+
+ // We have given reuse to the reducer UDF, so create a new one if object reuse is disabled
+ if (!objectReuseEnabled) {
+ reuse = buildSideSerializer.createInstance();
+ }
+
+ prober.updateMatch(res);
+ }
+ }
+
+ /**
+ * Searches the hash table for a record with the given key.
+ * If it is found, then it is overwritten with the specified record.
+ * Otherwise, the specified record is inserted.
+ * @param record The record to insert or to replace with.
+ * @throws IOException (EOFException specifically, if memory ran out)
+ */
+ @Override
+ public void insertOrReplaceRecord(T record) throws IOException {
+ if (closed) {
+ return;
+ }
+
+ T match = prober.getMatchFor(record, reuse);
+ if (match == null) {
+ prober.insertAfterNoMatch(record);
+ } else {
+ prober.updateMatch(record);
+ }
+ }
+
+ /**
+ * Inserts the given record into the hash table.
+ * Note: this method doesn't care about whether a record with the same key is already present.
+ * @param record The record to insert.
+ * @throws IOException (EOFException specifically, if memory ran out)
+ */
+ @Override
+ public void insert(T record) throws IOException {
+ if (closed) {
+ return;
+ }
+
+ final int hashCode = MathUtils.jenkinsHash(buildSideComparator.hash(record));
+ final int bucket = hashCode & numBucketsMask;
+ final int bucketSegmentIndex = bucket >>> numBucketsPerSegmentBits; // which segment contains the bucket
+ final MemorySegment bucketSegment = bucketSegments[bucketSegmentIndex];
+ final int bucketOffset = (bucket & numBucketsPerSegmentMask) << bucketSizeBits; // offset of the bucket in the segment
+ final long firstPointer = bucketSegment.getLong(bucketOffset);
+
+ try {
+ final long newFirstPointer = recordArea.appendPointerAndRecord(firstPointer, record);
+ bucketSegment.putLong(bucketOffset, newFirstPointer);
+ } catch (EOFException ex) {
+ compactOrThrow();
+ insert(record);
+ return;
+ }
+
+ numElements++;
+ resizeTableIfNecessary();
+ }
+
+ private void resizeTableIfNecessary() throws IOException {
+ if (enableResize && numElements > numBuckets) {
+ final long newNumBucketSegments = 2L * bucketSegments.length;
+ // Checks:
+ // - we can't handle more than Integer.MAX_VALUE buckets
+ // - don't take more memory than the free memory we have left
+ // - the buckets shouldn't occupy more than half of all our memory
+ if (newNumBucketSegments * numBucketsPerSegment < Integer.MAX_VALUE &&
+ newNumBucketSegments - bucketSegments.length < freeMemorySegments.size() &&
+ newNumBucketSegments < numAllMemorySegments / 2) {
+ // do the resize
+ rebuild(newNumBucketSegments);
+ }
+ }
+ }
+
+ /**
+ * WARNING: Doing any other operation on the table invalidates the iterator! (Even
+ * using getMatchFor of a prober!)
+ */
+ public final class EntryIterator implements MutableObjectIterator<T> {
+
+ private final long endPosition;
+
+ public EntryIterator() {
+ endPosition = recordArea.getAppendPosition();
+ if (endPosition == 0) {
+ return;
+ }
+ recordArea.setReadPosition(0);
+ }
+
+ @Override
+ public T next(T reuse) throws IOException {
+ if (endPosition != 0 && recordArea.getReadPosition() <
endPosition) {
+ // Loop until we find a non-abandoned record.
+ // Note: the last record in the record area
can't be abandoned.
+ while (!closed) {
+ final long pointerOrNegatedLength =
recordArea.readLong();
+ final boolean isAbandoned =
pointerOrNegatedLength < 0;
+ if (!isAbandoned) {
+ reuse =
recordArea.readRecord(reuse);
+ return reuse;
+ } else {
+ // pointerOrNegatedLength is
storing a length, because the record was abandoned.
+
recordArea.skipBytesToRead((int)-(pointerOrNegatedLength + 1));
+ }
+ }
+ return null; // (we were closed)
+ } else {
+ return null;
+ }
+ }
+
+ @Override
+ public T next() throws IOException {
+ return next(buildSideSerializer.createInstance());
+ }
+ }
+
+ /**
+ * Returns an iterator that can be used to iterate over all the elements in the table.
+ * WARNING: Doing any other operation on the table invalidates the iterator! (Even
+ * using getMatchFor of a prober!)
+ * @return the iterator
+ */
+ @Override
+ public EntryIterator getEntryIterator() {
+ return new EntryIterator();
+ }
+
+ /**
+ * This function reinitializes the bucket segments,
+ * reads all records from the record segments (sequentially, without using the pointers or the buckets),
+ * and rebuilds the hash table.
+ */
+ private void rebuild() throws IOException {
+ rebuild(bucketSegments.length);
+ }
+
+ /** Same as above, but the number of bucket segments of the new table can be specified. */
+ private void rebuild(long newNumBucketSegments) throws IOException {
+ // Get new bucket segments
+ releaseBucketSegments();
+ allocateBucketSegments((int)newNumBucketSegments);
+
+ T record = buildSideSerializer.createInstance();
+ try {
+ EntryIterator iter = getEntryIterator();
+ recordArea.resetAppendPosition();
+ recordArea.setWritePosition(0);
+ while ((record = iter.next(record)) != null && !closed) {
+ final int hashCode = MathUtils.jenkinsHash(buildSideComparator.hash(record));
+ final int bucket = hashCode & numBucketsMask;
+ final int bucketSegmentIndex = bucket >>> numBucketsPerSegmentBits; // which segment contains the bucket
+ final MemorySegment bucketSegment = bucketSegments[bucketSegmentIndex];
+ final int bucketOffset = (bucket & numBucketsPerSegmentMask) << bucketSizeBits; // offset of the bucket in the segment
+ final long firstPointer = bucketSegment.getLong(bucketOffset);
+
+ long ptrToAppended = recordArea.noSeekAppendPointerAndRecord(firstPointer, record);
+ bucketSegment.putLong(bucketOffset, ptrToAppended);
+ }
+ recordArea.freeSegmentsAfterAppendPosition();
+ holes = 0;
+
+ } catch (EOFException ex) {
+ throw new RuntimeException("Bug in ReduceHashTable: we
shouldn't get out of memory during a rebuild, " +
+ "because we aren't allocating any new memory.");
+ }
+ }
+
+ /**
+ * Emits all elements currently held by the table to the collector,
+ * and resets the table. The table will have the same number of buckets
+ * as before the reset, to avoid doing resizes again.
+ */
+ public void emitAndReset() throws IOException {
+ final int oldNumBucketSegments = bucketSegments.length;
+ emit();
+ close();
+ open(oldNumBucketSegments);
+ }
+
+ /**
+ * Emits all elements currently held by the table to the collector.
+ */
+ public void emit() throws IOException {
+ T record = buildSideSerializer.createInstance();
+ EntryIterator iter = getEntryIterator();
+ while ((record = iter.next(record)) != null && !closed) {
+ outputCollector.collect(record);
+ if (!objectReuseEnabled) {
+ record = buildSideSerializer.createInstance();
+ }
+ }
+ }
+
+ /**
+ * If there is wasted space (due to updated records not fitting in their old places), then do a compaction.
+ * Else, throw EOFException to indicate that memory ran out.
+ * @throws IOException
+ */
+ private void compactOrThrow() throws IOException {
+ if (holes > (double)recordArea.getTotalSize() * 0.05) {
+ rebuild();
+ } else {
+ throw new EOFException("ReduceHashTable memory ran out.
" + getMemoryConsumptionString());
+ }
+ }
+
+ /**
+ * @return String containing a summary of the memory consumption for error messages
+ */
+ private String getMemoryConsumptionString() {
+ return "ReduceHashTable memory stats:\n" +
+ "Total memory: " + numAllMemorySegments *
segmentSize + "\n" +
+ "Free memory: " + freeMemorySegments.size() *
segmentSize + "\n" +
+ "Bucket area: " + numBuckets * 8 + "\n" +
+ "Record area: " + recordArea.getTotalSize() + "\n"
+
+ "Staging area: " + stagingSegments.size() *
segmentSize + "\n" +
+ "Num of elements: " + numElements + "\n" +
+ "Holes total size: " + holes;
+ }
+
+ /**
+ * This class encapsulates the memory segments that belong to the record area. It
+ * - can append a record
+ * - can overwrite a record at an arbitrary position (WARNING: the new record must have the same size as the old one)
+ * - can be rewritten by calling resetAppendPosition
+ * - takes memory from ReduceHashTable.freeMemorySegments on append
+ */
+ private final class RecordArea
+ {
+ private final ArrayList<MemorySegment> segments = new ArrayList<>();
+
+ private final OutputView outView;
+ private final RandomAccessInputView inView;
+
+ private final int segmentSizeBits;
+ private final int segmentSizeMask;
+
+ private long appendPosition = 0;
+
+
+ private final class OutputView extends AbstractPagedOutputView {
+
+ public int currentSegmentIndex;
+
+ public OutputView(int segmentSize) {
+ super(segmentSize, 0);
+ }
+
+ @Override
+ protected MemorySegment nextSegment(MemorySegment current, int positionInCurrent) throws EOFException {
+ currentSegmentIndex++;
+ if (currentSegmentIndex == segments.size()) {
+ addSegment();
+ }
+ return segments.get(currentSegmentIndex);
+ }
+
+ @Override
+ public void seekOutput(MemorySegment seg, int position) {
+ super.seekOutput(seg, position);
+ }
+ }
+
+
+ public RecordArea(int segmentSize) {
--- End diff ---
Only this constructor is used. I think it can be merged with the other one.
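For readers of the class Javadoc above: the bucket addressing it describes (avalanching hash, then masking the lower bits, then locating the 8-byte bucket head inside a fixed-size segment) can be sketched in isolation roughly as follows. This is only an illustrative sketch, not code from the PR; the constants and the avalanche function standing in for MathUtils.jenkinsHash are assumptions.
{code}
/**
 * Illustrative sketch (not part of the PR) of the bucket addressing scheme
 * described in the ReduceHashTable Javadoc. All names and constants here are
 * hypothetical stand-ins.
 */
public final class BucketAddressingSketch {

	// Assumed values: segment size must be a power of 2; each bucket head is an 8-byte pointer.
	static final int SEGMENT_SIZE = 32 * 1024;
	static final int BUCKET_SIZE = 8;
	static final int BUCKETS_PER_SEGMENT = SEGMENT_SIZE / BUCKET_SIZE;
	static final int BUCKETS_PER_SEGMENT_BITS = Integer.numberOfTrailingZeros(BUCKETS_PER_SEGMENT);
	static final int BUCKETS_PER_SEGMENT_MASK = BUCKETS_PER_SEGMENT - 1;

	/** Avalanching finalizer (stand-in for MathUtils.jenkinsHash): mixes high bits into the low bits. */
	static int avalanche(int h) {
		h ^= h >>> 16;
		h *= 0x85ebca6b;
		h ^= h >>> 13;
		h *= 0xc2b2ae35;
		h ^= h >>> 16;
		return h;
	}

	/** Returns {bucketSegmentIndex, byteOffsetOfBucketHead}; numBuckets must be a power of 2. */
	static int[] locateBucket(int rawKeyHash, int numBuckets) {
		int numBucketsMask = numBuckets - 1;
		int bucket = avalanche(rawKeyHash) & numBucketsMask;            // only the low bits select the bucket
		int segmentIndex = bucket >>> BUCKETS_PER_SEGMENT_BITS;         // which bucket segment holds the head
		int offset = (bucket & BUCKETS_PER_SEGMENT_MASK) * BUCKET_SIZE; // byte offset of the 8-byte head
		return new int[] { segmentIndex, offset };
	}

	public static void main(String[] args) {
		int[] loc = locateBucket("someKey".hashCode(), 1 << 14);
		System.out.println("segment=" + loc[0] + ", offset=" + loc[1]);
	}
}
{code}
The avalanche step matters because the bucket index keeps only the low bits of the hash; without it, keys that differ only in their high bits would all land in the same bucket.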
> Add hash-based combine strategy for ReduceFunction
> --------------------------------------------------
>
> Key: FLINK-3477
> URL: https://issues.apache.org/jira/browse/FLINK-3477
> Project: Flink
> Issue Type: Sub-task
> Components: Local Runtime
> Reporter: Fabian Hueske
> Assignee: Gabor Gevay
>
> This issue is about adding a hash-based combine strategy for ReduceFunctions.
> The interface of the {{reduce()}} method is as follows:
> {code}
> public T reduce(T v1, T v2)
> {code}
> Input type and output type are identical, and the function returns only a
> single value. A Reduce function is applied incrementally to compute a final
> aggregated value. This allows holding the pre-aggregated value in a hash table
> and updating it with each function call.
> The hash-based strategy requires a special implementation of an in-memory hash
> table. The hash table should support in-place updates of elements (if the
> updated value has the same binary size as the old value) but also appending
> updates with invalidation of the old value (if the binary length of the new
> value differs). The hash table needs to be able to evict and emit all elements
> if it runs out of memory.
> We should also add {{HASH}} and {{SORT}} compiler hints to
> {{DataSet.reduce()}} and {{Grouping.reduce()}} to allow users to pick the
> execution strategy.
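The combine strategy described in the issue can be illustrated with a small heap-based sketch using plain JDK types only; BinaryOperator stands in for the reduce() method, a Consumer for the output collector, and a simple entry cap for running out of managed memory. None of these names come from the actual implementation in the PR, which works on managed MemorySegments and handles in-place versus appended updates.
{code}
import java.util.HashMap;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.function.Consumer;

/** Heap-based sketch of the hash combine strategy; only the control flow is shown. */
public final class HashCombineSketch<K, T> {

	private final Map<K, T> aggregates = new HashMap<>();
	private final BinaryOperator<T> reducer;  // stands in for ReduceFunction#reduce(T, T)
	private final Consumer<T> out;            // stands in for the output collector
	private final int maxEntries;             // crude stand-in for "memory ran out"

	public HashCombineSketch(BinaryOperator<T> reducer, Consumer<T> out, int maxEntries) {
		this.reducer = reducer;
		this.out = out;
		this.maxEntries = maxEntries;
	}

	/** One combine step: fold the record into the pre-aggregate for its key, or insert it. */
	public void processRecord(K key, T record) {
		T match = aggregates.get(key);
		if (match == null) {
			if (aggregates.size() >= maxEntries) {
				emitAndReset(); // "table full": evict everything downstream and start over
			}
			aggregates.put(key, record);
		} else {
			aggregates.put(key, reducer.apply(match, record));
		}
	}

	/** Emit all current pre-aggregates and reset the table. */
	public void emitAndReset() {
		aggregates.values().forEach(out);
		aggregates.clear();
	}
}
{code}
For example, new HashCombineSketch<String, Integer>(Integer::sum, System.out::println, 1 << 20) would pre-aggregate integer counts per key before the full reduce.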
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)