This is an automated email from the ASF dual-hosted git repository. cwylie pushed a commit to branch 0.14.1-incubating in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
commit 76b7998a130f12f28b9918af03b7b236706afabe Author: Jonathan Wei <[email protected]> AuthorDate: Fri Apr 12 19:04:07 2019 -0700 Adjust BufferAggregator.get() impls to return copies (#7464) * Adjust BufferAggregator.get() impls to return copies * Update BufferAggregator docs, more agg fixes * Update BufferAggregator get() doc --- .../datasketches/hll/HllSketchBuildBufferAggregator.java | 2 +- .../quantiles/DoublesSketchBuildBufferAggregator.java | 2 +- .../aggregation/bloom/BaseBloomFilterBufferAggregator.java | 6 +++++- .../org/apache/druid/query/aggregation/BufferAggregator.java | 11 ++++++++++- 4 files changed, 17 insertions(+), 4 deletions(-) diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildBufferAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildBufferAggregator.java index 0ec525e..ce15821 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildBufferAggregator.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildBufferAggregator.java @@ -108,7 +108,7 @@ public class HllSketchBuildBufferAggregator implements BufferAggregator final Lock lock = stripedLock.getAt(lockIndex(position)).readLock(); lock.lock(); try { - return sketchCache.get(buf).get(position); + return sketchCache.get(buf).get(position).copy(); } finally { lock.unlock(); diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchBuildBufferAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchBuildBufferAggregator.java index ead9a6a..609a46e 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchBuildBufferAggregator.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchBuildBufferAggregator.java @@ -69,7 +69,7 @@ public class DoublesSketchBuildBufferAggregator implements BufferAggregator @Override public synchronized Object get(final ByteBuffer buffer, final int position) { - return sketches.get(buffer).get(position); + return sketches.get(buffer).get(position).compact(); } @Override diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BaseBloomFilterBufferAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BaseBloomFilterBufferAggregator.java index 74def15..ff866f9 100644 --- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BaseBloomFilterBufferAggregator.java +++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BaseBloomFilterBufferAggregator.java @@ -66,7 +66,11 @@ public abstract class BaseBloomFilterBufferAggregator<TSelector extends BaseNull // | k (byte) | numLongs (int) | bitset (long[numLongs]) | int sizeBytes = 1 + Integer.BYTES + (buf.getInt(position + 1) * Long.BYTES); mutationBuffer.limit(position + sizeBytes); - return mutationBuffer.slice(); + + ByteBuffer resultCopy = ByteBuffer.allocate(sizeBytes); + resultCopy.put(mutationBuffer.slice()); + resultCopy.rewind(); + return resultCopy; } @Override diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/BufferAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/BufferAggregator.java index ecd0c11..ed77c91 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/BufferAggregator.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/BufferAggregator.java @@ -72,7 +72,16 @@ public interface BufferAggregator extends HotLoopCallee * * Converts the given byte buffer representation into an intermediate aggregate Object * - * <b>Implementations must not change the position, limit or mark of the given buffer</b> + * <b>Implementations must not change the position, limit or mark of the given buffer.</b> + * + * <b> + * The object returned must not have any references to the given buffer (i.e., make a copy), since the + * underlying buffer is a shared resource and may be given to another processing thread + * while the objects returned by this aggregator are still in use. + * </b> + * + * <b>If the corresponding {@link AggregatorFactory#combine(Object, Object)} method for this aggregator + * expects its inputs to be mutable, then the object returned by this method must be mutable.</b> * * @param buf byte buffer storing the byte array representation of the aggregate * @param position offset within the byte buffer at which the aggregate value is stored --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
