This is an automated email from the ASF dual-hosted git repository. kishoreg pushed a commit to branch range-index in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 7b2eacf65dfb06f411756a11f1e44b98a557e8ad Author: kishoreg <g.kish...@gmail.com> AuthorDate: Fri Apr 10 23:02:13 2020 -0700 Initial code dump for supporting range queries efficiently --- .../pinot/core/common/BlockMultiValIterator.java | 4 +- .../org/apache/pinot/core/common/DataSource.java | 6 + .../indexsegment/mutable/MutableSegmentImpl.java | 4 +- .../DictionaryBasedMultiValueIterator.java | 108 ++++++ ...ava => DictionaryBasedSingleValueIterator.java} | 43 ++- .../docvaliterators/MultiValueIterator.java | 25 ++ .../core/operator/filter/FilterOperatorUtils.java | 4 + .../core/operator/filter/RangeFilterOperator.java | 113 ++++++ .../org/apache/pinot/core/plan/FilterPlanNode.java | 1 - .../core/segment/creator/impl/V1Constants.java | 1 + .../impl/inv/DictionaryBasedRangeIndexCreator.java | 416 +++++++++++++++++++++ .../segment/index/column/ColumnIndexContainer.java | 5 + .../index/column/PhysicalColumnIndexContainer.java | 20 +- .../segment/index/datasource/BaseDataSource.java | 11 +- .../index/datasource/ImmutableDataSource.java | 2 +- .../index/datasource/MutableDataSource.java | 4 +- .../segment/index/loader/IndexLoadingConfig.java | 18 + .../segment/index/loader/SegmentPreProcessor.java | 6 + .../loader/invertedindex/RangeIndexHandler.java | 168 +++++++++ .../segment/index/metadata/SegmentMetadata.java | 2 + .../index/metadata/SegmentMetadataImpl.java | 5 + .../index/readers/BitmapRangeIndexReader.java | 134 +++++++ .../core/segment/store/ColumnIndexDirectory.java | 19 + .../pinot/core/segment/store/ColumnIndexType.java | 3 +- .../core/segment/store/FilePerIndexDirectory.java | 17 + .../segment/store/SegmentLocalFSDirectory.java | 5 + .../segment/store/SingleFileIndexDirectory.java | 12 + .../virtualcolumn/VirtualColumnIndexContainer.java | 5 + .../core/startree/v2/store/StarTreeDataSource.java | 3 +- .../core/common/RealtimeNoDictionaryTest.java | 12 +- .../apache/pinot/spi/config/IndexingConfig.java | 9 + 
.../tools/scan/query/RangePredicateFilter.java | 8 + .../baseballStats_offline_table_config.json | 3 + 33 files changed, 1172 insertions(+), 24 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/BlockMultiValIterator.java b/pinot-core/src/main/java/org/apache/pinot/core/common/BlockMultiValIterator.java index 69d5da5..68c0fc5 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/common/BlockMultiValIterator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/common/BlockMultiValIterator.java @@ -24,7 +24,7 @@ package org.apache.pinot.core.common; */ public abstract class BlockMultiValIterator implements BlockValIterator { - public int nextCharVal(int[] charArray) { + public int nextCharVal(char[] charArray) { throw new UnsupportedOperationException(); } @@ -44,7 +44,7 @@ public abstract class BlockMultiValIterator implements BlockValIterator { throw new UnsupportedOperationException(); } - public byte[][] nextBytesArrayVal(byte[][] bytesArrays) { + public int nextBytesArrayVal(byte[][] bytesArrays) { throw new UnsupportedOperationException(); } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/DataSource.java b/pinot-core/src/main/java/org/apache/pinot/core/common/DataSource.java index 8f33931..492c344 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/common/DataSource.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/common/DataSource.java @@ -55,6 +55,12 @@ public abstract class DataSource extends BaseOperator { public abstract InvertedIndexReader getInvertedIndex(); /** + * Returns the range index for the column if exists, or {@code null} if not. + */ + @Nullable + public abstract InvertedIndexReader getRangeIndex(); + + /** * Returns the bloom filter for the column if exists, or {@code null} if not. 
*/ @Nullable diff --git a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java index ba42e8b..53aee51 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java @@ -112,6 +112,7 @@ public class MutableSegmentImpl implements MutableSegment { private final Map<String, BaseMutableDictionary> _dictionaryMap = new HashMap<>(); private final Map<String, DataFileReader> _indexReaderWriterMap = new HashMap<>(); private final Map<String, InvertedIndexReader> _invertedIndexMap = new HashMap<>(); + private final Map<String, InvertedIndexReader> _rangeIndexMap = new HashMap<>(); private final Map<String, BloomFilterReader> _bloomFilterMap = new HashMap<>(); private final Map<String, RealtimeNullValueVectorReaderWriter> _nullValueVectorMap = new HashMap<>(); private final IdMap<FixedIntArray> _recordIdMap; @@ -607,11 +608,12 @@ public class MutableSegmentImpl implements MutableSegment { DataFileReader forwardIndex = _indexReaderWriterMap.get(column); BaseMutableDictionary dictionary = _dictionaryMap.get(column); InvertedIndexReader invertedIndex = _invertedIndexMap.get(column); + InvertedIndexReader rangeIndex = _rangeIndexMap.get(column); BloomFilterReader bloomFilter = _bloomFilterMap.get(column); RealtimeNullValueVectorReaderWriter nullValueVector = _nullValueVectorMap.get(column); return new MutableDataSource(fieldSpec, _numDocsIndexed, numValuesInfo.getNumValues(), numValuesInfo.getMaxNumValuesPerMVEntry(), partitionFunction, partitionId, forwardIndex, dictionary, - invertedIndex, bloomFilter, nullValueVector); + invertedIndex, rangeIndex, bloomFilter, nullValueVector); } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/docvaliterators/DictionaryBasedMultiValueIterator.java 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/docvaliterators/DictionaryBasedMultiValueIterator.java new file mode 100644 index 0000000..9f3dc2a --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/docvaliterators/DictionaryBasedMultiValueIterator.java @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.core.operator.docvaliterators; + +import org.apache.pinot.core.common.BlockMultiValIterator; +import org.apache.pinot.core.common.BlockSingleValIterator; +import org.apache.pinot.core.io.reader.ReaderContext; +import org.apache.pinot.core.io.reader.SingleColumnMultiValueReader; +import org.apache.pinot.core.io.reader.SingleColumnSingleValueReader; +import org.apache.pinot.core.segment.index.readers.Dictionary; + + +@SuppressWarnings("unchecked") +public final class DictionaryBasedMultiValueIterator extends BlockMultiValIterator { + + private final SingleColumnMultiValueReader _reader; + private final int _numDocs; + private final ReaderContext _context; + private final Dictionary _dictionary; + private final int[] _dictIds; + + private int _nextDocId; + + public DictionaryBasedMultiValueIterator(SingleColumnMultiValueReader reader, Dictionary dictionary, int numDocs, + int maxLength) { + _reader = reader; + _numDocs = numDocs; + _context = _reader.createContext(); + _dictionary = dictionary; + _dictIds = new int[maxLength]; + } + + @Override + public int nextIntVal(int[] intArray) { + int length = _reader.getIntArray(_nextDocId++, _dictIds, _context); + for (int i = 0; i < length; i++) { + intArray[i] = _dictionary.getIntValue(_dictIds[i]); + } + return length; + } + + @Override + public int nextDoubleVal(double[] doubleArray) { + int length = _reader.getIntArray(_nextDocId++, _dictIds, _context); + for (int i = 0; i < length; i++) { + doubleArray[i] = _dictionary.getDoubleValue(_dictIds[i]); + } + return length; + } + + @Override + public int nextFloatVal(float[] floatArray) { + int length = _reader.getIntArray(_nextDocId++, _dictIds, _context); + for (int i = 0; i < length; i++) { + floatArray[i] = _dictionary.getFloatValue(_dictIds[i]); + } + return length; + } + + @Override + public int nextLongVal(long[] longArray) { + int length = _reader.getIntArray(_nextDocId++, _dictIds, _context); + for (int i = 0; i < length; i++) { + 
longArray[i] = _dictionary.getLongValue(_dictIds[i]); + } + return length; + } + + @Override + public int nextBytesArrayVal(byte[][] bytesArrays) { + int length = _reader.getIntArray(_nextDocId++, _dictIds, _context); + for (int i = 0; i < length; i++) { + bytesArrays[i] = _dictionary.getBytesValue(_dictIds[i]); + } + return length; + } + + @Override + public boolean hasNext() { + return _nextDocId < _numDocs; + } + + @Override + public void skipTo(int docId) { + _nextDocId = docId; + } + + @Override + public void reset() { + _nextDocId = 0; + } +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/docvaliterators/MultiValueIterator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/docvaliterators/DictionaryBasedSingleValueIterator.java similarity index 52% copy from pinot-core/src/main/java/org/apache/pinot/core/operator/docvaliterators/MultiValueIterator.java copy to pinot-core/src/main/java/org/apache/pinot/core/operator/docvaliterators/DictionaryBasedSingleValueIterator.java index 85ae7c7..34855e0 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/docvaliterators/MultiValueIterator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/docvaliterators/DictionaryBasedSingleValueIterator.java @@ -18,28 +18,57 @@ */ package org.apache.pinot.core.operator.docvaliterators; -import org.apache.pinot.core.common.BlockMultiValIterator; +import org.apache.pinot.core.common.BlockSingleValIterator; import org.apache.pinot.core.io.reader.ReaderContext; -import org.apache.pinot.core.io.reader.SingleColumnMultiValueReader; +import org.apache.pinot.core.io.reader.SingleColumnSingleValueReader; +import org.apache.pinot.core.segment.index.readers.Dictionary; @SuppressWarnings("unchecked") -public final class MultiValueIterator extends BlockMultiValIterator { - private final SingleColumnMultiValueReader _reader; +public final class DictionaryBasedSingleValueIterator extends BlockSingleValIterator { + + private final 
SingleColumnSingleValueReader _reader; private final int _numDocs; private final ReaderContext _context; + private final Dictionary _dictionary; private int _nextDocId; - public MultiValueIterator(SingleColumnMultiValueReader reader, int numDocs) { + public DictionaryBasedSingleValueIterator(SingleColumnSingleValueReader reader, Dictionary dictionary, int numDocs) { _reader = reader; _numDocs = numDocs; _context = _reader.createContext(); + _dictionary = dictionary; + } + + @Override + public int nextIntVal() { + return _dictionary.getIntValue(_reader.getInt(_nextDocId++, _context)); + } + + @Override + public long nextLongVal() { + return _dictionary.getLongValue(_reader.getInt(_nextDocId++, _context)); + } + + @Override + public float nextFloatVal() { + return _dictionary.getFloatValue(_reader.getInt(_nextDocId++, _context)); + } + + @Override + public double nextDoubleVal() { + return _dictionary.getDoubleValue(_reader.getInt(_nextDocId++, _context)); + } + + @Override + public String nextStringVal() { + return _dictionary.getStringValue(_reader.getInt(_nextDocId++, _context)); } @Override - public int nextIntVal(int[] intArray) { - return _reader.getIntArray(_nextDocId++, intArray, _context); + public byte[] nextBytesVal() { + return _dictionary.getBytesValue(_reader.getInt(_nextDocId++, _context)); } @Override diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/docvaliterators/MultiValueIterator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/docvaliterators/MultiValueIterator.java index 85ae7c7..4529c84 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/docvaliterators/MultiValueIterator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/docvaliterators/MultiValueIterator.java @@ -43,6 +43,31 @@ public final class MultiValueIterator extends BlockMultiValIterator { } @Override + public int nextCharVal(char[] charArray) { + return _reader.getCharArray(_nextDocId++, charArray); + } + + @Override + 
public int nextDoubleVal(double[] doubleArray) { + return _reader.getDoubleArray(_nextDocId++, doubleArray); + } + + @Override + public int nextFloatVal(float[] floatArray) { + return _reader.getFloatArray(_nextDocId++, floatArray); + } + + @Override + public int nextLongVal(long[] longArray) { + return _reader.getLongArray(_nextDocId++, longArray); + } + + @Override + public int nextBytesArrayVal(byte[][] bytesArrays) { + return _reader.getBytesArray(_nextDocId++, bytesArrays); + } + + @Override public boolean hasNext() { return _nextDocId < _numDocs; } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/FilterOperatorUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/FilterOperatorUtils.java index cb5bbb3..1de0b1b 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/FilterOperatorUtils.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/FilterOperatorUtils.java @@ -54,6 +54,10 @@ public class FilterOperatorUtils { Predicate.Type predicateType = predicateEvaluator.getPredicateType(); + if (predicateType == Predicate.Type.RANGE && dataSource.getRangeIndex() != null) { + return new RangeFilterOperator(predicateEvaluator, dataSource, startDocId, endDocId); + } + if (predicateType == Predicate.Type.TEXT_MATCH) { return new TextMatchFilterOperator(predicateEvaluator, dataSource, startDocId, endDocId); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/RangeFilterOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/RangeFilterOperator.java new file mode 100644 index 0000000..6211fa9 --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/RangeFilterOperator.java @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.operator.filter; + +import com.google.common.base.Preconditions; +import java.util.ArrayList; +import java.util.List; +import org.apache.pinot.core.common.BlockDocIdIterator; +import org.apache.pinot.core.common.DataSource; +import org.apache.pinot.core.operator.blocks.FilterBlock; +import org.apache.pinot.core.operator.dociditerators.MVScanDocIdIterator; +import org.apache.pinot.core.operator.dociditerators.SVScanDocIdIterator; +import org.apache.pinot.core.operator.dociditerators.ScanBasedDocIdIterator; +import org.apache.pinot.core.operator.docidsets.BitmapDocIdSet; +import org.apache.pinot.core.operator.docidsets.FilterBlockDocIdSet; +import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator; +import org.apache.pinot.core.operator.filter.predicate.RangePredicateEvaluatorFactory; +import org.apache.pinot.core.segment.index.readers.BitmapRangeIndexReader; +import org.apache.pinot.core.segment.index.readers.InvertedIndexReader; +import org.roaringbitmap.RoaringBitmap; +import org.roaringbitmap.buffer.ImmutableRoaringBitmap; +import org.roaringbitmap.buffer.MutableRoaringBitmap; + + +public class RangeFilterOperator extends BaseFilterOperator { + private static final String OPERATOR_NAME = "RangeFilterOperator"; + + private 
final PredicateEvaluator _predicateEvaluator; + private final DataSource _dataSource; + private final int _startDocId; + // TODO: change it to exclusive + // Inclusive + private final int _endDocId; + private final boolean _exclusive; + + public RangeFilterOperator(PredicateEvaluator predicateEvaluator, DataSource dataSource, int startDocId, + int endDocId) { + // NOTE: + // Predicate that is always evaluated as true or false should not be passed into the BitmapBasedFilterOperator for + // performance concern. + // If predicate is always evaluated as true, use MatchAllFilterOperator; if predicate is always evaluated as false, + // use EmptyFilterOperator. + Preconditions.checkArgument(!predicateEvaluator.isAlwaysTrue() && !predicateEvaluator.isAlwaysFalse()); + + _predicateEvaluator = predicateEvaluator; + _dataSource = dataSource; + _startDocId = startDocId; + _endDocId = endDocId; + _exclusive = predicateEvaluator.isExclusive(); + } + + @Override + protected FilterBlock getNextBlock() { + + RangePredicateEvaluatorFactory.OfflineDictionaryBasedRangePredicateEvaluator evaluator = + (RangePredicateEvaluatorFactory.OfflineDictionaryBasedRangePredicateEvaluator) _predicateEvaluator; + BitmapRangeIndexReader rangeIndexReader = (BitmapRangeIndexReader) _dataSource.getRangeIndex(); + int startRangeId = rangeIndexReader.findRangeId(evaluator.getStartDictId()); + int endRangeId = rangeIndexReader.findRangeId(evaluator.getEndDictId()); + //Handle Matching Ranges - some ranges match fully but some partially + //below code assumes first and last range always match partially which may not be the case always //todo: optimize it + MutableRoaringBitmap mutableRoaringBitmap = new MutableRoaringBitmap(); + mutableRoaringBitmap.or(rangeIndexReader.getDocIds(startRangeId)); + mutableRoaringBitmap.or(rangeIndexReader.getDocIds(endRangeId)); + final ScanBasedFilterOperator scanBasedFilterOperator = + new ScanBasedFilterOperator(_predicateEvaluator, _dataSource, _startDocId, _endDocId); 
+ FilterBlockDocIdSet scanBlockDocIdSet = scanBasedFilterOperator.getNextBlock().getBlockDocIdSet(); + BlockDocIdIterator iterator = scanBlockDocIdSet.iterator(); + + int id = 0; + List<ImmutableRoaringBitmap> bitmapList = new ArrayList<>(); + if (_dataSource.getDataSourceMetadata().isSingleValue()) { + bitmapList.add(((SVScanDocIdIterator) iterator).applyAnd(mutableRoaringBitmap)); + } else { + bitmapList.add(((MVScanDocIdIterator) iterator).applyAnd(mutableRoaringBitmap)); + } + + //All the intermediate + for (int rangeId = startRangeId + 1; rangeId < endRangeId; rangeId++) { + bitmapList.add(rangeIndexReader.getDocIds(rangeId)); + } + ImmutableRoaringBitmap[] bitmaps = new ImmutableRoaringBitmap[bitmapList.size()]; + bitmapList.toArray(bitmaps); + return new FilterBlock(new BitmapDocIdSet(bitmaps, _startDocId, _endDocId, _exclusive) { + + @Override + public long getNumEntriesScannedInFilter() { + return scanBlockDocIdSet.getNumEntriesScannedInFilter(); + } + }); + } + + @Override + public String getOperatorName() { + return OPERATOR_NAME; + } +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/FilterPlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/FilterPlanNode.java index 0ddee87..2b1b29f 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/plan/FilterPlanNode.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/FilterPlanNode.java @@ -117,7 +117,6 @@ public class FilterPlanNode implements PlanNode { TransformExpressionTree expression = filterQueryTree.getExpression(); if (expression.getExpressionType() == TransformExpressionTree.ExpressionType.FUNCTION) { - return new ExpressionFilterOperator(segment, expression, predicate); } else { DataSource dataSource = segment.getDataSource(filterQueryTree.getColumn()); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/V1Constants.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/V1Constants.java index 
c3316cb..3dc60aa 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/V1Constants.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/V1Constants.java @@ -36,6 +36,7 @@ public class V1Constants { public static final String RAW_SV_FORWARD_INDEX_FILE_EXTENSION = ".sv.raw.fwd"; public static final String UNSORTED_MV_FORWARD_INDEX_FILE_EXTENSION = ".mv.fwd"; public static final String BITMAP_INVERTED_INDEX_FILE_EXTENSION = ".bitmap.inv"; + public static final String BITMAP_RANGE_INDEX_FILE_EXTENSION = ".bitmap.range"; public static final String BLOOM_FILTER_FILE_EXTENSION = ".bloom"; public static final String NULLVALUE_VECTOR_FILE_EXTENSION = ".bitmap.nullvalue"; } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/DictionaryBasedRangeIndexCreator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/DictionaryBasedRangeIndexCreator.java new file mode 100644 index 0000000..7ad2b21 --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/DictionaryBasedRangeIndexCreator.java @@ -0,0 +1,416 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.core.segment.creator.impl.inv; + +import com.google.common.base.Preconditions; +import java.io.BufferedOutputStream; +import java.io.Closeable; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; +import java.util.List; +import java.util.Random; +import org.apache.commons.io.FileUtils; +import org.apache.pinot.core.segment.creator.InvertedIndexCreator; +import org.apache.pinot.core.segment.creator.impl.V1Constants; +import org.apache.pinot.core.segment.index.readers.BaseImmutableDictionary; +import org.apache.pinot.core.segment.memory.PinotDataBuffer; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.MetricFieldSpec; +import org.roaringbitmap.RoaringBitmap; +import org.roaringbitmap.buffer.ImmutableRoaringBitmap; +import org.roaringbitmap.buffer.MutableRoaringBitmap; + +import static org.apache.pinot.core.segment.creator.impl.V1Constants.Indexes.BITMAP_RANGE_INDEX_FILE_EXTENSION; + + +/** + * Implementation of {@link InvertedIndexCreator} that uses off-heap memory. + * <p>We use 2 passes to create the inverted index. + * <ul> + * + * <li> + * In the constructor compute the number of ranges needed. We can decide this based on the following + * - number of buckets based on dictionary (use the user-provided config for the number of buckets or the number of dictIds per bucket, or pick a default) + * </li> + * <li> + * In the first pass (adding values phase), when add() method is called, store the dictIds into the forward index + * value buffer (for multi-valued column also store number of values for each docId into forward index length + * buffer). We also compute the inverted index length for each dictId while adding values. 
+ * </li> + * <li> + * In the second pass (processing values phase), when seal() method is called, all the dictIds should already have been + * added. We first reorder the values into the inverted index buffers by going over the dictIds in forward index + * value buffer (for multi-valued column we also need forward index length buffer to get the docId for each dictId). + * <p>Once we have the inverted index buffers, we simply go over them and create the bitmap for each range of dictIds and + * serialize them into a file. + * </li> + * </ul> + * <p>Based on the number of values we need to store, we use direct memory or MMap file to allocate the buffer. + */ +public final class DictionaryBasedRangeIndexCreator implements InvertedIndexCreator { + // Use MMapBuffer if the value buffer size is larger than 2G + private static final int NUM_VALUES_THRESHOLD_FOR_MMAP_BUFFER = 500_000_000; + private static final int DEFAULT_NUM_RANGES = 20; + + private static final String FORWARD_INDEX_VALUE_BUFFER_SUFFIX = ".fwd.idx.val.buf"; + private static final String FORWARD_INDEX_LENGTH_BUFFER_SUFFIX = ".fwd.idx.len.buf"; + private static final String INVERTED_INDEX_VALUE_BUFFER_SUFFIX = ".inv.idx.val.buf"; + private static final String INVERTED_INDEX_LENGTH_BUFFER_SUFFIX = ".inv.idx.len.buf"; + + private final File _invertedIndexFile; + private final File _forwardIndexValueBufferFile; + private final File _forwardIndexLengthBufferFile; + private final File _invertedIndexValueBufferFile; + private final File _invertedIndexLengthBufferFile; + private final boolean _singleValue; + private final int _cardinality; + private final int _numDocs; + private final int _numValues; + private final boolean _useMMapBuffer; + + private final int[] _values; + + // Forward index buffers (from docId to dictId) + private int _nextDocId; + private PinotDataBuffer _forwardIndexValueBuffer; + // For multi-valued column only because each docId can have multiple dictIds + private int _nextValueId; + private 
PinotDataBuffer _forwardIndexLengthBuffer; + + // Inverted index buffers (from dictId to docId) + private PinotDataBuffer _invertedIndexValueBuffer; + private PinotDataBuffer _invertedIndexLengthBuffer; + private int _numRanges; + private int _numDocsPerRange; + + public DictionaryBasedRangeIndexCreator(File indexDir, FieldSpec fieldSpec, int cardinality, int numDocs, + int numValues) + throws IOException { + String columnName = fieldSpec.getName(); + _invertedIndexFile = new File(indexDir, columnName + BITMAP_RANGE_INDEX_FILE_EXTENSION); + _forwardIndexValueBufferFile = new File(indexDir, columnName + FORWARD_INDEX_VALUE_BUFFER_SUFFIX); + _forwardIndexLengthBufferFile = new File(indexDir, columnName + FORWARD_INDEX_LENGTH_BUFFER_SUFFIX); + _invertedIndexValueBufferFile = new File(indexDir, columnName + INVERTED_INDEX_VALUE_BUFFER_SUFFIX); + _invertedIndexLengthBufferFile = new File(indexDir, columnName + INVERTED_INDEX_LENGTH_BUFFER_SUFFIX); + _singleValue = fieldSpec.isSingleValueField(); + _cardinality = cardinality; + _numDocs = numDocs; + _numValues = _singleValue ? 
numDocs : numValues; + _useMMapBuffer = _numValues > NUM_VALUES_THRESHOLD_FOR_MMAP_BUFFER; + + try { + _values = new int[_numValues]; + _numRanges = DEFAULT_NUM_RANGES; + _numDocsPerRange = (int) Math.ceil(_numValues / _numRanges); + _forwardIndexValueBuffer = createTempBuffer((long) _numValues * Integer.BYTES, _forwardIndexValueBufferFile); + if (!_singleValue) { + _forwardIndexLengthBuffer = createTempBuffer((long) _numDocs * Integer.BYTES, _forwardIndexLengthBufferFile); + } + + // We need to clear the inverted index length buffer because we rely on the initial value of 0, and keep updating + // the value instead of directly setting the value + _invertedIndexLengthBuffer = + createTempBuffer((long) _cardinality * Integer.BYTES, _invertedIndexLengthBufferFile); + for (int i = 0; i < _cardinality; i++) { + _invertedIndexLengthBuffer.putInt((long) i * Integer.BYTES, 0); + } + } catch (Exception e) { + destroyBuffer(_forwardIndexValueBuffer, _forwardIndexValueBufferFile); + destroyBuffer(_forwardIndexLengthBuffer, _forwardIndexLengthBufferFile); + destroyBuffer(_invertedIndexLengthBuffer, _invertedIndexLengthBufferFile); + throw e; + } + } + + @Override + public void add(int dictId) { + putInt(_forwardIndexValueBuffer, _nextDocId, dictId); + putInt(_invertedIndexLengthBuffer, dictId, getInt(_invertedIndexLengthBuffer, dictId) + 1); + _values[_nextDocId] = dictId; + _nextDocId = _nextDocId + 1; + } + + @Override + public void add(int[] dictIds, int length) { + for (int i = 0; i < length; i++) { + int dictId = dictIds[i]; + putInt(_forwardIndexValueBuffer, _nextValueId, dictId); + putInt(_invertedIndexLengthBuffer, dictId, getInt(_invertedIndexLengthBuffer, dictId) + 1); + _values[_nextValueId] = dictId; + _nextValueId = _nextValueId + 1; + } + putInt(_forwardIndexLengthBuffer, _nextDocId++, length); + } + + @Override + public void addDoc(Object document, int docIdCounter) { + throw new IllegalStateException("Bitmap inverted index creator does not support Object type 
currently"); + } + + @Override + public void seal() + throws IOException { + //copy forward index, sort the forward index, create a copy + //go over the sorted value to compute ranges + Integer[] positions = new Integer[_values.length]; + for (int i = 0; i < _values.length; i++) { + positions[i] = i; + } + Arrays.sort(positions, Comparator.comparingInt(pos -> _values[pos])); + +// List<Integer> ranges = new ArrayList<>(); +// for (int i = 0; i < _values.length; i++) { +// if (i % _numDocsPerRange == 0) { +// ranges.add(_values[i]); +// } +// } + + List<Integer> rangeStartList = new ArrayList<>(); + int numDocsInCurrentRange = 0; + // Calculate value index for each dictId in the inverted index value buffer + // Re-use inverted index length buffer to store the value index for each dictId, where value index is the index in + // the inverted index value buffer where we should put next docId for the dictId + int invertedValueIndex = 0; + rangeStartList.add(0); + int prevEndDict = 0; + for (int dictId = 0; dictId < _cardinality; dictId++) { + int length = getInt(_invertedIndexLengthBuffer, dictId); + putInt(_invertedIndexLengthBuffer, dictId, invertedValueIndex); + invertedValueIndex += length; + if (prevEndDict == dictId || (numDocsInCurrentRange + length <= _numDocsPerRange)) { + numDocsInCurrentRange += length; + } else { + rangeStartList.add(dictId); + prevEndDict = dictId; + numDocsInCurrentRange = length; + } + } + + // Put values into inverted index value buffer + _invertedIndexValueBuffer = createTempBuffer((long) _numValues * Integer.BYTES, _invertedIndexValueBufferFile); + if (_singleValue) { + for (int docId = 0; docId < _numDocs; docId++) { + int dictId = getInt(_forwardIndexValueBuffer, docId); + int index = getInt(_invertedIndexLengthBuffer, dictId); + putInt(_invertedIndexValueBuffer, index, docId); + putInt(_invertedIndexLengthBuffer, dictId, index + 1); + } + + // Destroy buffer no longer needed + destroyBuffer(_forwardIndexValueBuffer, 
_forwardIndexValueBufferFile); + _forwardIndexValueBuffer = null; + } else { + int valueId = 0; + for (int docId = 0; docId < _numDocs; docId++) { + int length = getInt(_forwardIndexLengthBuffer, docId); + for (int i = 0; i < length; i++) { + int dictId = getInt(_forwardIndexValueBuffer, valueId++); + int index = getInt(_invertedIndexLengthBuffer, dictId); + putInt(_invertedIndexValueBuffer, index, docId); + putInt(_invertedIndexLengthBuffer, dictId, index + 1); + } + } + + // Destroy buffers no longer needed + destroyBuffer(_forwardIndexValueBuffer, _forwardIndexValueBufferFile); + _forwardIndexValueBuffer = null; + destroyBuffer(_forwardIndexLengthBuffer, _forwardIndexLengthBufferFile); + _forwardIndexLengthBuffer = null; + } + + // Create bitmaps from inverted index buffers and serialize them to file + //HEADER + //# OF RANGES + // Range Start 0 + // ......... + // Range Start R - 1 + // Bitmap for Range 0 Start Offset + // ..... + // Bitmap for Range R Start Offset + //BODY + // Bitmap for range 0 + // Bitmap for range 1 + // ...... 
+ // Bitmap for range R - 1 + long length = 0; + try (BufferedOutputStream bos = new BufferedOutputStream(new FileOutputStream(_invertedIndexFile)); + DataOutputStream header = new DataOutputStream(bos); + FileOutputStream fos = new FileOutputStream(_invertedIndexFile); + DataOutputStream dataOutputStream = new DataOutputStream(new BufferedOutputStream(fos))) { + + //Write the Range values + header.writeInt(rangeStartList.size()); +// System.out.println("rangeStartList = " + rangeStartList); + for (int rangeStart : rangeStartList) { + header.writeInt(rangeStart); + } + //compute the offset where the bitmap for the first range would be written + int bitmapOffset = + Integer.BYTES + (rangeStartList.size()) * Integer.BYTES + (rangeStartList.size() + 1) * Integer.BYTES; + length += bitmapOffset; + header.writeInt(bitmapOffset); +// System.out.println("bitmapOffset = " + bitmapOffset); + //set the starting position where the actual bitmaps will be written + fos.getChannel().position(bitmapOffset); + + int startIndex = 0; + int[] bitmapOffsets = new int[_numRanges]; + for (int i = 0; i < rangeStartList.size(); i++) { + int rangeStartDictId = rangeStartList.get(i); + int rangeEndDictId = (i == rangeStartList.size() - 1) ? 
_cardinality : rangeStartList.get(i + 1); + MutableRoaringBitmap bitmap = new MutableRoaringBitmap(); + for (int dictId = rangeStartDictId; dictId < rangeEndDictId; dictId++) { + int endIndex = getInt(_invertedIndexLengthBuffer, dictId); + for (int index = startIndex; index < endIndex; index++) { + bitmap.add(getInt(_invertedIndexValueBuffer, index)); + } + startIndex = endIndex; + } + // Write offset and bitmap into file + int sizeInBytes = bitmap.serializedSizeInBytes(); +// System.out.println("sizeInBytes = " + sizeInBytes); + bitmapOffset += sizeInBytes; + length += sizeInBytes; + // Check for int overflow + Preconditions.checkState(bitmapOffset > 0, "Inverted index file: %s exceeds 2GB limit", _invertedIndexFile); + + header.writeInt(bitmapOffset); + byte[] bytes = new byte[sizeInBytes]; + bitmap.serialize(ByteBuffer.wrap(bytes)); +// System.out.println("Arrays.toString(bytes) = " + Arrays.toString(bytes)); + dataOutputStream.write(bytes); + } +// System.out.println("bitmapOffsets = " + Arrays.toString(bitmapOffsets)); +// System.out.println("length = " + length); + } catch (Exception e) { + FileUtils.deleteQuietly(_invertedIndexFile); + throw e; + } +// System.out.println("_invertedIndexFile = " + _invertedIndexFile.length()); + } + + @Override + public void close() + throws IOException { + org.apache.pinot.common.utils.FileUtils + .close(new DataBufferAndFile(_forwardIndexValueBuffer, _forwardIndexValueBufferFile), + new DataBufferAndFile(_forwardIndexLengthBuffer, _forwardIndexLengthBufferFile), + new DataBufferAndFile(_invertedIndexValueBuffer, _invertedIndexValueBufferFile), + new DataBufferAndFile(_invertedIndexLengthBuffer, _invertedIndexLengthBufferFile)); + } + + private class DataBufferAndFile implements Closeable { + private final PinotDataBuffer _dataBuffer; + private final File _file; + + DataBufferAndFile(final PinotDataBuffer buffer, final File file) { + _dataBuffer = buffer; + _file = file; + } + + @Override + public void close() + throws 
IOException { + destroyBuffer(_dataBuffer, _file); + } + } + + private static void putInt(PinotDataBuffer buffer, long index, int value) { + buffer.putInt(index << 2, value); + } + + private static int getInt(PinotDataBuffer buffer, long index) { + return buffer.getInt(index << 2); + } + + private PinotDataBuffer createTempBuffer(long size, File mmapFile) + throws IOException { + if (_useMMapBuffer) { + return PinotDataBuffer.mapFile(mmapFile, false, 0, size, PinotDataBuffer.NATIVE_ORDER, + "OffHeapBitmapInvertedIndexCreator: temp buffer"); + } else { + return PinotDataBuffer.allocateDirect(size, PinotDataBuffer.NATIVE_ORDER, + "OffHeapBitmapInvertedIndexCreator: temp buffer for " + mmapFile.getName()); + } + } + + private void destroyBuffer(PinotDataBuffer buffer, File mmapFile) + throws IOException { + if (buffer != null) { + buffer.close(); + if (mmapFile.exists()) { + FileUtils.forceDelete(mmapFile); + } + } + } + + public static void main(String[] args) + throws IOException { + File indexDir = new File("/tmp/testRangeIndex"); + indexDir.mkdirs(); + FieldSpec fieldSpec = new MetricFieldSpec(); + fieldSpec.setDataType(FieldSpec.DataType.INT); + String columnName = "latency"; + fieldSpec.setName(columnName); + int cardinality = 1000; + int numDocs = 10000; + int numValues = 10000; + DictionaryBasedRangeIndexCreator creator = + new DictionaryBasedRangeIndexCreator(indexDir, fieldSpec, cardinality, numDocs, numValues); + Random r = new Random(); + for (int i = 0; i < numDocs; i++) { + creator.add(r.nextInt(cardinality)); + } + creator.seal(); + + File file = new File(indexDir, columnName + BITMAP_RANGE_INDEX_FILE_EXTENSION); + DataInputStream dis = new DataInputStream(new FileInputStream(file)); + int numRanges = dis.readInt(); + System.out.println("numRanges = " + numRanges); + int[] rangeStart = new int[numRanges]; + for (int i = 0; i < numRanges; i++) { + rangeStart[i] = dis.readInt(); + } + System.out.println("Arrays.toString(rangeStart) = " + 
Arrays.toString(rangeStart)); + int[] rangeBitmapOffsets = new int[numRanges + 1]; + for (int i = 0; i <= numRanges; i++) { + rangeBitmapOffsets[i] = dis.readInt(); + } + System.out.println("Arrays.toString(rangeBitmapOffsets) = " + Arrays.toString(rangeBitmapOffsets)); + ImmutableRoaringBitmap[] bitmaps = new ImmutableRoaringBitmap[numRanges]; + for (int i = 0; i < numRanges; i++) { + long serializedBitmapLength; + serializedBitmapLength = rangeBitmapOffsets[i + 1] - rangeBitmapOffsets[i]; + System.out.println("serializedBitmapLength = " + serializedBitmapLength); + byte[] bytes = new byte[(int) serializedBitmapLength]; + dis.read(bytes, 0, (int) serializedBitmapLength); + System.out.println("bytes = " + Arrays.toString(bytes)); + bitmaps[i] = new ImmutableRoaringBitmap(ByteBuffer.wrap(bytes)); + System.out.println(bitmaps[i]); + } + } +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/column/ColumnIndexContainer.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/column/ColumnIndexContainer.java index 531c3fc..ab9fbfc 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/column/ColumnIndexContainer.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/column/ColumnIndexContainer.java @@ -41,6 +41,11 @@ public interface ColumnIndexContainer { InvertedIndexReader getInvertedIndex(); /** + * Returns the inverted index for the column, or {@code null} if it does not exist. + */ + InvertedIndexReader getRangeIndex(); + + /** * Returns the dictionary for the column, or {@code null} if it does not exist. 
*/ Dictionary getDictionary(); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/column/PhysicalColumnIndexContainer.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/column/PhysicalColumnIndexContainer.java index 76ff19e..8bf151c 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/column/PhysicalColumnIndexContainer.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/column/PhysicalColumnIndexContainer.java @@ -32,6 +32,7 @@ import org.apache.pinot.core.segment.index.loader.IndexLoadingConfig; import org.apache.pinot.core.segment.index.metadata.ColumnMetadata; import org.apache.pinot.core.segment.index.readers.BaseImmutableDictionary; import org.apache.pinot.core.segment.index.readers.BitmapInvertedIndexReader; +import org.apache.pinot.core.segment.index.readers.BitmapRangeIndexReader; import org.apache.pinot.core.segment.index.readers.BloomFilterReader; import org.apache.pinot.core.segment.index.readers.BytesDictionary; import org.apache.pinot.core.segment.index.readers.DoubleDictionary; @@ -60,6 +61,7 @@ public final class PhysicalColumnIndexContainer implements ColumnIndexContainer private final DataFileReader _forwardIndex; private final InvertedIndexReader _invertedIndex; + private final InvertedIndexReader _rangeIndex; private final BaseImmutableDictionary _dictionary; private final BloomFilterReader _bloomFilterReader; private final NullValueVectorReaderImpl _nullValueVectorReader; @@ -69,11 +71,13 @@ public final class PhysicalColumnIndexContainer implements ColumnIndexContainer throws IOException { String columnName = metadata.getColumnName(); boolean loadInvertedIndex = false; + boolean loadRangeIndex = false; boolean loadTextIndex = false; boolean loadOnHeapDictionary = false; boolean loadBloomFilter = false; if (indexLoadingConfig != null) { loadInvertedIndex = indexLoadingConfig.getInvertedIndexColumns().contains(columnName); + loadRangeIndex = 
indexLoadingConfig.getRangeIndexColumns().contains(columnName); loadOnHeapDictionary = indexLoadingConfig.getOnHeapDictionaryColumns().contains(columnName); loadBloomFilter = indexLoadingConfig.getBloomFilterColumns().contains(columnName); loadTextIndex = indexLoadingConfig.getTextIndexColumns().contains(columnName); @@ -107,6 +111,7 @@ public final class PhysicalColumnIndexContainer implements ColumnIndexContainer new SortedIndexReaderImpl(fwdIndexBuffer, metadata.getCardinality()); _forwardIndex = sortedIndexReader; _invertedIndex = sortedIndexReader; + _rangeIndex = null; return; } else { // Unsorted @@ -123,14 +128,21 @@ public final class PhysicalColumnIndexContainer implements ColumnIndexContainer _invertedIndex = new BitmapInvertedIndexReader(segmentReader.getIndexFor(columnName, ColumnIndexType.INVERTED_INDEX), metadata.getCardinality()); + _rangeIndex = null; + } else if (loadRangeIndex) { + _invertedIndex = null; + _rangeIndex = + new BitmapRangeIndexReader(segmentReader.getIndexFor(columnName, ColumnIndexType.RANGE_INDEX)); } else { _invertedIndex = null; + _rangeIndex = null; } } else { // Raw index _forwardIndex = loadRawForwardIndex(fwdIndexBuffer, metadata.getDataType()); _dictionary = null; _bloomFilterReader = null; + _rangeIndex = null; if (loadTextIndex) { _invertedIndex = new LuceneTextIndexReader(columnName, segmentIndexDir, metadata.getTotalDocs()); } else { @@ -150,6 +162,11 @@ public final class PhysicalColumnIndexContainer implements ColumnIndexContainer } @Override + public InvertedIndexReader getRangeIndex() { + return _rangeIndex; + } + + @Override public BaseImmutableDictionary getDictionary() { return _dictionary; } @@ -164,7 +181,8 @@ public final class PhysicalColumnIndexContainer implements ColumnIndexContainer return _nullValueVectorReader; } - private static BaseImmutableDictionary loadDictionary(PinotDataBuffer dictionaryBuffer, ColumnMetadata metadata, + //TODO: move this to a DictionaryLoader class + public static 
BaseImmutableDictionary loadDictionary(PinotDataBuffer dictionaryBuffer, ColumnMetadata metadata, boolean loadOnHeap) { FieldSpec.DataType dataType = metadata.getDataType(); if (loadOnHeap) { diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/datasource/BaseDataSource.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/datasource/BaseDataSource.java index 4fc99cb..fc32487 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/datasource/BaseDataSource.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/datasource/BaseDataSource.java @@ -39,17 +39,20 @@ public abstract class BaseDataSource extends DataSource { private final DataFileReader _forwardIndex; private final Dictionary _dictionary; private final InvertedIndexReader _invertedIndex; + private final InvertedIndexReader _rangeIndex; private final BloomFilterReader _bloomFilter; private final NullValueVectorReader _nullValueVector; private final String _operatorName; public BaseDataSource(DataSourceMetadata dataSourceMetadata, DataFileReader forwardIndex, @Nullable Dictionary dictionary, @Nullable InvertedIndexReader invertedIndex, - @Nullable BloomFilterReader bloomFilter, @Nullable NullValueVectorReader nullValueVector, String operatorName) { + @Nullable InvertedIndexReader rangeIndex, @Nullable BloomFilterReader bloomFilter, + @Nullable NullValueVectorReader nullValueVector, String operatorName) { _dataSourceMetadata = dataSourceMetadata; _forwardIndex = forwardIndex; _dictionary = dictionary; _invertedIndex = invertedIndex; + _rangeIndex = rangeIndex; _bloomFilter = bloomFilter; _nullValueVector = nullValueVector; _operatorName = operatorName; @@ -79,6 +82,12 @@ public abstract class BaseDataSource extends DataSource { @Nullable @Override + public InvertedIndexReader getRangeIndex() { + return _rangeIndex; + } + + @Nullable + @Override public BloomFilterReader getBloomFilter() { return _bloomFilter; } diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/datasource/ImmutableDataSource.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/datasource/ImmutableDataSource.java index a7009a8..b4b0f5a 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/datasource/ImmutableDataSource.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/datasource/ImmutableDataSource.java @@ -35,7 +35,7 @@ public class ImmutableDataSource extends BaseDataSource { public ImmutableDataSource(ColumnMetadata columnMetadata, ColumnIndexContainer columnIndexContainer) { super(new ImmutableDataSourceMetadata(columnMetadata), columnIndexContainer.getForwardIndex(), - columnIndexContainer.getDictionary(), columnIndexContainer.getInvertedIndex(), + columnIndexContainer.getDictionary(), columnIndexContainer.getInvertedIndex(), columnIndexContainer.getRangeIndex(), columnIndexContainer.getBloomFilter(), columnIndexContainer.getNullValueVector(), OPERATOR_NAME_PREFIX + columnMetadata.getColumnName()); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/datasource/MutableDataSource.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/datasource/MutableDataSource.java index cf8e8bb..70610da 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/datasource/MutableDataSource.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/datasource/MutableDataSource.java @@ -40,10 +40,10 @@ public class MutableDataSource extends BaseDataSource { public MutableDataSource(FieldSpec fieldSpec, int numDocs, int numValues, int maxNumValuesPerMVEntry, @Nullable PartitionFunction partitionFunction, int partitionId, DataFileReader forwardIndex, - @Nullable Dictionary dictionary, @Nullable InvertedIndexReader invertedIndex, + @Nullable Dictionary dictionary, @Nullable InvertedIndexReader invertedIndex, @Nullable InvertedIndexReader rangeIndex, @Nullable BloomFilterReader bloomFilter, 
@Nullable NullValueVectorReader nullValueVector) { super(new MutableDataSourceMetadata(fieldSpec, numDocs, numValues, maxNumValuesPerMVEntry, partitionFunction, - partitionId), forwardIndex, dictionary, invertedIndex, bloomFilter, nullValueVector, + partitionId), forwardIndex, dictionary, invertedIndex, rangeIndex, bloomFilter, nullValueVector, OPERATOR_NAME_PREFIX + fieldSpec.getName()); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java index c3a4076..3db11dc 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java @@ -46,6 +46,7 @@ public class IndexLoadingConfig { private List<String> _sortedColumns = Collections.emptyList(); private Set<String> _invertedIndexColumns = new HashSet<>(); private Set<String> _textIndexColumns = new HashSet<>(); + private Set<String> _rangeIndexColumns = new HashSet<>(); private Set<String> _noDictionaryColumns = new HashSet<>(); // TODO: replace this by _noDictionaryConfig. 
private Map<String, String> _noDictionaryConfig = new HashMap<>(); private Set<String> _varLengthDictionaryColumns = new HashSet<>(); @@ -83,6 +84,11 @@ public class IndexLoadingConfig { _invertedIndexColumns.addAll(invertedIndexColumns); } + List<String> rangeIndexColumns = indexingConfig.getRangeIndexColumns(); + if (rangeIndexColumns != null) { + _rangeIndexColumns.addAll(rangeIndexColumns); + } + List<String> bloomFilterColumns = indexingConfig.getBloomFilterColumns(); if (bloomFilterColumns != null) { _bloomFilterColumns.addAll(bloomFilterColumns); @@ -194,6 +200,10 @@ public class IndexLoadingConfig { @Nonnull public Set<String> getInvertedIndexColumns() { return _invertedIndexColumns; + } @Nonnull + + public Set<String> getRangeIndexColumns() { + return _rangeIndexColumns; } /** @@ -220,6 +230,14 @@ public class IndexLoadingConfig { } /** + * For tests only. + */ + @VisibleForTesting + public void setRangeIndexColumns(@Nonnull Set<String> rangeIndexColumns) { + _rangeIndexColumns = rangeIndexColumns; + } + + /** * Used directly from text search unit test code since the test code * doesn't really have a table config and is directly testing the * query execution code of text search using data from generated segments diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/SegmentPreProcessor.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/SegmentPreProcessor.java index 564321b..89432ab 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/SegmentPreProcessor.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/SegmentPreProcessor.java @@ -31,6 +31,7 @@ import org.apache.pinot.core.segment.index.loader.columnminmaxvalue.ColumnMinMax import org.apache.pinot.core.segment.index.loader.defaultcolumn.DefaultColumnHandler; import org.apache.pinot.core.segment.index.loader.defaultcolumn.DefaultColumnHandlerFactory; import 
org.apache.pinot.core.segment.index.loader.invertedindex.InvertedIndexHandler; +import org.apache.pinot.core.segment.index.loader.invertedindex.RangeIndexHandler; import org.apache.pinot.core.segment.index.loader.invertedindex.TextIndexHandler; import org.apache.pinot.core.segment.index.metadata.SegmentMetadataImpl; import org.apache.pinot.core.segment.store.SegmentDirectory; @@ -99,6 +100,11 @@ public class SegmentPreProcessor implements AutoCloseable { new InvertedIndexHandler(_indexDir, _segmentMetadata, _indexLoadingConfig, segmentWriter); invertedIndexHandler.createInvertedIndices(); + // Create column inverted indices according to the index config. + RangeIndexHandler rangeIndexHandler = + new RangeIndexHandler(_indexDir, _segmentMetadata, _indexLoadingConfig, segmentWriter); + rangeIndexHandler.createRangeIndices(); + Set<String> textIndexColumns = _indexLoadingConfig.getTextIndexColumns(); if (textIndexColumns.size() > 0) { TextIndexHandler textIndexHandler = diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/invertedindex/RangeIndexHandler.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/invertedindex/RangeIndexHandler.java new file mode 100644 index 0000000..7b2cb1c --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/invertedindex/RangeIndexHandler.java @@ -0,0 +1,168 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.segment.index.loader.invertedindex; + +import java.io.File; +import java.io.IOException; +import java.util.HashSet; +import java.util.Set; +import javax.annotation.Nonnull; +import org.apache.commons.io.FileUtils; +import org.apache.pinot.core.indexsegment.generator.SegmentVersion; +import org.apache.pinot.core.io.reader.DataFileReader; +import org.apache.pinot.core.io.reader.SingleColumnMultiValueReader; +import org.apache.pinot.core.io.reader.impl.v1.FixedBitMultiValueReader; +import org.apache.pinot.core.io.reader.impl.v1.FixedBitSingleValueReader; +import org.apache.pinot.core.segment.creator.impl.V1Constants; +import org.apache.pinot.core.segment.creator.impl.inv.DictionaryBasedRangeIndexCreator; +import org.apache.pinot.core.segment.creator.impl.inv.OffHeapBitmapInvertedIndexCreator; +import org.apache.pinot.core.segment.index.column.PhysicalColumnIndexContainer; +import org.apache.pinot.core.segment.index.loader.IndexLoadingConfig; +import org.apache.pinot.core.segment.index.loader.LoaderUtils; +import org.apache.pinot.core.segment.index.metadata.ColumnMetadata; +import org.apache.pinot.core.segment.index.metadata.SegmentMetadataImpl; +import org.apache.pinot.core.segment.index.readers.BaseImmutableDictionary; +import org.apache.pinot.core.segment.memory.PinotDataBuffer; +import org.apache.pinot.core.segment.store.ColumnIndexType; +import org.apache.pinot.core.segment.store.SegmentDirectory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class RangeIndexHandler { + private static 
final Logger LOGGER = LoggerFactory.getLogger(RangeIndexHandler.class); + + private final File _indexDir; + private final SegmentDirectory.Writer _segmentWriter; + private final String _segmentName; + private final SegmentVersion _segmentVersion; + private final Set<ColumnMetadata> _rangeIndexColumns = new HashSet<>(); + + public RangeIndexHandler(@Nonnull File indexDir, @Nonnull SegmentMetadataImpl segmentMetadata, + @Nonnull IndexLoadingConfig indexLoadingConfig, @Nonnull SegmentDirectory.Writer segmentWriter) { + _indexDir = indexDir; + _segmentWriter = segmentWriter; + _segmentName = segmentMetadata.getName(); + _segmentVersion = SegmentVersion.valueOf(segmentMetadata.getVersion()); + + // Do not create range index for sorted column + for (String column : indexLoadingConfig.getRangeIndexColumns()) { + ColumnMetadata columnMetadata = segmentMetadata.getColumnMetadataFor(column); + if (columnMetadata != null && !columnMetadata.isSorted()) { + _rangeIndexColumns.add(columnMetadata); + } + } + } + + public void createRangeIndices() + throws IOException { + for (ColumnMetadata columnMetadata : _rangeIndexColumns) { + createRangeIndexForColumn(columnMetadata); + } + } + + private void createRangeIndexForColumn(ColumnMetadata columnMetadata) + throws IOException { + String column = columnMetadata.getColumnName(); + + File inProgress = new File(_indexDir, column + ".range.inprogress"); + File rangeIndexFile = new File(_indexDir, column + V1Constants.Indexes.BITMAP_RANGE_INDEX_FILE_EXTENSION); + + if (!inProgress.exists()) { + // Marker file does not exist, which means last run ended normally. + + if (_segmentWriter.hasIndexFor(column, ColumnIndexType.RANGE_INDEX)) { + // Skip creating range index if already exists. + + LOGGER.info("Found Range index for segment: {}, column: {}", _segmentName, column); + return; + } + + // Create a marker file. + FileUtils.touch(inProgress); + } else { + // Marker file exists, which means last run gets interrupted.
+ // Remove range index if exists. + // For v1 and v2, it's the actual range index. For v3, it's the temporary range index. + FileUtils.deleteQuietly(rangeIndexFile); + } + + // Create new range index for the column. + LOGGER.info("Creating new range index for segment: {}, column: {}", _segmentName, column); + int numDocs = columnMetadata.getTotalDocs(); + + if (columnMetadata.hasDictionary()) { + PinotDataBuffer dictBuffer = + _segmentWriter.getIndexFor(columnMetadata.getColumnName(), ColumnIndexType.DICTIONARY); + BaseImmutableDictionary dictionary = + PhysicalColumnIndexContainer.loadDictionary(dictBuffer, columnMetadata, false); + handleDictionaryBasedColumn(dictionary, columnMetadata, numDocs); + } else { + + } + + // For v3, write the generated range index file into the single file and remove it. + if (_segmentVersion == SegmentVersion.v3) { + LoaderUtils.writeIndexToV3Format(_segmentWriter, column, rangeIndexFile, ColumnIndexType.RANGE_INDEX); + } + + // Delete the marker file. + FileUtils.deleteQuietly(inProgress); + + LOGGER.info("Created inverted index for segment: {}, column: {}", _segmentName, column); + } + + private void handleDictionaryBasedColumn(BaseImmutableDictionary dictionary, ColumnMetadata columnMetadata, int numDocs) + throws IOException { + try (DictionaryBasedRangeIndexCreator creator = new DictionaryBasedRangeIndexCreator(_indexDir, + columnMetadata.getFieldSpec(), columnMetadata.getCardinality(), numDocs, + columnMetadata.getTotalNumberOfEntries())) { + try (DataFileReader fwdIndex = getForwardIndexReader(columnMetadata, _segmentWriter)) { + if (columnMetadata.isSingleValue()) { + // Single-value column. + FixedBitSingleValueReader svFwdIndex = (FixedBitSingleValueReader) fwdIndex; + for (int i = 0; i < numDocs; i++) { + creator.add(svFwdIndex.getInt(i)); + } + } else { + // Multi-value column.
+ SingleColumnMultiValueReader mvFwdIndex = (SingleColumnMultiValueReader) fwdIndex; + int[] dictIds = new int[columnMetadata.getMaxNumberOfMultiValues()]; + for (int i = 0; i < numDocs; i++) { + int length = mvFwdIndex.getIntArray(i, dictIds); + creator.add(dictIds, length); + } + } + creator.seal(); + } + } + } + + private DataFileReader getForwardIndexReader(ColumnMetadata columnMetadata, SegmentDirectory.Writer segmentWriter) + throws IOException { + PinotDataBuffer buffer = segmentWriter.getIndexFor(columnMetadata.getColumnName(), ColumnIndexType.FORWARD_INDEX); + int numRows = columnMetadata.getTotalDocs(); + int numBitsPerValue = columnMetadata.getBitsPerElement(); + if (columnMetadata.isSingleValue()) { + return new FixedBitSingleValueReader(buffer, numRows, numBitsPerValue); + } else { + return new FixedBitMultiValueReader(buffer, numRows, columnMetadata.getTotalNumberOfEntries(), numBitsPerValue); + } + } +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/metadata/SegmentMetadata.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/metadata/SegmentMetadata.java index 8ee589a..d01406c 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/metadata/SegmentMetadata.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/metadata/SegmentMetadata.java @@ -96,6 +96,8 @@ public interface SegmentMetadata { String getBitmapInvertedIndexFileName(String column); + String getBitmapRangeIndexFileName(String column); + String getBloomFilterFileName(String column); String getNullValueVectorFileName(String column); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/metadata/SegmentMetadataImpl.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/metadata/SegmentMetadataImpl.java index 5b58f9e..3bbdcdd 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/metadata/SegmentMetadataImpl.java +++ 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/metadata/SegmentMetadataImpl.java @@ -452,6 +452,11 @@ public class SegmentMetadataImpl implements SegmentMetadata { } @Override + public String getBitmapRangeIndexFileName(String column) { + return column + V1Constants.Indexes.BITMAP_RANGE_INDEX_FILE_EXTENSION; + } + + @Override public String getBloomFilterFileName(String column) { return column + V1Constants.Indexes.BLOOM_FILTER_FILE_EXTENSION; } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/BitmapRangeIndexReader.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/BitmapRangeIndexReader.java new file mode 100644 index 0000000..b22004e --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/BitmapRangeIndexReader.java @@ -0,0 +1,134 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.core.segment.index.readers; + +import com.google.common.base.Preconditions; +import java.io.IOException; +import java.lang.ref.SoftReference; +import java.nio.ByteBuffer; +import java.util.Arrays; +import org.apache.pinot.core.segment.memory.PinotDataBuffer; +import org.roaringbitmap.buffer.ImmutableRoaringBitmap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class BitmapRangeIndexReader implements InvertedIndexReader<ImmutableRoaringBitmap> { + public static final Logger LOGGER = LoggerFactory.getLogger(BitmapRangeIndexReader.class); + + private final PinotDataBuffer _buffer; + private final int _numRanges; + final int _bitmapIndexOffset; + private final int[] _rangeStartArray; + + private volatile SoftReference<SoftReference<ImmutableRoaringBitmap>[]> _bitmaps = null; + + /** + * Constructs a range index reader; the number of ranges is read from the first int of the buffer. + * @param indexDataBuffer data buffer for the range index. + */ + public BitmapRangeIndexReader(PinotDataBuffer indexDataBuffer) { + _buffer = indexDataBuffer; + _numRanges = _buffer.getInt(0); + _rangeStartArray = new int[_numRanges]; + _bitmapIndexOffset = Integer.BYTES + _numRanges * Integer.BYTES; + final int lastOffset = _buffer.getInt(_numRanges * Integer.BYTES + (_numRanges + 1) * Integer.BYTES); + + Preconditions.checkState(lastOffset == _buffer.size(), + "The last offset should be equal to buffer size!
Current lastOffset: " + lastOffset + ", buffer size: " + + _buffer.size()); + for (int i = 0; i < _numRanges; i++) { + _rangeStartArray[i] = _buffer.getInt(Integer.BYTES + i * Integer.BYTES); + } + System.out.println("_rangeStartArray = " + _rangeStartArray); + } + + /** + * {@inheritDoc} + */ + @Override + public ImmutableRoaringBitmap getDocIds(int rangeId) { + SoftReference<ImmutableRoaringBitmap>[] bitmapArrayReference = null; + // Return the bitmap if it's still on heap + if (_bitmaps != null) { + bitmapArrayReference = _bitmaps.get(); + if (bitmapArrayReference != null) { + SoftReference<ImmutableRoaringBitmap> bitmapReference = bitmapArrayReference[rangeId]; + if (bitmapReference != null) { + ImmutableRoaringBitmap value = bitmapReference.get(); + if (value != null) { + return value; + } + } + } else { + bitmapArrayReference = new SoftReference[_numRanges]; + _bitmaps = new SoftReference<SoftReference<ImmutableRoaringBitmap>[]>(bitmapArrayReference); + } + } else { + bitmapArrayReference = new SoftReference[_numRanges]; + _bitmaps = new SoftReference<SoftReference<ImmutableRoaringBitmap>[]>(bitmapArrayReference); + } + synchronized (this) { + ImmutableRoaringBitmap value; + if (bitmapArrayReference[rangeId] == null || bitmapArrayReference[rangeId].get() == null) { + value = buildRoaringBitmapForIndex(rangeId); + bitmapArrayReference[rangeId] = new SoftReference<ImmutableRoaringBitmap>(value); + } else { + value = bitmapArrayReference[rangeId].get(); + } + return value; + } + } + + @Override + public ImmutableRoaringBitmap getDocIds(Object value) { + // This should not be called from anywhere. 
If it happens, there is a bug + // and that's why we throw illegal state exception + throw new IllegalStateException("bitmap inverted index reader supports lookup only on dictionary id"); + } + + private synchronized ImmutableRoaringBitmap buildRoaringBitmapForIndex(final int rangeId) { + final int currentOffset = getOffset(rangeId); + final int nextOffset = getOffset(rangeId + 1); + final int bufferLength = nextOffset - currentOffset; + + // Slice the buffer appropriately for Roaring Bitmap + ByteBuffer bb = _buffer.toDirectByteBuffer(currentOffset, bufferLength); + return new ImmutableRoaringBitmap(bb); + } + + private int getOffset(final int rangeId) { + return _buffer.getInt(_bitmapIndexOffset + rangeId * Integer.BYTES); + } + + @Override + public void close() + throws IOException { + _buffer.close(); + } + + public int findRangeId(int dictId) { + for (int i = 0; i < _rangeStartArray.length - 1; i++) { + if (dictId >= _rangeStartArray[i] && dictId < _rangeStartArray[i + 1]) { + return i; + } + } + return _rangeStartArray.length - 1; + } +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/store/ColumnIndexDirectory.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/store/ColumnIndexDirectory.java index 65322b5..6b212d2 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/store/ColumnIndexDirectory.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/store/ColumnIndexDirectory.java @@ -98,6 +98,15 @@ abstract class ColumnIndexDirectory implements Closeable { throws IOException; /** + * Get inverted index data buffer for a column + * @param column column name + * @return in-memory ByteBuffer like buffer for data + * @throws IOException + */ + public abstract PinotDataBuffer getRangeIndexBufferFor(String column) + throws IOException; + + /** * Get inverted bloom filter buffer for a column * @param column column name * @return in-memory ByteBuffer like buffer for data @@ -152,6 +161,16 @@ abstract class 
ColumnIndexDirectory implements Closeable { * @return in-memory ByteBuffer like buffer for data * @throws IOException */ + public abstract PinotDataBuffer newRangeIndexBuffer(String column, long sizeBytes) + throws IOException; + + /** + * Allocate a new data buffer of specified sizeBytes in the columnar index directory + * @param column column name + * @param sizeBytes sizeBytes for the buffer allocation + * @return in-memory ByteBuffer like buffer for data + * @throws IOException + */ public abstract PinotDataBuffer newBloomFilterBuffer(String column, long sizeBytes) throws IOException; diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/store/ColumnIndexType.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/store/ColumnIndexType.java index bdb3b6a..dcd21df 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/store/ColumnIndexType.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/store/ColumnIndexType.java @@ -24,7 +24,8 @@ public enum ColumnIndexType { INVERTED_INDEX("inverted_index"), BLOOM_FILTER("bloom_filter"), NULLVALUE_VECTOR("nullvalue_vector"), - TEXT_INDEX("text_index"); + TEXT_INDEX("text_index"), + RANGE_INDEX("range_index"); private final String indexName; diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/store/FilePerIndexDirectory.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/store/FilePerIndexDirectory.java index 9a2ef40..8f1299e 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/store/FilePerIndexDirectory.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/store/FilePerIndexDirectory.java @@ -85,6 +85,20 @@ class FilePerIndexDirectory extends ColumnIndexDirectory { } @Override + public PinotDataBuffer getRangeIndexBufferFor(String column) + throws IOException { + IndexKey key = new IndexKey(column, ColumnIndexType.RANGE_INDEX); + return getReadBufferFor(key); + } + + @Override + public PinotDataBuffer 
newRangeIndexBuffer(String column, long sizeBytes) + throws IOException { + IndexKey key = new IndexKey(column, ColumnIndexType.RANGE_INDEX); + return getWriteBufferFor(key, sizeBytes); + } + + @Override public PinotDataBuffer getBloomFilterBufferFor(String column) throws IOException { IndexKey key = new IndexKey(column, ColumnIndexType.BLOOM_FILTER); @@ -174,6 +188,9 @@ class FilePerIndexDirectory extends ColumnIndexDirectory { case INVERTED_INDEX: filename = metadata.getBitmapInvertedIndexFileName(column); break; + case RANGE_INDEX: + filename = metadata.getBitmapRangeIndexFileName(column); + break; case BLOOM_FILTER: filename = metadata.getBloomFilterFileName(column); break; diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/store/SegmentLocalFSDirectory.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/store/SegmentLocalFSDirectory.java index 9e61e2e..c985d37 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/store/SegmentLocalFSDirectory.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/store/SegmentLocalFSDirectory.java @@ -234,6 +234,9 @@ class SegmentLocalFSDirectory extends SegmentDirectory { case INVERTED_INDEX: buffer = columnIndexDirectory.getInvertedIndexBufferFor(column); break; + case RANGE_INDEX: + buffer = columnIndexDirectory.getRangeIndexBufferFor(column); + break; case BLOOM_FILTER: buffer = columnIndexDirectory.getBloomFilterBufferFor(column); break; @@ -355,6 +358,8 @@ class SegmentLocalFSDirectory extends SegmentDirectory { return columnIndexDirectory.newForwardIndexBuffer(key.name, sizeBytes); case INVERTED_INDEX: return columnIndexDirectory.newInvertedIndexBuffer(key.name, sizeBytes); + case RANGE_INDEX: + return columnIndexDirectory.newRangeIndexBuffer(key.name, sizeBytes); case BLOOM_FILTER: return columnIndexDirectory.newBloomFilterBuffer(key.name, sizeBytes); case NULLVALUE_VECTOR: diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/store/SingleFileIndexDirectory.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/store/SingleFileIndexDirectory.java index cc0ebcc..2ba8725 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/store/SingleFileIndexDirectory.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/store/SingleFileIndexDirectory.java @@ -111,6 +111,12 @@ class SingleFileIndexDirectory extends ColumnIndexDirectory { } @Override + public PinotDataBuffer getRangeIndexBufferFor(String column) + throws IOException { + return checkAndGetIndexBuffer(column, ColumnIndexType.RANGE_INDEX); + } + + @Override public PinotDataBuffer getBloomFilterBufferFor(String column) throws IOException { return checkAndGetIndexBuffer(column, ColumnIndexType.BLOOM_FILTER); @@ -166,6 +172,12 @@ class SingleFileIndexDirectory extends ColumnIndexDirectory { } @Override + public PinotDataBuffer newRangeIndexBuffer(String column, long sizeBytes) + throws IOException { + return allocNewBufferInternal(column, ColumnIndexType.RANGE_INDEX, sizeBytes, "range_index.create"); + } + + @Override public PinotDataBuffer newBloomFilterBuffer(String column, long sizeBytes) throws IOException { return allocNewBufferInternal(column, ColumnIndexType.BLOOM_FILTER, sizeBytes, "bloom_filter.create"); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/virtualcolumn/VirtualColumnIndexContainer.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/virtualcolumn/VirtualColumnIndexContainer.java index de2af42..c91500d 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/virtualcolumn/VirtualColumnIndexContainer.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/virtualcolumn/VirtualColumnIndexContainer.java @@ -52,6 +52,11 @@ public class VirtualColumnIndexContainer implements ColumnIndexContainer { } @Override + public InvertedIndexReader getRangeIndex() { + return null; + } + + @Override 
public Dictionary getDictionary() { return _dictionary; } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/startree/v2/store/StarTreeDataSource.java b/pinot-core/src/main/java/org/apache/pinot/core/startree/v2/store/StarTreeDataSource.java index da13528..0e6e077 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/startree/v2/store/StarTreeDataSource.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/startree/v2/store/StarTreeDataSource.java @@ -25,6 +25,7 @@ import org.apache.pinot.core.data.partition.PartitionFunction; import org.apache.pinot.core.io.reader.SingleColumnSingleValueReader; import org.apache.pinot.core.segment.index.datasource.BaseDataSource; import org.apache.pinot.core.segment.index.readers.Dictionary; +import org.apache.pinot.core.segment.index.readers.InvertedIndexReader; import org.apache.pinot.spi.data.FieldSpec; @@ -33,7 +34,7 @@ public class StarTreeDataSource extends BaseDataSource { public StarTreeDataSource(FieldSpec fieldSpec, int numDocs, SingleColumnSingleValueReader forwardIndex, @Nullable Dictionary dictionary) { - super(new StarTreeDataSourceMetadata(fieldSpec, numDocs), forwardIndex, dictionary, null, null, null, + super(new StarTreeDataSourceMetadata(fieldSpec, numDocs), forwardIndex, dictionary, null, null,null, null, OPERATOR_NAME_PREFIX + fieldSpec.getName()); } diff --git a/pinot-core/src/test/java/org/apache/pinot/core/common/RealtimeNoDictionaryTest.java b/pinot-core/src/test/java/org/apache/pinot/core/common/RealtimeNoDictionaryTest.java index 9e85b24..a0a5b95 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/common/RealtimeNoDictionaryTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/common/RealtimeNoDictionaryTest.java @@ -110,17 +110,17 @@ public class RealtimeNoDictionaryTest { Map<String, DataSource> dataSourceBlock = new HashMap<>(); dataSourceBlock.put(INT_COL_NAME, - new MutableDataSource(intSpec, NUM_ROWS, NUM_ROWS, 0, null, 0, intRawIndex, null, null, null, null)); + 
new MutableDataSource(intSpec, NUM_ROWS, NUM_ROWS, 0, null, 0, intRawIndex, null, null, null,null, null)); dataSourceBlock.put(LONG_COL_NAME, - new MutableDataSource(longSpec, NUM_ROWS, NUM_ROWS, 0, null, 0, longRawIndex, null, null, null, null)); + new MutableDataSource(longSpec, NUM_ROWS, NUM_ROWS, 0, null, 0, longRawIndex, null, null, null,null, null)); dataSourceBlock.put(FLOAT_COL_NAME, - new MutableDataSource(floatSpec, NUM_ROWS, NUM_ROWS, 0, null, 0, floatRawIndex, null, null, null, null)); + new MutableDataSource(floatSpec, NUM_ROWS, NUM_ROWS, 0, null, 0, floatRawIndex, null, null, null,null, null)); dataSourceBlock.put(DOUBLE_COL_NAME, - new MutableDataSource(doubleSpec, NUM_ROWS, NUM_ROWS, 0, null, 0, doubleRawIndex, null, null, null, null)); + new MutableDataSource(doubleSpec, NUM_ROWS, NUM_ROWS, 0, null, 0, doubleRawIndex, null, null, null,null, null)); dataSourceBlock.put(STRING_COL_NAME, - new MutableDataSource(stringSpec, NUM_ROWS, NUM_ROWS, 0, null, 0, stringRawIndex, null, null, null, null)); + new MutableDataSource(stringSpec, NUM_ROWS, NUM_ROWS, 0, null, 0, stringRawIndex, null, null, null,null, null)); dataSourceBlock.put(BYTES_COL_NAME, - new MutableDataSource(bytesSpec, NUM_ROWS, NUM_ROWS, 0, null, 0, bytesRawIndex, null, null, null, null)); + new MutableDataSource(bytesSpec, NUM_ROWS, NUM_ROWS, 0, null, 0, bytesRawIndex, null, null, null,null, null)); return new DataFetcher(dataSourceBlock); } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/IndexingConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/IndexingConfig.java index e5dd595..732d36e 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/IndexingConfig.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/IndexingConfig.java @@ -25,6 +25,7 @@ import javax.annotation.Nullable; public class IndexingConfig extends BaseJsonConfig { private List<String> _invertedIndexColumns; + private List<String> _rangeIndexColumns; private boolean 
_autoGeneratedInvertedIndex; private boolean _createInvertedIndexDuringSegmentGeneration; private List<String> _sortedColumn; @@ -57,6 +58,14 @@ public class IndexingConfig extends BaseJsonConfig { _invertedIndexColumns = invertedIndexColumns; } + public List<String> getRangeIndexColumns() { + return _rangeIndexColumns; + } + + public void setRangeIndexColumns(List<String> rangeIndexColumns) { + _rangeIndexColumns = rangeIndexColumns; + } + public boolean isAutoGeneratedInvertedIndex() { return _autoGeneratedInvertedIndex; } diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/scan/query/RangePredicateFilter.java b/pinot-tools/src/main/java/org/apache/pinot/tools/scan/query/RangePredicateFilter.java index f93ee87..226aef4 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/scan/query/RangePredicateFilter.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/scan/query/RangePredicateFilter.java @@ -80,4 +80,12 @@ public class RangePredicateFilter implements PredicateFilter { } return false; } + + public int getStartIndex() { + return _startIndex; + } + + public int getEndIndex() { + return _endIndex; + } } diff --git a/pinot-tools/src/main/resources/examples/batch/baseballStats/baseballStats_offline_table_config.json b/pinot-tools/src/main/resources/examples/batch/baseballStats/baseballStats_offline_table_config.json index 168c698..88f816b 100644 --- a/pinot-tools/src/main/resources/examples/batch/baseballStats/baseballStats_offline_table_config.json +++ b/pinot-tools/src/main/resources/examples/batch/baseballStats/baseballStats_offline_table_config.json @@ -14,6 +14,9 @@ "invertedIndexColumns": [ "playerID", "teamID" + ], + "rangeIndexColumns": [ + "hits" ] }, "metadata": { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org