This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch aggregation_filter in repository https://gitbox.apache.org/repos/asf/pinot.git
commit 9a2b052d63cbf3802cc24a40d2d9cefd5912cac4 Author: Xiaotian (Jackie) Jiang <[email protected]> AuthorDate: Thu Dec 30 23:32:33 2021 +0100 Aggregation Filter --- .../pinot/core/operator/blocks/FilterBlock.java | 15 ++- .../core/operator/docidsets/BitmapDocIdSet.java | 12 ++- .../operator/docidsets/FilterBlockDocIdSet.java | 40 ++++++++ ...pDocIdSet.java => RangelessBitmapDocIdSet.java} | 20 ++-- .../operator/filter/CombinedFilterOperator.java | 66 ++++++++++++ .../query/FilteredAggregationOperator.java | 111 +++++++++++++++++++++ 6 files changed, 248 insertions(+), 16 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/FilterBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/FilterBlock.java index 1f87255..aa97bc4 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/FilterBlock.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/FilterBlock.java @@ -30,14 +30,25 @@ import org.apache.pinot.core.operator.docidsets.FilterBlockDocIdSet; */ public class FilterBlock implements Block { private final FilterBlockDocIdSet _filterBlockDocIdSet; + private FilterBlockDocIdSet _nonScanFilterBlockDocIdSet; public FilterBlock(FilterBlockDocIdSet filterBlockDocIdSet) { _filterBlockDocIdSet = filterBlockDocIdSet; } + /** + * Pre-scans the documents if needed, and returns a non-scan-based FilterBlockDocIdSet. + */ + public FilterBlockDocIdSet getNonScanFilterBLockDocIdSet() { + if (_nonScanFilterBlockDocIdSet == null) { + _nonScanFilterBlockDocIdSet = _filterBlockDocIdSet.toNonScanDocIdSet(); + } + return _nonScanFilterBlockDocIdSet; + } + @Override public FilterBlockDocIdSet getBlockDocIdSet() { - return _filterBlockDocIdSet; + return _nonScanFilterBlockDocIdSet != null ? 
_nonScanFilterBlockDocIdSet : _filterBlockDocIdSet; } @Override @@ -54,4 +65,4 @@ public class FilterBlock implements Block { public BlockMetadata getMetadata() { throw new UnsupportedOperationException(); } -} +} \ No newline at end of file diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/docidsets/BitmapDocIdSet.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/docidsets/BitmapDocIdSet.java index eacd4e3..a69ac00 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/docidsets/BitmapDocIdSet.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/docidsets/BitmapDocIdSet.java @@ -23,17 +23,19 @@ import org.roaringbitmap.buffer.ImmutableRoaringBitmap; public class BitmapDocIdSet implements FilterBlockDocIdSet { - private final ImmutableRoaringBitmap _docIds; - private final int _numDocs; + private final BitmapDocIdIterator _iterator; public BitmapDocIdSet(ImmutableRoaringBitmap docIds, int numDocs) { - _docIds = docIds; - _numDocs = numDocs; + _iterator = new BitmapDocIdIterator(docIds, numDocs); + } + + public BitmapDocIdSet(BitmapDocIdIterator iterator) { + _iterator = iterator; } @Override public BitmapDocIdIterator iterator() { - return new BitmapDocIdIterator(_docIds, _numDocs); + return _iterator; } @Override diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/docidsets/FilterBlockDocIdSet.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/docidsets/FilterBlockDocIdSet.java index 92b6ac7..18dab2b 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/docidsets/FilterBlockDocIdSet.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/docidsets/FilterBlockDocIdSet.java @@ -18,7 +18,16 @@ */ package org.apache.pinot.core.operator.docidsets; +import org.apache.pinot.core.common.BlockDocIdIterator; import org.apache.pinot.core.common.BlockDocIdSet; +import org.apache.pinot.core.operator.dociditerators.AndDocIdIterator; +import 
org.apache.pinot.core.operator.dociditerators.BitmapDocIdIterator; +import org.apache.pinot.core.operator.dociditerators.OrDocIdIterator; +import org.apache.pinot.core.operator.dociditerators.RangelessBitmapDocIdIterator; +import org.apache.pinot.core.operator.dociditerators.ScanBasedDocIdIterator; +import org.apache.pinot.segment.spi.Constants; +import org.roaringbitmap.RoaringBitmapWriter; +import org.roaringbitmap.buffer.MutableRoaringBitmap; /** @@ -32,4 +41,35 @@ public interface FilterBlockDocIdSet extends BlockDocIdSet { * filtering phase. This method should be called after the filtering is done. */ long getNumEntriesScannedInFilter(); + + /** + * For scan-based FilterBlockDocIdSet, pre-scans the documents and returns a non-scan-based FilterBlockDocIdSet. + */ + default FilterBlockDocIdSet toNonScanDocIdSet() { + BlockDocIdIterator docIdIterator = iterator(); + + // NOTE: AND and OR DocIdIterator might contain scan-based DocIdIterator + // TODO: This scan is not counted in the execution stats + if (docIdIterator instanceof ScanBasedDocIdIterator || docIdIterator instanceof AndDocIdIterator + || docIdIterator instanceof OrDocIdIterator) { + RoaringBitmapWriter<MutableRoaringBitmap> bitmapWriter = + RoaringBitmapWriter.bufferWriter().runCompress(false).get(); + int docId; + while ((docId = docIdIterator.next()) != Constants.EOF) { + bitmapWriter.add(docId); + } + return new RangelessBitmapDocIdSet(bitmapWriter.get()); + } + + // NOTE: AND and OR DocIdSet might return BitmapBasedDocIdIterator after processing the iterators. 
Create a new + // DocIdSet to prevent processing the iterators again + if (docIdIterator instanceof RangelessBitmapDocIdIterator) { + return new RangelessBitmapDocIdSet((RangelessBitmapDocIdIterator) docIdIterator); + } + if (docIdIterator instanceof BitmapDocIdIterator) { + return new BitmapDocIdSet((BitmapDocIdIterator) docIdIterator); + } + + return this; + } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/docidsets/BitmapDocIdSet.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/docidsets/RangelessBitmapDocIdSet.java similarity index 66% copy from pinot-core/src/main/java/org/apache/pinot/core/operator/docidsets/BitmapDocIdSet.java copy to pinot-core/src/main/java/org/apache/pinot/core/operator/docidsets/RangelessBitmapDocIdSet.java index eacd4e3..463a2df 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/docidsets/BitmapDocIdSet.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/docidsets/RangelessBitmapDocIdSet.java @@ -18,22 +18,24 @@ */ package org.apache.pinot.core.operator.docidsets; -import org.apache.pinot.core.operator.dociditerators.BitmapDocIdIterator; +import org.apache.pinot.core.operator.dociditerators.RangelessBitmapDocIdIterator; import org.roaringbitmap.buffer.ImmutableRoaringBitmap; -public class BitmapDocIdSet implements FilterBlockDocIdSet { - private final ImmutableRoaringBitmap _docIds; - private final int _numDocs; +public class RangelessBitmapDocIdSet implements FilterBlockDocIdSet { + private final RangelessBitmapDocIdIterator _iterator; - public BitmapDocIdSet(ImmutableRoaringBitmap docIds, int numDocs) { - _docIds = docIds; - _numDocs = numDocs; + public RangelessBitmapDocIdSet(ImmutableRoaringBitmap docIds) { + _iterator = new RangelessBitmapDocIdIterator(docIds); + } + + public RangelessBitmapDocIdSet(RangelessBitmapDocIdIterator iterator) { + _iterator = iterator; } @Override - public BitmapDocIdIterator iterator() { - return new BitmapDocIdIterator(_docIds, 
_numDocs); + public RangelessBitmapDocIdIterator iterator() { + return _iterator; } @Override diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/CombinedFilterOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/CombinedFilterOperator.java new file mode 100644 index 0000000..54c26dc --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/CombinedFilterOperator.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.operator.filter; + +import java.util.Arrays; +import java.util.List; +import org.apache.pinot.core.common.Operator; +import org.apache.pinot.core.operator.blocks.FilterBlock; +import org.apache.pinot.core.operator.docidsets.AndDocIdSet; +import org.apache.pinot.core.operator.docidsets.FilterBlockDocIdSet; + + +/** + * A combined filter operator consisting of one main filter operator and one sub filter operator. The result block is + * the AND result of the main and sub filter. 
+ */ +public class CombinedFilterOperator extends BaseFilterOperator { + private static final String OPERATOR_NAME = "CombinedFilterOperator"; + private static final String EXPLAIN_NAME = "FILTER_COMBINED"; + + private final BaseFilterOperator _mainFilterOperator; + private final BaseFilterOperator _subFilterOperator; + + public CombinedFilterOperator(BaseFilterOperator mainFilterOperator, BaseFilterOperator subFilterOperator) { + _mainFilterOperator = mainFilterOperator; + _subFilterOperator = subFilterOperator; + } + + @Override + public String getOperatorName() { + return OPERATOR_NAME; + } + + @Override + public List<Operator> getChildOperators() { + return Arrays.asList(_mainFilterOperator, _subFilterOperator); + } + + @Override + public String toExplainString() { + return EXPLAIN_NAME; + } + + @Override + protected FilterBlock getNextBlock() { + FilterBlockDocIdSet mainFilterDocIdSet = _mainFilterOperator.nextBlock().getNonScanFilterBLockDocIdSet(); + FilterBlockDocIdSet subFilterDocIdSet = _subFilterOperator.nextBlock().getBlockDocIdSet(); + return new FilterBlock(new AndDocIdSet(Arrays.asList(mainFilterDocIdSet, subFilterDocIdSet))); + } +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredAggregationOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredAggregationOperator.java new file mode 100644 index 0000000..727a820 --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredAggregationOperator.java @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.operator.query; + +import java.util.Arrays; +import java.util.IdentityHashMap; +import java.util.List; +import java.util.stream.Collectors; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.pinot.core.common.Operator; +import org.apache.pinot.core.operator.BaseOperator; +import org.apache.pinot.core.operator.ExecutionStatistics; +import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock; +import org.apache.pinot.core.operator.blocks.TransformBlock; +import org.apache.pinot.core.operator.transform.TransformOperator; +import org.apache.pinot.core.query.aggregation.AggregationExecutor; +import org.apache.pinot.core.query.aggregation.DefaultAggregationExecutor; +import org.apache.pinot.core.query.aggregation.function.AggregationFunction; + + +/** + * The <code>FilteredAggregationOperator</code> provides the operator for filtered aggregations on a single segment.
+ */ +@SuppressWarnings("rawtypes") +public class FilteredAggregationOperator extends BaseOperator<IntermediateResultsBlock> { + private static final String OPERATOR_NAME = "FilteredAggregationOperator"; + private static final String EXPLAIN_NAME = "FILTERED_AGGREGATE"; + + private final AggregationFunction[] _aggregationFunctions; + private final List<Pair<AggregationFunction[], TransformOperator>> _filteredAggregations; + private final long _numTotalDocs; + + private long _numDocsScanned; + private long _numEntriesScannedInFilter; + private long _numEntriesScannedPostFilter; + + public FilteredAggregationOperator(AggregationFunction[] aggregationFunctions, + List<Pair<AggregationFunction[], TransformOperator>> filteredAggregations, long numTotalDocs) { + _aggregationFunctions = aggregationFunctions; + _filteredAggregations = filteredAggregations; + _numTotalDocs = numTotalDocs; + } + + @Override + protected IntermediateResultsBlock getNextBlock() { + int numAggregations = _aggregationFunctions.length; + Object[] result = new Object[numAggregations]; + IdentityHashMap<AggregationFunction, Integer> resultIndexMap = new IdentityHashMap<>(numAggregations); + for (int i = 0; i < numAggregations; i++) { + resultIndexMap.put(_aggregationFunctions[i], i); + } + for (Pair<AggregationFunction[], TransformOperator> filteredAggregation : _filteredAggregations) { + AggregationFunction[] aggregationFunctions = filteredAggregation.getLeft(); + AggregationExecutor aggregationExecutor = new DefaultAggregationExecutor(aggregationFunctions); + TransformOperator transformOperator = filteredAggregation.getRight(); + TransformBlock transformBlock; + int numDocsScanned = 0; + while ((transformBlock = transformOperator.nextBlock()) != null) { + aggregationExecutor.aggregate(transformBlock); + numDocsScanned += transformBlock.getNumDocs(); + } + List<Object> filteredResult = aggregationExecutor.getResult(); + int numFilteredAggregations = aggregationFunctions.length; + for (int i = 0; i 
< numFilteredAggregations; i++) { + result[resultIndexMap.get(aggregationFunctions[i])] = filteredResult.get(i); + } + _numDocsScanned += numDocsScanned; + _numEntriesScannedInFilter += transformOperator.getExecutionStatistics().getNumEntriesScannedInFilter(); + _numEntriesScannedPostFilter += (long) numDocsScanned * transformOperator.getNumColumnsProjected(); + } + return new IntermediateResultsBlock(_aggregationFunctions, Arrays.asList(result), false); + } + + @Override + public String getOperatorName() { + return OPERATOR_NAME; + } + + @Override + public List<Operator> getChildOperators() { + return _filteredAggregations.stream().map(Pair::getRight).collect(Collectors.toList()); + } + + @Override + public ExecutionStatistics getExecutionStatistics() { + return new ExecutionStatistics(_numDocsScanned, _numEntriesScannedInFilter, _numEntriesScannedPostFilter, + _numTotalDocs); + } + + @Override + public String toExplainString() { + // TODO: To be added + return null; + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
