This is an automated email from the ASF dual-hosted git repository. cwylie pushed a commit to branch 0.14.1-incubating in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
commit 357d1862a4528a2f84a4d146f7a751a859a84f7c Author: Clint Wylie <[email protected]> AuthorDate: Thu Apr 18 11:54:06 2019 -0700 refactor druid-bloom-filter aggregators (#7496) * now with 100% more buffer * there can be only 1 * simplify * javadoc * clean up unused test method * fix exception message * style * why does style hate javadocs * review stuff * style :( --- .../bloom/BaseBloomFilterAggregator.java | 113 ++++++++++++++++++++- .../bloom/BaseBloomFilterBufferAggregator.java | 105 ------------------- .../bloom/BloomFilterAggregateCombiner.java | 72 ------------- .../bloom/BloomFilterAggregatorFactory.java | 82 +++++++++------ .../bloom/BloomFilterMergeAggregator.java | 32 ++---- .../bloom/BloomFilterMergeAggregatorFactory.java | 25 +++-- .../bloom/BloomFilterMergeBufferAggregator.java | 40 -------- .../bloom/DoubleBloomFilterAggregator.java | 12 ++- .../bloom/DoubleBloomFilterBufferAggregator.java | 44 -------- .../bloom/EmptyBloomFilterAggregator.java | 37 ------- .../bloom/FloatBloomFilterAggregator.java | 12 ++- .../bloom/FloatBloomFilterBufferAggregator.java | 44 -------- .../bloom/LongBloomFilterAggregator.java | 12 ++- .../bloom/LongBloomFilterBufferAggregator.java | 44 -------- ...regator.java => NoopBloomFilterAggregator.java} | 12 ++- .../bloom/StringBloomFilterAggregator.java | 18 ++-- .../bloom/StringBloomFilterBufferAggregator.java | 56 ---------- .../bloom/BloomFilterAggregatorTest.java | 104 +++++++++++-------- .../bloom/BloomFilterGroupByQueryTest.java | 10 +- .../bloom/sql/BloomFilterSqlAggregatorTest.java | 1 - .../druid/sql/calcite/util/CalciteTests.java | 2 - 21 files changed, 287 insertions(+), 590 deletions(-) diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BaseBloomFilterAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BaseBloomFilterAggregator.java index 652236b..48ba083 100644 --- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BaseBloomFilterAggregator.java +++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BaseBloomFilterAggregator.java @@ -20,20 +20,119 @@ package org.apache.druid.query.aggregation.bloom; import org.apache.druid.query.aggregation.Aggregator; +import org.apache.druid.query.aggregation.BufferAggregator; import org.apache.druid.query.filter.BloomKFilter; +import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; import org.apache.druid.segment.BaseNullableColumnValueSelector; import javax.annotation.Nullable; +import java.nio.ByteBuffer; -public abstract class BaseBloomFilterAggregator<TSelector extends BaseNullableColumnValueSelector> implements Aggregator +/** + * All bloom filter aggregations are done using {@link ByteBuffer}, so this base class implements both {@link Aggregator} + * and {@link BufferAggregator}. + * + * If used as an {@link Aggregator} the caller MUST specify the 'onHeap' parameter in the + * constructor as "true", or else the "collector" will not be allocated and null pointer exceptions will make things sad. + * + * If used as a {@link BufferAggregator}, the "collector" buffer is not necessary, and should be called with "false", + * but at least nothing dramatic will happen like incorrect use in the {@link Aggregator} case. + * + * {@link BloomFilterAggregatorFactory} and {@link BloomFilterMergeAggregatorFactory}, which should be the creators of + * all implementations of {@link BaseBloomFilterAggregator} outside of tests, should be sure to set the 'onHeap' value + * to "true" and "false" respectively for + * {@link org.apache.druid.query.aggregation.AggregatorFactory#factorize} and + * {@link org.apache.druid.query.aggregation.AggregatorFactory#factorizeBuffered} + * + * @param <TSelector> type of {@link BaseNullableColumnValueSelector} that feeds this aggregator, likely either values + * to add to a bloom filter, or other bloom filters to merge into this bloom filter. + */ +public abstract class BaseBloomFilterAggregator<TSelector extends BaseNullableColumnValueSelector> + implements BufferAggregator, Aggregator { - final BloomKFilter collector; + @Nullable + private final ByteBuffer collector; + protected final int maxNumEntries; protected final TSelector selector; - BaseBloomFilterAggregator(TSelector selector, BloomKFilter collector) + /** + * @param selector selector that feeds values to the aggregator + * @param maxNumEntries maximum number of entries that can be added to a bloom filter before accuracy degrades rapidly + * @param onHeap allocate a ByteBuffer "collector" to use as an {@link Aggregator} + */ + BaseBloomFilterAggregator(TSelector selector, int maxNumEntries, boolean onHeap) { - this.collector = collector; this.selector = selector; + this.maxNumEntries = maxNumEntries; + if (onHeap) { + BloomKFilter bloomFilter = new BloomKFilter(maxNumEntries); + this.collector = ByteBuffer.allocate(BloomKFilter.computeSizeBytes(maxNumEntries)); + BloomKFilter.serialize(collector, bloomFilter); + } else { + collector = null; + } + } + + abstract void bufferAdd(ByteBuffer buf); + + @Override + public void init(ByteBuffer buf, int position) + { + final ByteBuffer mutationBuffer = buf.duplicate(); + mutationBuffer.position(position); + BloomKFilter filter = new BloomKFilter(maxNumEntries); + BloomKFilter.serialize(mutationBuffer, filter); + } + + @Override + public void aggregate(ByteBuffer buf, int position) + { + final int oldPosition = buf.position(); + try { + buf.position(position); + bufferAdd(buf); + } + finally { + buf.position(oldPosition); + } + } + + @Override + public Object get(ByteBuffer buf, int position) + { + ByteBuffer mutationBuffer = buf.duplicate(); + mutationBuffer.position(position); + int sizeBytes = BloomKFilter.computeSizeBytes(maxNumEntries); + mutationBuffer.limit(position + sizeBytes); + + ByteBuffer resultCopy = ByteBuffer.allocate(sizeBytes); + resultCopy.put(mutationBuffer.slice()); + resultCopy.rewind(); + return resultCopy; + } + + @Override + public float getFloat(ByteBuffer buf, int position) + { + throw new UnsupportedOperationException("BloomFilterAggregator does not support getFloat()"); + } + + @Override + public long getLong(ByteBuffer buf, int position) + { + throw new UnsupportedOperationException("BloomFilterAggregator does not support getLong()"); + } + + @Override + public double getDouble(ByteBuffer buf, int position) + { + throw new UnsupportedOperationException("BloomFilterAggregator does not support getDouble()"); + } + + @Override + public void aggregate() + { + aggregate(collector, 0); } @Nullable @@ -66,4 +165,10 @@ public abstract class BaseBloomFilterAggregator<TSelector extends BaseNullableCo { // nothing to close } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + inspector.visit("selector", selector); + } } diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BaseBloomFilterBufferAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BaseBloomFilterBufferAggregator.java deleted file mode 100644 index ff866f9..0000000 --- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BaseBloomFilterBufferAggregator.java +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.query.aggregation.bloom; - -import org.apache.druid.query.aggregation.BufferAggregator; -import org.apache.druid.query.filter.BloomKFilter; -import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; -import org.apache.druid.segment.BaseNullableColumnValueSelector; - -import java.nio.ByteBuffer; - -public abstract class BaseBloomFilterBufferAggregator<TSelector extends BaseNullableColumnValueSelector> implements BufferAggregator -{ - protected final int maxNumEntries; - protected final TSelector selector; - - BaseBloomFilterBufferAggregator(TSelector selector, int maxNumEntries) - { - this.selector = selector; - this.maxNumEntries = maxNumEntries; - } - - abstract void bufferAdd(ByteBuffer buf); - - @Override - public void init(ByteBuffer buf, int position) - { - final ByteBuffer mutationBuffer = buf.duplicate(); - mutationBuffer.position(position); - BloomKFilter filter = new BloomKFilter(maxNumEntries); - BloomKFilter.serialize(mutationBuffer, filter); - } - - @Override - public void aggregate(ByteBuffer buf, int position) - { - final int oldPosition = buf.position(); - buf.position(position); - bufferAdd(buf); - buf.position(oldPosition); - } - - - @Override - public Object get(ByteBuffer buf, int position) - { - ByteBuffer mutationBuffer = buf.duplicate(); - mutationBuffer.position(position); - // | k (byte) | numLongs (int) | bitset (long[numLongs]) | - int sizeBytes = 1 + Integer.BYTES + (buf.getInt(position + 1) * Long.BYTES); - mutationBuffer.limit(position + sizeBytes); - - ByteBuffer resultCopy = ByteBuffer.allocate(sizeBytes); - resultCopy.put(mutationBuffer.slice()); - resultCopy.rewind(); - return resultCopy; - } - - @Override - public float getFloat(ByteBuffer buf, int position) - { - throw new UnsupportedOperationException("BloomFilterBufferAggregator does not support getFloat()"); - } - - @Override - public long getLong(ByteBuffer buf, int position) - { - throw new UnsupportedOperationException("BloomFilterBufferAggregator does not support getLong()"); - } - - @Override - public double getDouble(ByteBuffer buf, int position) - { - throw new UnsupportedOperationException("BloomFilterBufferAggregator does not support getDouble()"); - } - - @Override - public void close() - { - // nothing to close - } - - @Override - public void inspectRuntimeShape(RuntimeShapeInspector inspector) - { - inspector.visit("selector", selector); - } -} diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterAggregateCombiner.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterAggregateCombiner.java deleted file mode 100644 index 6fc4bf9..0000000 --- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterAggregateCombiner.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.query.aggregation.bloom; - -import org.apache.druid.query.aggregation.ObjectAggregateCombiner; -import org.apache.druid.query.filter.BloomKFilter; -import org.apache.druid.segment.ColumnValueSelector; - -import javax.annotation.Nullable; - -public class BloomFilterAggregateCombiner extends ObjectAggregateCombiner<BloomKFilter> -{ - @Nullable - private BloomKFilter combined; - - private final int maxNumEntries; - - public BloomFilterAggregateCombiner(int maxNumEntries) - { - this.maxNumEntries = maxNumEntries; - } - - @Override - public void reset(ColumnValueSelector selector) - { - combined = null; - fold(selector); - } - - @Override - public void fold(ColumnValueSelector selector) - { - BloomKFilter other = (BloomKFilter) selector.getObject(); - if (other == null) { - return; - } - if (combined == null) { - combined = new BloomKFilter(maxNumEntries); - } - combined.merge(other); - } - - @Nullable - @Override - public BloomKFilter getObject() - { - return combined; - } - - @Override - public Class<? extends BloomKFilter> classOfObject() - { - return BloomKFilter.class; - } -} diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterAggregatorFactory.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterAggregatorFactory.java index af60135..74e921c 100644 --- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterAggregatorFactory.java +++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterAggregatorFactory.java @@ -40,7 +40,6 @@ import org.apache.druid.segment.column.ColumnCapabilities; import org.apache.druid.segment.column.ValueType; import javax.annotation.Nullable; -import java.io.IOException; import java.nio.ByteBuffer; import java.util.Collections; import java.util.Comparator; @@ -59,10 +58,6 @@ public class BloomFilterAggregatorFactory extends AggregatorFactory BloomKFilter.getNumSetBits(buf1, buf1.position()), BloomKFilter.getNumSetBits(buf2, buf2.position()) ); - } else if (o1 instanceof BloomKFilter && o2 instanceof BloomKFilter) { - BloomKFilter o1f = (BloomKFilter) o1; - BloomKFilter o2f = (BloomKFilter) o2; - return Integer.compare(o1f.getNumSetBits(), o2f.getNumSetBits()); } else { throw new RE("Unable to compare unexpected types [%s]", o1.getClass().getName()); } @@ -87,14 +82,13 @@ public class BloomFilterAggregatorFactory extends AggregatorFactory @Override public Aggregator factorize(ColumnSelectorFactory columnFactory) { - BloomKFilter filter = new BloomKFilter(maxNumEntries); ColumnCapabilities capabilities = columnFactory.getColumnCapabilities(field.getDimension()); if (capabilities == null) { BaseNullableColumnValueSelector selector = columnFactory.makeColumnValueSelector(field.getDimension()); if (selector instanceof NilColumnValueSelector) { // BloomKFilter must be the same size so we cannot use a constant for the empty agg - return new EmptyBloomFilterAggregator(filter); + return new NoopBloomFilterAggregator(maxNumEntries, true); } throw new IAE( "Cannot create bloom filter buffer aggregator for column selector type [%s]", @@ -104,13 +98,29 @@ public class BloomFilterAggregatorFactory extends AggregatorFactory ValueType type = capabilities.getType(); switch (type) { case STRING: - return new StringBloomFilterAggregator(columnFactory.makeDimensionSelector(field), filter); + return new StringBloomFilterAggregator( + columnFactory.makeDimensionSelector(field), + maxNumEntries, + true + ); case LONG: - return new LongBloomFilterAggregator(columnFactory.makeColumnValueSelector(field.getDimension()), filter); + return new LongBloomFilterAggregator( + columnFactory.makeColumnValueSelector(field.getDimension()), + maxNumEntries, + true + ); case FLOAT: - return new FloatBloomFilterAggregator(columnFactory.makeColumnValueSelector(field.getDimension()), filter); + return new FloatBloomFilterAggregator( + columnFactory.makeColumnValueSelector(field.getDimension()), + maxNumEntries, + true + ); case DOUBLE: - return new DoubleBloomFilterAggregator(columnFactory.makeColumnValueSelector(field.getDimension()), filter); + return new DoubleBloomFilterAggregator( + columnFactory.makeColumnValueSelector(field.getDimension()), + maxNumEntries, + true + ); default: throw new IAE("Cannot create bloom filter aggregator for invalid column type [%s]", type); } @@ -124,7 +134,7 @@ public class BloomFilterAggregatorFactory extends AggregatorFactory if (capabilities == null) { BaseNullableColumnValueSelector selector = columnFactory.makeColumnValueSelector(field.getDimension()); if (selector instanceof NilColumnValueSelector) { - return new EmptyBloomFilterBufferAggregator(maxNumEntries); + return new NoopBloomFilterAggregator(maxNumEntries, false); } throw new IAE( "Cannot create bloom filter buffer aggregator for column selector type [%s]", @@ -135,18 +145,28 @@ public class BloomFilterAggregatorFactory extends AggregatorFactory ValueType type = capabilities.getType(); switch (type) { case STRING: - return new StringBloomFilterBufferAggregator(columnFactory.makeDimensionSelector(field), maxNumEntries); + return new StringBloomFilterAggregator( + columnFactory.makeDimensionSelector(field), + maxNumEntries, + false + ); case LONG: - return new LongBloomFilterBufferAggregator( - columnFactory.makeColumnValueSelector(field.getDimension()), maxNumEntries + return new LongBloomFilterAggregator( + columnFactory.makeColumnValueSelector(field.getDimension()), + maxNumEntries, + false ); case FLOAT: - return new FloatBloomFilterBufferAggregator( - columnFactory.makeColumnValueSelector(field.getDimension()), maxNumEntries + return new FloatBloomFilterAggregator( + columnFactory.makeColumnValueSelector(field.getDimension()), + maxNumEntries, + false ); case DOUBLE: - return new DoubleBloomFilterBufferAggregator( - columnFactory.makeColumnValueSelector(field.getDimension()), maxNumEntries + return new DoubleBloomFilterAggregator( + columnFactory.makeColumnValueSelector(field.getDimension()), + maxNumEntries, + false ); default: throw new IAE("Cannot create bloom filter buffer aggregator for invalid column type [%s]", type); @@ -168,14 +188,19 @@ public class BloomFilterAggregatorFactory extends AggregatorFactory if (lhs == null) { return rhs; } - ((BloomKFilter) lhs).merge((BloomKFilter) rhs); + BloomKFilter.mergeBloomFilterByteBuffers( + (ByteBuffer) lhs, + ((ByteBuffer) lhs).position(), + (ByteBuffer) rhs, + ((ByteBuffer) rhs).position() + ); return lhs; } @Override public AggregateCombiner makeAggregateCombiner() { - return new BloomFilterAggregateCombiner(maxNumEntries); + throw new UnsupportedOperationException("Bloom filter aggregators are query-time only"); } @Override @@ -195,6 +220,8 @@ public class BloomFilterAggregatorFactory extends AggregatorFactory { if (object instanceof String) { return ByteBuffer.wrap(StringUtils.decodeBase64String((String) object)); + } else if (object instanceof byte[]) { + return ByteBuffer.wrap((byte[]) object); } else { return object; } @@ -203,18 +230,7 @@ public class BloomFilterAggregatorFactory extends AggregatorFactory @Override public Object finalizeComputation(Object object) { - try { - if (object instanceof ByteBuffer) { - return BloomKFilter.deserialize((ByteBuffer) object); - } else if (object instanceof byte[]) { - return BloomKFilter.deserialize(ByteBuffer.wrap((byte[]) object)); - } else { - return object; - } - } - catch (IOException ioe) { - throw new RuntimeException("Failed to deserialize BloomKFilter", ioe); - } + return object; } @JsonProperty diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterMergeAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterMergeAggregator.java index 67d7a70..011f2f6 100644 --- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterMergeAggregator.java +++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterMergeAggregator.java @@ -19,39 +19,27 @@ package org.apache.druid.query.aggregation.bloom; +import org.apache.druid.java.util.common.ISE; import org.apache.druid.query.filter.BloomKFilter; import org.apache.druid.segment.ColumnValueSelector; -import java.io.IOException; import java.nio.ByteBuffer; -public final class BloomFilterMergeAggregator extends BaseBloomFilterAggregator<ColumnValueSelector<Object>> +public final class BloomFilterMergeAggregator extends BaseBloomFilterAggregator<ColumnValueSelector<ByteBuffer>> { - public BloomFilterMergeAggregator(ColumnValueSelector<Object> selector, BloomKFilter collector) + public BloomFilterMergeAggregator(ColumnValueSelector<ByteBuffer> selector, int maxNumEntries, boolean onHeap) { - super(selector, collector); + super(selector, maxNumEntries, onHeap); } @Override - public void aggregate() + public void bufferAdd(ByteBuffer buf) { - Object other = selector.getObject(); - if (other != null) { - if (other instanceof BloomKFilter) { - collector.merge((BloomKFilter) other); - } else if (other instanceof ByteBuffer) { - // fun fact: because bloom filter agg factory deserialize returns a byte buffer to avoid unnecessary serde, - // but GroupByQueryEngine (group by v1) ends up trying to merge ByteBuffers from buffer aggs with this agg - // instead of the BloomFilterBufferMergeAggregator. fun! Also, it requires a 'ComplexMetricSerde' to be - // registered even for query time only aggs, but then never uses it. also fun! - try { - BloomKFilter otherFilter = BloomKFilter.deserialize((ByteBuffer) other); - collector.merge(otherFilter); - } - catch (IOException ioe) { - throw new RuntimeException("Failed to deserialize BloomKFilter", ioe); - } - } + ByteBuffer other = selector.getObject(); + if (other == null) { + // nulls should be empty bloom filters by this point, so encountering a nil column in merge agg is unexpected + throw new ISE("WTF?! Unexpected null value in BloomFilterMergeAggregator"); } + BloomKFilter.mergeBloomFilterByteBuffers(buf, buf.position(), other, other.position()); } } diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterMergeAggregatorFactory.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterMergeAggregatorFactory.java index 8dab867..ed5ce29 100644 --- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterMergeAggregatorFactory.java +++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterMergeAggregatorFactory.java @@ -25,7 +25,6 @@ import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.AggregatorUtil; import org.apache.druid.query.aggregation.BufferAggregator; import org.apache.druid.query.cache.CacheKeyBuilder; -import org.apache.druid.query.filter.BloomKFilter; import org.apache.druid.segment.BaseNullableColumnValueSelector; import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.ColumnValueSelector; @@ -48,23 +47,13 @@ public class BloomFilterMergeAggregatorFactory extends BloomFilterAggregatorFact @Override public Aggregator factorize(final ColumnSelectorFactory metricFactory) { - final BaseNullableColumnValueSelector selector = metricFactory.makeColumnValueSelector(fieldName); - // null columns should be empty bloom filters by this point, so encountering a nil column in merge agg is unexpected - if (selector instanceof NilColumnValueSelector) { - throw new ISE("WTF?! Unexpected NilColumnValueSelector"); - } - return new BloomFilterMergeAggregator((ColumnValueSelector<Object>) selector, new BloomKFilter(getMaxNumEntries())); + return makeMergeAggregator(metricFactory); } @Override public BufferAggregator factorizeBuffered(final ColumnSelectorFactory metricFactory) { - final BaseNullableColumnValueSelector selector = metricFactory.makeColumnValueSelector(fieldName); - // null columns should be empty bloom filters by this point, so encountering a nil column in merge agg is unexpected - if (selector instanceof NilColumnValueSelector) { - throw new ISE("WTF?! Unexpected NilColumnValueSelector"); - } - return new BloomFilterMergeBufferAggregator((ColumnValueSelector<ByteBuffer>) selector, getMaxNumEntries()); + return makeMergeAggregator(metricFactory); } @Override @@ -81,4 +70,14 @@ public class BloomFilterMergeAggregatorFactory extends BloomFilterAggregatorFact .appendInt(getMaxNumEntries()) .build(); } + + private BloomFilterMergeAggregator makeMergeAggregator(ColumnSelectorFactory metricFactory) + { + final BaseNullableColumnValueSelector selector = metricFactory.makeColumnValueSelector(fieldName); + // null columns should be empty bloom filters by this point, so encountering a nil column in merge agg is unexpected + if (selector instanceof NilColumnValueSelector) { + throw new ISE("WTF?! Unexpected NilColumnValueSelector"); + } + return new BloomFilterMergeAggregator((ColumnValueSelector<ByteBuffer>) selector, getMaxNumEntries(), true); + } } diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterMergeBufferAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterMergeBufferAggregator.java deleted file mode 100644 index 026a23e..0000000 --- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterMergeBufferAggregator.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.query.aggregation.bloom; - -import org.apache.druid.query.filter.BloomKFilter; -import org.apache.druid.segment.ColumnValueSelector; - -import java.nio.ByteBuffer; - -public final class BloomFilterMergeBufferAggregator extends BaseBloomFilterBufferAggregator<ColumnValueSelector<ByteBuffer>> -{ - public BloomFilterMergeBufferAggregator(ColumnValueSelector<ByteBuffer> selector, int maxNumEntries) - { - super(selector, maxNumEntries); - } - - @Override - public void bufferAdd(ByteBuffer buf) - { - ByteBuffer other = selector.getObject(); - BloomKFilter.mergeBloomFilterByteBuffers(buf, buf.position(), other, other.position()); - } -} diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/DoubleBloomFilterAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/DoubleBloomFilterAggregator.java index dfdae6c..8aa899e 100644 --- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/DoubleBloomFilterAggregator.java +++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/DoubleBloomFilterAggregator.java @@ -23,20 +23,22 @@ import org.apache.druid.common.config.NullHandling; import org.apache.druid.query.filter.BloomKFilter; import org.apache.druid.segment.BaseDoubleColumnValueSelector; +import java.nio.ByteBuffer; + public final class DoubleBloomFilterAggregator extends BaseBloomFilterAggregator<BaseDoubleColumnValueSelector> { - DoubleBloomFilterAggregator(BaseDoubleColumnValueSelector selector, BloomKFilter collector) + DoubleBloomFilterAggregator(BaseDoubleColumnValueSelector selector, int maxNumEntries, boolean onHeap) { - super(selector, collector); + super(selector, maxNumEntries, onHeap); } @Override - public void aggregate() + public void bufferAdd(ByteBuffer buf) { if (NullHandling.replaceWithDefault() || !selector.isNull()) { - collector.addDouble(selector.getDouble()); + BloomKFilter.addDouble(buf, selector.getDouble()); } else { - collector.addBytes(null, 0, 0); + BloomKFilter.addBytes(buf, null, 0, 0); } } } diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/DoubleBloomFilterBufferAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/DoubleBloomFilterBufferAggregator.java deleted file mode 100644 index e84b9fc..0000000 --- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/DoubleBloomFilterBufferAggregator.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.query.aggregation.bloom; - -import org.apache.druid.common.config.NullHandling; -import org.apache.druid.query.filter.BloomKFilter; -import org.apache.druid.segment.BaseDoubleColumnValueSelector; - -import java.nio.ByteBuffer; - -public final class DoubleBloomFilterBufferAggregator extends BaseBloomFilterBufferAggregator<BaseDoubleColumnValueSelector> -{ - DoubleBloomFilterBufferAggregator(BaseDoubleColumnValueSelector selector, int maxNumEntries) - { - super(selector, maxNumEntries); - } - - @Override - public void bufferAdd(ByteBuffer buf) - { - if (NullHandling.replaceWithDefault() || !selector.isNull()) { - BloomKFilter.addDouble(buf, selector.getDouble()); - } else { - BloomKFilter.addBytes(buf, null, 0, 0); - } - } -} diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/EmptyBloomFilterAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/EmptyBloomFilterAggregator.java deleted file mode 100644 index 57df6f2..0000000 --- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/EmptyBloomFilterAggregator.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.query.aggregation.bloom; - -import org.apache.druid.query.filter.BloomKFilter; -import org.apache.druid.segment.NilColumnValueSelector; - -public final class EmptyBloomFilterAggregator extends BaseBloomFilterAggregator<NilColumnValueSelector> -{ - EmptyBloomFilterAggregator(BloomKFilter collector) - { - super(NilColumnValueSelector.instance(), collector); - } - - @Override - public void aggregate() - { - // nothing to do - } -} diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/FloatBloomFilterAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/FloatBloomFilterAggregator.java index ae53d16..0a7c042 100644 --- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/FloatBloomFilterAggregator.java +++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/FloatBloomFilterAggregator.java @@ -23,20 +23,22 @@ import org.apache.druid.common.config.NullHandling; import org.apache.druid.query.filter.BloomKFilter; import org.apache.druid.segment.BaseFloatColumnValueSelector; +import java.nio.ByteBuffer; + public final class FloatBloomFilterAggregator extends BaseBloomFilterAggregator<BaseFloatColumnValueSelector> { - FloatBloomFilterAggregator(BaseFloatColumnValueSelector selector, BloomKFilter collector) + FloatBloomFilterAggregator(BaseFloatColumnValueSelector selector, int maxNumEntries, boolean onHeap) { - super(selector, collector); + super(selector, maxNumEntries, onHeap); } @Override - public void aggregate() + public void bufferAdd(ByteBuffer buf) { if (NullHandling.replaceWithDefault() || !selector.isNull()) { - collector.addFloat(selector.getFloat()); + BloomKFilter.addFloat(buf, selector.getFloat()); } else { - collector.addBytes(null, 0, 0); + BloomKFilter.addBytes(buf, null, 0, 0); } } } diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/FloatBloomFilterBufferAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/FloatBloomFilterBufferAggregator.java deleted file mode 100644 index 27e88d4..0000000 --- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/FloatBloomFilterBufferAggregator.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.query.aggregation.bloom; - -import org.apache.druid.common.config.NullHandling; -import org.apache.druid.query.filter.BloomKFilter; -import org.apache.druid.segment.BaseFloatColumnValueSelector; - -import java.nio.ByteBuffer; - -public final class FloatBloomFilterBufferAggregator extends BaseBloomFilterBufferAggregator<BaseFloatColumnValueSelector> -{ - FloatBloomFilterBufferAggregator(BaseFloatColumnValueSelector selector, int maxNumEntries) - { - super(selector, maxNumEntries); - } - - @Override - public void bufferAdd(ByteBuffer buf) - { - if (NullHandling.replaceWithDefault() || !selector.isNull()) { - BloomKFilter.addFloat(buf, selector.getFloat()); - } else { - BloomKFilter.addBytes(buf, null, 0, 0); - } - } -} diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/LongBloomFilterAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/LongBloomFilterAggregator.java index caa4739..3e232ce 100644 --- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/LongBloomFilterAggregator.java +++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/LongBloomFilterAggregator.java @@ -23,20 +23,22 @@ import org.apache.druid.common.config.NullHandling; import org.apache.druid.query.filter.BloomKFilter; import org.apache.druid.segment.BaseLongColumnValueSelector; +import java.nio.ByteBuffer; + public final class LongBloomFilterAggregator extends BaseBloomFilterAggregator<BaseLongColumnValueSelector> { - LongBloomFilterAggregator(BaseLongColumnValueSelector selector, BloomKFilter collector) + LongBloomFilterAggregator(BaseLongColumnValueSelector selector, int maxNumEntries, boolean onHeap) { - super(selector, collector); + super(selector, maxNumEntries, onHeap); } @Override - public void aggregate() + public void bufferAdd(ByteBuffer buf) { if (NullHandling.replaceWithDefault() || !selector.isNull()) { - collector.addLong(selector.getLong()); + BloomKFilter.addLong(buf, selector.getLong()); } else { - collector.addBytes(null, 0, 0); + BloomKFilter.addBytes(buf, null, 0, 0); } } } diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/LongBloomFilterBufferAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/LongBloomFilterBufferAggregator.java deleted file mode 100644 index 13a6634..0000000 --- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/LongBloomFilterBufferAggregator.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.query.aggregation.bloom; - -import org.apache.druid.common.config.NullHandling; -import org.apache.druid.query.filter.BloomKFilter; -import org.apache.druid.segment.BaseLongColumnValueSelector; - -import java.nio.ByteBuffer; - -public final class LongBloomFilterBufferAggregator extends BaseBloomFilterBufferAggregator<BaseLongColumnValueSelector> -{ - LongBloomFilterBufferAggregator(BaseLongColumnValueSelector selector, int maxNumEntries) - { - super(selector, maxNumEntries); - } - - @Override - public void bufferAdd(ByteBuffer buf) - { - if (NullHandling.replaceWithDefault() || !selector.isNull()) { - BloomKFilter.addLong(buf, selector.getLong()); - } else { - BloomKFilter.addBytes(buf, null, 0, 0); - } - } -} diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/EmptyBloomFilterBufferAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/NoopBloomFilterAggregator.java similarity index 78% rename from extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/EmptyBloomFilterBufferAggregator.java rename to extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/NoopBloomFilterAggregator.java index 7b6301d..ec23df5 100644 --- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/EmptyBloomFilterBufferAggregator.java +++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/NoopBloomFilterAggregator.java @@ -23,11 +23,11 @@ import org.apache.druid.segment.NilColumnValueSelector; import java.nio.ByteBuffer; -public final class EmptyBloomFilterBufferAggregator extends BaseBloomFilterBufferAggregator<NilColumnValueSelector> +public final class NoopBloomFilterAggregator extends BaseBloomFilterAggregator<NilColumnValueSelector> { - EmptyBloomFilterBufferAggregator(int maxNumEntries) + NoopBloomFilterAggregator(int maxNumEntries, boolean onHeap) { - super(NilColumnValueSelector.instance(), maxNumEntries); + super(NilColumnValueSelector.instance(), maxNumEntries, onHeap); } @Override @@ -41,4 +41,10 @@ public final class EmptyBloomFilterBufferAggregator extends BaseBloomFilterBuffe { // nothing to do } + + @Override + public void aggregate() + { + // nothing to do + } } diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/StringBloomFilterAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/StringBloomFilterAggregator.java index 351ef84..f3f6dae 100644 --- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/StringBloomFilterAggregator.java +++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/StringBloomFilterAggregator.java @@ -22,32 +22,34 @@ package org.apache.druid.query.aggregation.bloom; import org.apache.druid.query.filter.BloomKFilter; import org.apache.druid.segment.DimensionSelector; +import java.nio.ByteBuffer; + public final class StringBloomFilterAggregator extends BaseBloomFilterAggregator<DimensionSelector> { - StringBloomFilterAggregator(DimensionSelector selector, BloomKFilter collector) + + StringBloomFilterAggregator(DimensionSelector selector, int maxNumEntries, boolean onHeap) { - super(selector, collector); + super(selector, maxNumEntries, onHeap); } @Override - public void aggregate() + public void bufferAdd(ByteBuffer buf) { - // note: there might be room for optimization here but behavior must match BloomDimFilter implementation if (selector.getRow().size() > 1) { selector.getRow().forEach(v -> { String value = selector.lookupName(v); if (value == null) { - collector.addBytes(null, 0, 0); + BloomKFilter.addBytes(buf, null, 0, 0); } else { - collector.addString(value); + BloomKFilter.addString(buf, value); } }); } else { String value = (String) selector.getObject(); if (value == null) { - collector.addBytes(null, 0, 0); + BloomKFilter.addBytes(buf, null, 0, 0); } else { - collector.addString(value); + BloomKFilter.addString(buf, value); } } } diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/StringBloomFilterBufferAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/StringBloomFilterBufferAggregator.java deleted file mode 100644 index c7c17c9..0000000 --- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/StringBloomFilterBufferAggregator.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.query.aggregation.bloom; - -import org.apache.druid.query.filter.BloomKFilter; -import org.apache.druid.segment.DimensionSelector; - -import java.nio.ByteBuffer; - -public final class StringBloomFilterBufferAggregator extends BaseBloomFilterBufferAggregator<DimensionSelector> -{ - - StringBloomFilterBufferAggregator(DimensionSelector selector, int maxNumEntries) - { - super(selector, maxNumEntries); - } - - @Override - public void bufferAdd(ByteBuffer buf) - { - if (selector.getRow().size() > 1) { - selector.getRow().forEach(v -> { - String value = selector.lookupName(v); - if (value == null) { - BloomKFilter.addBytes(buf, null, 0, 0); - } else { - BloomKFilter.addString(buf, value); - } - }); - } else { - String value = (String) selector.getObject(); - if (value == null) { - BloomKFilter.addBytes(buf, null, 0, 0); - } else { - BloomKFilter.addString(buf, value); - } - } - } -} diff --git a/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/aggregation/bloom/BloomFilterAggregatorTest.java b/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/aggregation/bloom/BloomFilterAggregatorTest.java index 790cf8c..da4479b 100644 --- a/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/aggregation/bloom/BloomFilterAggregatorTest.java +++ b/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/aggregation/bloom/BloomFilterAggregatorTest.java @@ -241,13 +241,15 @@ public class BloomFilterAggregatorTest public void testAggregateValues() throws IOException { DimensionSelector dimSelector = new CardinalityAggregatorTest.TestDimensionSelector(values1, null); - StringBloomFilterAggregator agg = new StringBloomFilterAggregator(dimSelector, new BloomKFilter(maxNumValues)); + StringBloomFilterAggregator agg = new StringBloomFilterAggregator(dimSelector, maxNumValues, true); for (int i = 0; i < values1.size(); ++i) { aggregateDimension(Collections.singletonList(dimSelector), agg); } - BloomKFilter bloomKFilter = (BloomKFilter) valueAggregatorFactory.finalizeComputation(agg.get()); + BloomKFilter bloomKFilter = BloomKFilter.deserialize( + (ByteBuffer) valueAggregatorFactory.finalizeComputation(agg.get()) + ); String serialized = filterToString(bloomKFilter); Assert.assertEquals(serializedFilter1, serialized); } @@ -256,13 +258,15 @@ public class BloomFilterAggregatorTest public void testAggregateLongValues() throws IOException { TestLongColumnSelector selector = new TestLongColumnSelector(Arrays.asList(longValues1)); - LongBloomFilterAggregator agg = new LongBloomFilterAggregator(selector, new BloomKFilter(maxNumValues)); + LongBloomFilterAggregator agg = new LongBloomFilterAggregator(selector, maxNumValues, true); for (Long ignored : longValues1) { aggregateColumn(Collections.singletonList(selector), agg); } - BloomKFilter bloomKFilter = (BloomKFilter) valueAggregatorFactory.finalizeComputation(agg.get()); + BloomKFilter bloomKFilter = BloomKFilter.deserialize( + (ByteBuffer) valueAggregatorFactory.finalizeComputation(agg.get()) + ); String serialized = filterToString(bloomKFilter); Assert.assertEquals(serializedLongFilter, serialized); } @@ -271,13 +275,15 @@ public class BloomFilterAggregatorTest public void testAggregateFloatValues() throws IOException { TestFloatColumnSelector selector = new TestFloatColumnSelector(Arrays.asList(floatValues1)); - FloatBloomFilterAggregator agg = new FloatBloomFilterAggregator(selector, new BloomKFilter(maxNumValues)); + FloatBloomFilterAggregator agg = new FloatBloomFilterAggregator(selector, maxNumValues, true); for (Float ignored : floatValues1) { aggregateColumn(Collections.singletonList(selector), agg); } - BloomKFilter bloomKFilter = (BloomKFilter) valueAggregatorFactory.finalizeComputation(agg.get()); + BloomKFilter bloomKFilter = BloomKFilter.deserialize( + (ByteBuffer) valueAggregatorFactory.finalizeComputation(agg.get()) + ); String serialized = filterToString(bloomKFilter); Assert.assertEquals(serializedFloatFilter, serialized); } @@ -286,13 +292,15 @@ public class BloomFilterAggregatorTest public void testAggregateDoubleValues() throws IOException { TestDoubleColumnSelector selector = new TestDoubleColumnSelector(Arrays.asList(doubleValues1)); - DoubleBloomFilterAggregator agg = new DoubleBloomFilterAggregator(selector, new BloomKFilter(maxNumValues)); + DoubleBloomFilterAggregator agg = new DoubleBloomFilterAggregator(selector, maxNumValues, true); for (Double ignored : doubleValues1) { aggregateColumn(Collections.singletonList(selector), agg); } - BloomKFilter bloomKFilter = (BloomKFilter) valueAggregatorFactory.finalizeComputation(agg.get()); + BloomKFilter bloomKFilter = BloomKFilter.deserialize( + (ByteBuffer) valueAggregatorFactory.finalizeComputation(agg.get()) + ); String serialized = filterToString(bloomKFilter); Assert.assertEquals(serializedDoubleFilter, serialized); } @@ -301,7 +309,7 @@ public class BloomFilterAggregatorTest public void testBufferAggregateStringValues() throws IOException { DimensionSelector dimSelector = new CardinalityAggregatorTest.TestDimensionSelector(values2, null); - StringBloomFilterBufferAggregator agg = new StringBloomFilterBufferAggregator(dimSelector, maxNumValues); + StringBloomFilterAggregator agg = new StringBloomFilterAggregator(dimSelector, maxNumValues, true); int maxSize = valueAggregatorFactory.getMaxIntermediateSizeWithNulls(); ByteBuffer buf = ByteBuffer.allocate(maxSize + 64); @@ -313,7 +321,9 @@ public class BloomFilterAggregatorTest for (int i = 0; i < values2.size(); ++i) { bufferAggregateDimension(Collections.singletonList(dimSelector), agg, buf, pos); } - BloomKFilter bloomKFilter = (BloomKFilter) valueAggregatorFactory.finalizeComputation(agg.get(buf, pos)); + BloomKFilter bloomKFilter = BloomKFilter.deserialize( + (ByteBuffer) valueAggregatorFactory.finalizeComputation(agg.get(buf, pos)) + ); String serialized = filterToString(bloomKFilter); Assert.assertEquals(serializedFilter2, serialized); } @@ -322,7 +332,7 @@ public class BloomFilterAggregatorTest public void testBufferAggregateLongValues() throws IOException { TestLongColumnSelector selector = new TestLongColumnSelector(Arrays.asList(longValues1)); - LongBloomFilterBufferAggregator agg = new LongBloomFilterBufferAggregator(selector, maxNumValues); + LongBloomFilterAggregator agg = new LongBloomFilterAggregator(selector, maxNumValues, true); int maxSize = valueAggregatorFactory.getMaxIntermediateSizeWithNulls(); ByteBuffer buf = ByteBuffer.allocate(maxSize + 64); @@ -333,7 +343,9 @@ public class BloomFilterAggregatorTest IntStream.range(0, longValues1.length) .forEach(i -> bufferAggregateColumn(Collections.singletonList(selector), agg, buf, pos)); - BloomKFilter bloomKFilter = (BloomKFilter) valueAggregatorFactory.finalizeComputation(agg.get(buf, pos)); + BloomKFilter bloomKFilter = BloomKFilter.deserialize( + (ByteBuffer) valueAggregatorFactory.finalizeComputation(agg.get(buf, pos)) + ); String serialized = filterToString(bloomKFilter); Assert.assertEquals(serializedLongFilter, serialized); } @@ -342,7 +354,7 @@ public class BloomFilterAggregatorTest public void testBufferAggregateFloatValues() throws IOException { TestFloatColumnSelector selector = new TestFloatColumnSelector(Arrays.asList(floatValues1)); - FloatBloomFilterBufferAggregator agg = new FloatBloomFilterBufferAggregator(selector, maxNumValues); + FloatBloomFilterAggregator agg = new FloatBloomFilterAggregator(selector, maxNumValues, true); int maxSize = valueAggregatorFactory.getMaxIntermediateSizeWithNulls(); ByteBuffer buf = ByteBuffer.allocate(maxSize + 64); @@ -353,7 +365,9 @@ public class BloomFilterAggregatorTest IntStream.range(0, floatValues1.length) .forEach(i -> bufferAggregateColumn(Collections.singletonList(selector), agg, buf, pos)); - BloomKFilter bloomKFilter = (BloomKFilter) valueAggregatorFactory.finalizeComputation(agg.get(buf, pos)); + BloomKFilter bloomKFilter = BloomKFilter.deserialize( + (ByteBuffer) valueAggregatorFactory.finalizeComputation(agg.get(buf, pos)) + ); String serialized = filterToString(bloomKFilter); Assert.assertEquals(serializedFloatFilter, serialized); } @@ -362,7 +376,7 @@ public class BloomFilterAggregatorTest public void testBufferAggregateDoubleValues() throws IOException { TestDoubleColumnSelector selector = new TestDoubleColumnSelector(Arrays.asList(doubleValues1)); - DoubleBloomFilterBufferAggregator agg = new DoubleBloomFilterBufferAggregator(selector, maxNumValues); + DoubleBloomFilterAggregator agg = new DoubleBloomFilterAggregator(selector, maxNumValues, true); int maxSize = valueAggregatorFactory.getMaxIntermediateSizeWithNulls(); ByteBuffer buf = ByteBuffer.allocate(maxSize + 64); @@ -373,7 +387,9 @@ public class BloomFilterAggregatorTest IntStream.range(0, doubleValues1.length) .forEach(i -> bufferAggregateColumn(Collections.singletonList(selector), agg, buf, pos)); - BloomKFilter bloomKFilter = (BloomKFilter) valueAggregatorFactory.finalizeComputation(agg.get(buf, pos)); + BloomKFilter bloomKFilter = BloomKFilter.deserialize( + (ByteBuffer) valueAggregatorFactory.finalizeComputation(agg.get(buf, pos)) + ); String serialized = filterToString(bloomKFilter); Assert.assertEquals(serializedDoubleFilter, serialized); } @@ -384,8 +400,8 @@ public class BloomFilterAggregatorTest DimensionSelector dimSelector1 = new CardinalityAggregatorTest.TestDimensionSelector(values1, null); DimensionSelector dimSelector2 = new CardinalityAggregatorTest.TestDimensionSelector(values2, null); - StringBloomFilterAggregator agg1 = new StringBloomFilterAggregator(dimSelector1, new BloomKFilter(maxNumValues)); - StringBloomFilterAggregator agg2 = new StringBloomFilterAggregator(dimSelector2, new BloomKFilter(maxNumValues)); + StringBloomFilterAggregator agg1 = new StringBloomFilterAggregator(dimSelector1, maxNumValues, true); + StringBloomFilterAggregator agg2 = new StringBloomFilterAggregator(dimSelector2, maxNumValues, true); for (int i = 0; i < values1.size(); ++i) { aggregateDimension(Collections.singletonList(dimSelector1), agg1); @@ -394,10 +410,12 @@ public class BloomFilterAggregatorTest aggregateDimension(Collections.singletonList(dimSelector2), agg2); } - BloomKFilter combined = (BloomKFilter) valueAggregatorFactory.finalizeComputation( - valueAggregatorFactory.combine( - agg1.get(), - agg2.get() + BloomKFilter combined = BloomKFilter.deserialize( + (ByteBuffer) valueAggregatorFactory.finalizeComputation( + valueAggregatorFactory.combine( + agg1.get(), + agg2.get() + ) ) ); @@ -408,19 +426,25 @@ public class BloomFilterAggregatorTest @Test public void testMergeValues() throws IOException { - final TestBloomFilterColumnSelector mergeDim = - new TestBloomFilterColumnSelector(ImmutableList.of(filter1, filter2)); + final TestBloomFilterBufferColumnSelector mergeDim = + new TestBloomFilterBufferColumnSelector( + ImmutableList.of( + ByteBuffer.wrap(BloomFilterSerializersModule.bloomKFilterToBytes(filter1)), + ByteBuffer.wrap(BloomFilterSerializersModule.bloomKFilterToBytes(filter2)) + ) + ); BloomFilterMergeAggregator mergeAggregator = - new BloomFilterMergeAggregator(mergeDim, new BloomKFilter(maxNumValues)); + new BloomFilterMergeAggregator(mergeDim, maxNumValues, true); for (int i = 0; i < 2; ++i) { aggregateColumn(Collections.singletonList(mergeDim), mergeAggregator); } - BloomKFilter merged = (BloomKFilter) valueAggregatorFactory.getCombiningFactory() - .finalizeComputation(mergeAggregator.get()); + BloomKFilter merged = BloomKFilter.deserialize( + (ByteBuffer) valueAggregatorFactory.getCombiningFactory().finalizeComputation(mergeAggregator.get()) + ); String serialized = filterToString(merged); Assert.assertEquals(serializedCombinedFilter, serialized); } @@ -428,8 +452,8 @@ public class BloomFilterAggregatorTest @Test public void testMergeValuesWithBuffersForGroupByV1() throws IOException { - final TestBloomFilterColumnSelector mergeDim = - new TestBloomFilterColumnSelector( + final TestBloomFilterBufferColumnSelector mergeDim = + new TestBloomFilterBufferColumnSelector( ImmutableList.of( ByteBuffer.wrap(BloomFilterSerializersModule.bloomKFilterToBytes(filter1)), ByteBuffer.wrap(BloomFilterSerializersModule.bloomKFilterToBytes(filter2)) @@ -437,15 +461,16 @@ public class BloomFilterAggregatorTest ); BloomFilterMergeAggregator mergeAggregator = - new BloomFilterMergeAggregator(mergeDim, new BloomKFilter(maxNumValues)); + new BloomFilterMergeAggregator(mergeDim, maxNumValues, true); for (int i = 0; i < 2; ++i) { aggregateColumn(Collections.singletonList(mergeDim), mergeAggregator); } - BloomKFilter merged = (BloomKFilter) valueAggregatorFactory.getCombiningFactory() - .finalizeComputation(mergeAggregator.get()); + BloomKFilter merged = BloomKFilter.deserialize( + (ByteBuffer) valueAggregatorFactory.getCombiningFactory().finalizeComputation(mergeAggregator.get()) + ); String serialized = filterToString(merged); Assert.assertEquals(serializedCombinedFilter, serialized); } @@ -461,7 +486,7 @@ public class BloomFilterAggregatorTest ) ); - BloomFilterMergeBufferAggregator mergeAggregator = new BloomFilterMergeBufferAggregator(mergeDim, maxNumValues); + BloomFilterMergeAggregator mergeAggregator = new BloomFilterMergeAggregator(mergeDim, maxNumValues, false); int maxSize = valueAggregatorFactory.getCombiningFactory().getMaxIntermediateSizeWithNulls(); ByteBuffer buf = ByteBuffer.allocate(maxSize + 64); @@ -474,8 +499,9 @@ public class BloomFilterAggregatorTest bufferAggregateColumn(Collections.singletonList(mergeDim), mergeAggregator, buf, pos); } - BloomKFilter merged = (BloomKFilter) valueAggregatorFactory.getCombiningFactory() - .finalizeComputation(mergeAggregator.get(buf, pos)); + BloomKFilter merged = BloomKFilter.deserialize( + (ByteBuffer) valueAggregatorFactory.getCombiningFactory().finalizeComputation(mergeAggregator.get(buf, pos)) + ); String serialized = filterToString(merged); Assert.assertEquals(serializedCombinedFilter, serialized); @@ -596,14 +622,6 @@ public class BloomFilterAggregatorTest } } - public static class TestBloomFilterColumnSelector extends SteppableSelector<Object> - { - public TestBloomFilterColumnSelector(List<Object> values) - { - super(values); - } - } - public static class TestBloomFilterBufferColumnSelector extends SteppableSelector<ByteBuffer> { public TestBloomFilterBufferColumnSelector(List<ByteBuffer> values) diff --git a/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/aggregation/bloom/BloomFilterGroupByQueryTest.java b/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/aggregation/bloom/BloomFilterGroupByQueryTest.java index a2207f2..ce3b932 100644 --- a/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/aggregation/bloom/BloomFilterGroupByQueryTest.java +++ b/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/aggregation/bloom/BloomFilterGroupByQueryTest.java @@ -42,6 +42,7 @@ import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; import java.util.List; @@ -108,9 +109,10 @@ public class BloomFilterGroupByQueryTest MapBasedRow row = ingestAndQuery(query); - Assert.assertTrue(((BloomKFilter) row.getRaw("blooming_quality")).testString("mezzanine")); - Assert.assertTrue(((BloomKFilter) row.getRaw("blooming_quality")).testString("premium")); - Assert.assertFalse(((BloomKFilter) row.getRaw("blooming_quality")).testString("entertainment")); + BloomKFilter filter = BloomKFilter.deserialize((ByteBuffer) row.getRaw("blooming_quality")); + Assert.assertTrue(filter.testString("mezzanine")); + Assert.assertTrue(filter.testString("premium")); + Assert.assertFalse(filter.testString("entertainment")); } @Test @@ -135,7 +137,7 @@ public class BloomFilterGroupByQueryTest Object val = row.getRaw("blooming_quality"); - String serialized = BloomFilterAggregatorTest.filterToString((BloomKFilter) val); + String serialized = BloomFilterAggregatorTest.filterToString(BloomKFilter.deserialize((ByteBuffer) val)); String empty = BloomFilterAggregatorTest.filterToString(filter); Assert.assertEquals(empty, serialized); diff --git a/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/aggregation/bloom/sql/BloomFilterSqlAggregatorTest.java b/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/aggregation/bloom/sql/BloomFilterSqlAggregatorTest.java index 6b218bb..17241ed 100644 --- a/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/aggregation/bloom/sql/BloomFilterSqlAggregatorTest.java +++ b/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/aggregation/bloom/sql/BloomFilterSqlAggregatorTest.java @@ -179,7 +179,6 @@ public class BloomFilterSqlAggregatorTest .rows(CalciteTests.ROWS1_WITH_NUMERIC_DIMS) .buildMMappedIndex(); - walker = new SpecificSegmentsQuerySegmentWalker(conglomerate).add( DataSegment.builder() .dataSource(DATA_SOURCE) diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java index 9ca3233..898b19f 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java @@ -425,8 +425,6 @@ public class CalciteTests ) ); - - public static final List<InputRow> ROWS2 = ImmutableList.of( createRow("2000-01-01", "דרואיד", "he", 1.0), createRow("2000-01-01", "druid", "en", 1.0), --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
