This is an automated email from the ASF dual-hosted git repository. kishoreg pushed a commit to branch h3-index in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 5b13496533012a77efe9553a72db9cd753019b75 Author: kishoreg <g.kish...@gmail.com> AuthorDate: Sat Dec 19 09:08:32 2020 -0800 Wiring H3 Index for query processing --- .../org/apache/pinot/core/common/DataSource.java | 9 + .../operator/filter/H3IndexFilterOperator.java | 65 +++++++ .../org/apache/pinot/core/plan/FilterPlanNode.java | 17 +- .../request/context/predicate/GeoPredicate.java | 22 +++ .../core/segment/creator/impl/V1Constants.java | 1 + .../segment/index/column/ColumnIndexContainer.java | 6 + .../index/column/PhysicalColumnIndexContainer.java | 15 ++ .../segment/index/datasource/BaseDataSource.java | 11 +- .../index/datasource/ImmutableDataSource.java | 2 +- .../index/datasource/MutableDataSource.java | 7 +- .../segment/index/loader/IndexLoadingConfig.java | 10 + .../segment/index/loader/SegmentPreProcessor.java | 6 + .../index/loader/invertedindex/H3IndexHandler.java | 203 +++++++++++++++++++++ .../index/readers/geospatial/H3IndexReader.java | 1 - .../pinot/core/segment/store/ColumnIndexType.java | 3 +- .../virtualcolumn/VirtualColumnIndexContainer.java | 6 + .../core/startree/v2/store/StarTreeDataSource.java | 3 +- .../pinot/spi/config/table/IndexingConfig.java | 9 + 18 files changed, 385 insertions(+), 11 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/DataSource.java b/pinot-core/src/main/java/org/apache/pinot/core/common/DataSource.java index 75f0513..97d3609 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/common/DataSource.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/common/DataSource.java @@ -25,6 +25,8 @@ import org.apache.pinot.core.segment.index.readers.ForwardIndexReader; import org.apache.pinot.core.segment.index.readers.InvertedIndexReader; import org.apache.pinot.core.segment.index.readers.NullValueVectorReader; import org.apache.pinot.core.segment.index.readers.TextIndexReader; +import org.apache.pinot.core.segment.index.readers.geospatial.H3IndexReader; + /** * The {@code 
DataSource} contains all the indexes and metadata for a column for query execution purpose. @@ -61,6 +63,13 @@ public interface DataSource { InvertedIndexReader<?> getRangeIndex(); /** + * Returns the H3 index for the column if exists, or {@code null} if not. + * <p>This is the geospatial (H3) index, which is distinct from the range index. + */ + @Nullable + H3IndexReader getH3Index(); + + /** * Returns the text index for the column if exists, or {@code null} if not. */ @Nullable diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/H3IndexFilterOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/H3IndexFilterOperator.java new file mode 100644 index 0000000..98cdf64 --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/H3IndexFilterOperator.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.core.operator.filter; + +import org.apache.pinot.core.common.DataSource; +import org.apache.pinot.core.operator.blocks.FilterBlock; +import org.apache.pinot.core.operator.docidsets.BitmapDocIdSet; +import org.apache.pinot.core.query.request.context.predicate.GeoPredicate; +import org.apache.pinot.core.segment.index.readers.geospatial.H3IndexReader; +import org.roaringbitmap.buffer.ImmutableRoaringBitmap; + +/** Filter operator that returns the doc ids stored in the column's H3 index; the geo predicate itself is not evaluated yet. */ +public class H3IndexFilterOperator extends BaseFilterOperator { + private static final String OPERATOR_NAME = "H3IndexFilterOperator"; + + // NOTE: The geo predicate is stored but not used yet; getNextBlock() looks up a fixed placeholder H3 id + // TODO: Derive the H3 id(s) to look up from _geoPredicate + private final GeoPredicate _geoPredicate; + private final DataSource _dataSource; + private final int _numDocs; + + public H3IndexFilterOperator(GeoPredicate geoPredicate, DataSource dataSource, int numDocs) { + _geoPredicate = geoPredicate; + _dataSource = dataSource; + _numDocs = numDocs; + } + + @Override + protected FilterBlock getNextBlock() { + H3IndexReader h3IndexReader = _dataSource.getH3Index(); // use the dedicated H3 accessor, not the range index + assert h3IndexReader != null; // getH3Index() returns null when the column has no H3 index + + long h3Id = 1000; // placeholder H3 id; TODO: compute from _geoPredicate + ImmutableRoaringBitmap docIds = h3IndexReader.getDocIds(h3Id); + return new FilterBlock(new BitmapDocIdSet(docIds, _numDocs) { + + // Override this method to reflect the entries scanned + @Override + public long getNumEntriesScannedInFilter() { + return 0; //TODO:Return the one from ScanBased + } + }); + } + + @Override + public String getOperatorName() { + return OPERATOR_NAME; + } +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/FilterPlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/FilterPlanNode.java index 7e076e0..f31fc77 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/plan/FilterPlanNode.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/FilterPlanNode.java @@ -30,13 +30,16 @@ import 
org.apache.pinot.core.operator.filter.BitmapBasedFilterOperator; import org.apache.pinot.core.operator.filter.EmptyFilterOperator; import org.apache.pinot.core.operator.filter.ExpressionFilterOperator; import org.apache.pinot.core.operator.filter.FilterOperatorUtils; +import org.apache.pinot.core.operator.filter.H3IndexFilterOperator; import org.apache.pinot.core.operator.filter.MatchAllFilterOperator; import org.apache.pinot.core.operator.filter.TextMatchFilterOperator; import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator; import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluatorProvider; import org.apache.pinot.core.query.request.context.ExpressionContext; import org.apache.pinot.core.query.request.context.FilterContext; +import org.apache.pinot.core.query.request.context.FunctionContext; import org.apache.pinot.core.query.request.context.QueryContext; +import org.apache.pinot.core.query.request.context.predicate.GeoPredicate; import org.apache.pinot.core.query.request.context.predicate.Predicate; import org.apache.pinot.core.query.request.context.predicate.TextMatchPredicate; import org.apache.pinot.core.segment.index.readers.NullValueVectorReader; @@ -120,9 +123,17 @@ public class FilterPlanNode implements PlanNode { Predicate predicate = filter.getPredicate(); ExpressionContext lhs = predicate.getLhs(); if (lhs.getType() == ExpressionContext.Type.FUNCTION) { - // TODO: ExpressionFilterOperator does not support predicate types without PredicateEvaluator (IS_NULL, - // IS_NOT_NULL, TEXT_MATCH) - return new ExpressionFilterOperator(_indexSegment, predicate, _numDocs); + FunctionContext function = lhs.getFunction(); + if (function.getFunctionName().equalsIgnoreCase("H3_WITHIN")) { + String columnName = function.getArguments().get(0).getIdentifier(); + GeoPredicate geoPredicate = new GeoPredicate(); + //set geo predicate + return new H3IndexFilterOperator(geoPredicate, _indexSegment.getDataSource(columnName), _numDocs); + } else 
{ + // TODO: ExpressionFilterOperator does not support predicate types without PredicateEvaluator (IS_NULL, + // IS_NOT_NULL, TEXT_MATCH) + return new ExpressionFilterOperator(_indexSegment, predicate, _numDocs); + } } else { DataSource dataSource = _indexSegment.getDataSource(lhs.getIdentifier()); switch (predicate.getType()) { diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/predicate/GeoPredicate.java b/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/predicate/GeoPredicate.java new file mode 100644 index 0000000..8f51ae5 --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/predicate/GeoPredicate.java @@ -0,0 +1,22 @@ +package org.apache.pinot.core.query.request.context.predicate; + +import org.apache.pinot.core.query.request.context.ExpressionContext; +import org.locationtech.jts.geom.Geometry; + + +// Plain field holder for the parts of a geospatial filter predicate; it has no accessors yet. TODO: Make this flexible +public class GeoPredicate { + + // Left-hand side of the predicate; this is the column name + ExpressionContext _lhs; + + Type type; // which spatial relation to evaluate: WITHIN or OVERLAP + + Geometry _geometry; // geometry operand; NOTE(review): presumably the query shape - nothing sets it yet, confirm + + double _distance; // distance operand; units are not specified here - TODO confirm + + enum Type { + WITHIN, OVERLAP; + } +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/V1Constants.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/V1Constants.java index 0a09394..362bdbb 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/V1Constants.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/V1Constants.java @@ -37,6 +37,7 @@ public class V1Constants { public static final String UNSORTED_MV_FORWARD_INDEX_FILE_EXTENSION = ".mv.fwd"; public static final String BITMAP_INVERTED_INDEX_FILE_EXTENSION = ".bitmap.inv"; public static final String BITMAP_RANGE_INDEX_FILE_EXTENSION = ".bitmap.range"; + public static final String BITMAP_H3_INDEX_FILE_EXTENSION = ".bitmap.h3"; public static final String BLOOM_FILTER_FILE_EXTENSION = ".bloom"; public static final String 
NULLVALUE_VECTOR_FILE_EXTENSION = ".bitmap.nullvalue"; } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/column/ColumnIndexContainer.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/column/ColumnIndexContainer.java index 087f4b2..f8af539 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/column/ColumnIndexContainer.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/column/ColumnIndexContainer.java @@ -25,6 +25,7 @@ import org.apache.pinot.core.segment.index.readers.ForwardIndexReader; import org.apache.pinot.core.segment.index.readers.InvertedIndexReader; import org.apache.pinot.core.segment.index.readers.NullValueVectorReaderImpl; import org.apache.pinot.core.segment.index.readers.TextIndexReader; +import org.apache.pinot.core.segment.index.readers.geospatial.H3IndexReader; /** @@ -48,6 +49,11 @@ public interface ColumnIndexContainer extends Closeable { InvertedIndexReader<?> getRangeIndex(); /** + * Returns the H3 index for the column, or {@code null} if it does not exist. + */ + H3IndexReader getH3Index(); + + /** * Returns the text index for the column, or {@code null} if it does not exist. 
*/ TextIndexReader getTextIndex(); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/column/PhysicalColumnIndexContainer.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/column/PhysicalColumnIndexContainer.java index 1fe955c..e88d0a8 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/column/PhysicalColumnIndexContainer.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/column/PhysicalColumnIndexContainer.java @@ -49,6 +49,7 @@ import org.apache.pinot.core.segment.index.readers.forward.FixedBitMVForwardInde import org.apache.pinot.core.segment.index.readers.forward.FixedBitSVForwardIndexReaderV2; import org.apache.pinot.core.segment.index.readers.forward.FixedByteChunkSVForwardIndexReader; import org.apache.pinot.core.segment.index.readers.forward.VarByteChunkSVForwardIndexReader; +import org.apache.pinot.core.segment.index.readers.geospatial.H3IndexReader; import org.apache.pinot.core.segment.index.readers.sorted.SortedIndexReaderImpl; import org.apache.pinot.core.segment.index.readers.text.LuceneTextIndexReader; import org.apache.pinot.core.segment.memory.PinotDataBuffer; @@ -66,6 +67,7 @@ public final class PhysicalColumnIndexContainer implements ColumnIndexContainer private final ForwardIndexReader<?> _forwardIndex; private final InvertedIndexReader<?> _invertedIndex; private final InvertedIndexReader<?> _rangeIndex; + private final H3IndexReader _h3Index; private final TextIndexReader _textIndex; private final BaseImmutableDictionary _dictionary; private final BloomFilterReader _bloomFilter; @@ -77,6 +79,7 @@ public final class PhysicalColumnIndexContainer implements ColumnIndexContainer String columnName = metadata.getColumnName(); boolean loadInvertedIndex = indexLoadingConfig.getInvertedIndexColumns().contains(columnName); boolean loadRangeIndex = indexLoadingConfig.getRangeIndexColumns().contains(columnName); + boolean loadH3Index = 
indexLoadingConfig.getH3IndexColumns().contains(columnName); boolean loadTextIndex = indexLoadingConfig.getTextIndexColumns().contains(columnName); boolean loadOnHeapDictionary = indexLoadingConfig.getOnHeapDictionaryColumns().contains(columnName); BloomFilterConfig bloomFilterConfig = indexLoadingConfig.getBloomFilterConfigs().get(columnName); @@ -97,6 +100,11 @@ public final class PhysicalColumnIndexContainer implements ColumnIndexContainer _textIndex = null; } + if (loadH3Index) { + _h3Index = new H3IndexReader(segmentReader.getIndexFor(columnName, ColumnIndexType.H3_INDEX)); + } else { + _h3Index = null; + } PinotDataBuffer fwdIndexBuffer = segmentReader.getIndexFor(columnName, ColumnIndexType.FORWARD_INDEX); if (metadata.hasDictionary()) { @@ -142,6 +150,7 @@ public final class PhysicalColumnIndexContainer implements ColumnIndexContainer } else { _rangeIndex = null; } + } else { // Raw index _forwardIndex = loadRawForwardIndex(fwdIndexBuffer, metadata.getDataType()); @@ -150,6 +159,7 @@ public final class PhysicalColumnIndexContainer implements ColumnIndexContainer _rangeIndex = null; _invertedIndex = null; } + } @Override @@ -168,6 +178,11 @@ public final class PhysicalColumnIndexContainer implements ColumnIndexContainer } @Override + public H3IndexReader getH3Index() { + return _h3Index; + } + + @Override public TextIndexReader getTextIndex() { return _textIndex; } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/datasource/BaseDataSource.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/datasource/BaseDataSource.java index 172e8e6..cf4b449 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/datasource/BaseDataSource.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/datasource/BaseDataSource.java @@ -27,6 +27,7 @@ import org.apache.pinot.core.segment.index.readers.ForwardIndexReader; import org.apache.pinot.core.segment.index.readers.InvertedIndexReader; import 
org.apache.pinot.core.segment.index.readers.NullValueVectorReader; import org.apache.pinot.core.segment.index.readers.TextIndexReader; +import org.apache.pinot.core.segment.index.readers.geospatial.H3IndexReader; public abstract class BaseDataSource implements DataSource { @@ -35,19 +36,21 @@ public abstract class BaseDataSource implements DataSource { private final Dictionary _dictionary; private final InvertedIndexReader<?> _invertedIndex; private final InvertedIndexReader<?> _rangeIndex; + private final H3IndexReader _h3Index; private final TextIndexReader _textIndex; private final BloomFilterReader _bloomFilter; private final NullValueVectorReader _nullValueVector; public BaseDataSource(DataSourceMetadata dataSourceMetadata, ForwardIndexReader<?> forwardIndex, @Nullable Dictionary dictionary, @Nullable InvertedIndexReader<?> invertedIndex, - @Nullable InvertedIndexReader<?> rangeIndex, @Nullable TextIndexReader textIndex, + @Nullable InvertedIndexReader<?> rangeIndex, @Nullable H3IndexReader h3Index, @Nullable TextIndexReader textIndex, @Nullable BloomFilterReader bloomFilter, @Nullable NullValueVectorReader nullValueVector) { _dataSourceMetadata = dataSourceMetadata; _forwardIndex = forwardIndex; _dictionary = dictionary; _invertedIndex = invertedIndex; _rangeIndex = rangeIndex; + _h3Index = h3Index; _textIndex = textIndex; _bloomFilter = bloomFilter; _nullValueVector = nullValueVector; @@ -83,6 +86,12 @@ public abstract class BaseDataSource implements DataSource { @Nullable @Override + public H3IndexReader getH3Index() { + return _h3Index; + } + + @Nullable + @Override public TextIndexReader getTextIndex() { return _textIndex; } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/datasource/ImmutableDataSource.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/datasource/ImmutableDataSource.java index cc09adf..bd24dae 100644 --- 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/datasource/ImmutableDataSource.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/datasource/ImmutableDataSource.java @@ -35,7 +35,7 @@ public class ImmutableDataSource extends BaseDataSource { public ImmutableDataSource(ColumnMetadata columnMetadata, ColumnIndexContainer columnIndexContainer) { super(new ImmutableDataSourceMetadata(columnMetadata), columnIndexContainer.getForwardIndex(), columnIndexContainer.getDictionary(), columnIndexContainer.getInvertedIndex(), - columnIndexContainer.getRangeIndex(), columnIndexContainer.getTextIndex(), + columnIndexContainer.getRangeIndex(), columnIndexContainer.getH3Index(), columnIndexContainer.getTextIndex(), columnIndexContainer.getBloomFilter(), columnIndexContainer.getNullValueVector()); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/datasource/MutableDataSource.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/datasource/MutableDataSource.java index a927353..e433ff5 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/datasource/MutableDataSource.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/datasource/MutableDataSource.java @@ -28,6 +28,7 @@ import org.apache.pinot.core.segment.index.readers.ForwardIndexReader; import org.apache.pinot.core.segment.index.readers.InvertedIndexReader; import org.apache.pinot.core.segment.index.readers.NullValueVectorReader; import org.apache.pinot.core.segment.index.readers.TextIndexReader; +import org.apache.pinot.core.segment.index.readers.geospatial.H3IndexReader; import org.apache.pinot.spi.data.FieldSpec; @@ -41,11 +42,11 @@ public class MutableDataSource extends BaseDataSource { @Nullable PartitionFunction partitionFunction, @Nullable Set<Integer> partitions, @Nullable Comparable minValue, @Nullable Comparable maxValue, ForwardIndexReader forwardIndex, @Nullable Dictionary dictionary, @Nullable 
InvertedIndexReader invertedIndex, @Nullable InvertedIndexReader rangeIndex, - @Nullable TextIndexReader textIndex, @Nullable BloomFilterReader bloomFilter, + @Nullable H3IndexReader h3Index, @Nullable TextIndexReader textIndex, @Nullable BloomFilterReader bloomFilter, @Nullable NullValueVectorReader nullValueVector) { super(new MutableDataSourceMetadata(fieldSpec, numDocs, numValues, maxNumValuesPerMVEntry, partitionFunction, - partitions, minValue, maxValue), forwardIndex, dictionary, invertedIndex, rangeIndex, textIndex, bloomFilter, - nullValueVector); + partitions, minValue, maxValue), forwardIndex, dictionary, invertedIndex, rangeIndex, h3Index, textIndex, + bloomFilter, nullValueVector); } private static class MutableDataSourceMetadata implements DataSourceMetadata { diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java index 04d91d1..62dd06a 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java @@ -49,6 +49,7 @@ public class IndexLoadingConfig { private Set<String> _invertedIndexColumns = new HashSet<>(); private Set<String> _textIndexColumns = new HashSet<>(); private Set<String> _rangeIndexColumns = new HashSet<>(); + private Set<String> _h3IndexColumns = new HashSet<>(); private Set<String> _noDictionaryColumns = new HashSet<>(); // TODO: replace this by _noDictionaryConfig. 
private Map<String, String> _noDictionaryConfig = new HashMap<>(); private Set<String> _varLengthDictionaryColumns = new HashSet<>(); @@ -97,6 +98,11 @@ public class IndexLoadingConfig { _rangeIndexColumns.addAll(rangeIndexColumns); } + List<String> h3IndexColumns = indexingConfig.getH3IndexColumns(); + if (h3IndexColumns != null) { + _h3IndexColumns.addAll(h3IndexColumns); + } + List<String> bloomFilterColumns = indexingConfig.getBloomFilterColumns(); if (bloomFilterColumns != null) { for (String bloomFilterColumn : bloomFilterColumns) { @@ -226,6 +232,10 @@ public class IndexLoadingConfig { return _rangeIndexColumns; } + public Set<String> getH3IndexColumns() { + return _h3IndexColumns; + } + public Map<String, Map<String, String>> getColumnProperties() { return _columnProperties; } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/SegmentPreProcessor.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/SegmentPreProcessor.java index ca7f7e9..8a17eb6 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/SegmentPreProcessor.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/SegmentPreProcessor.java @@ -30,6 +30,7 @@ import org.apache.pinot.core.segment.index.loader.columnminmaxvalue.ColumnMinMax import org.apache.pinot.core.segment.index.loader.columnminmaxvalue.ColumnMinMaxValueGeneratorMode; import org.apache.pinot.core.segment.index.loader.defaultcolumn.DefaultColumnHandler; import org.apache.pinot.core.segment.index.loader.defaultcolumn.DefaultColumnHandlerFactory; +import org.apache.pinot.core.segment.index.loader.invertedindex.H3IndexHandler; import org.apache.pinot.core.segment.index.loader.invertedindex.InvertedIndexHandler; import org.apache.pinot.core.segment.index.loader.invertedindex.RangeIndexHandler; import org.apache.pinot.core.segment.index.loader.invertedindex.TextIndexHandler; @@ -113,6 +114,11 @@ public class SegmentPreProcessor implements 
AutoCloseable { new RangeIndexHandler(_indexDir, _segmentMetadata, _indexLoadingConfig, segmentWriter); rangeIndexHandler.createRangeIndices(); + // Create column H3 indices according to the index config. + H3IndexHandler h3IndexHandler = + new H3IndexHandler(_indexDir, _segmentMetadata, _indexLoadingConfig, segmentWriter); + h3IndexHandler.createH3Indices(); + Set<String> textIndexColumns = _indexLoadingConfig.getTextIndexColumns(); if (textIndexColumns.size() > 0) { TextIndexHandler textIndexHandler = diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/invertedindex/H3IndexHandler.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/invertedindex/H3IndexHandler.java new file mode 100644 index 0000000..e8bcbec --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/invertedindex/H3IndexHandler.java @@ -0,0 +1,203 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.core.segment.index.loader.invertedindex; + +import java.io.File; +import java.io.IOException; +import java.util.HashSet; +import java.util.Set; +import org.apache.commons.io.FileUtils; +import org.apache.pinot.core.geospatial.serde.GeometrySerializer; +import org.apache.pinot.core.indexsegment.generator.SegmentVersion; +import org.apache.pinot.core.segment.creator.impl.V1Constants; +import org.apache.pinot.core.segment.creator.impl.geospatial.H3IndexCreator; +import org.apache.pinot.core.segment.creator.impl.inv.RangeIndexCreator; +import org.apache.pinot.core.segment.index.loader.IndexLoadingConfig; +import org.apache.pinot.core.segment.index.loader.LoaderUtils; +import org.apache.pinot.core.segment.index.metadata.ColumnMetadata; +import org.apache.pinot.core.segment.index.metadata.SegmentMetadataImpl; +import org.apache.pinot.core.segment.index.readers.ForwardIndexReader; +import org.apache.pinot.core.segment.index.readers.ForwardIndexReaderContext; +import org.apache.pinot.core.segment.index.readers.forward.FixedBitMVForwardIndexReader; +import org.apache.pinot.core.segment.index.readers.forward.FixedBitSVForwardIndexReaderV2; +import org.apache.pinot.core.segment.index.readers.forward.FixedByteChunkSVForwardIndexReader; +import org.apache.pinot.core.segment.memory.PinotDataBuffer; +import org.apache.pinot.core.segment.store.ColumnIndexType; +import org.apache.pinot.core.segment.store.SegmentDirectory; +import org.apache.pinot.spi.data.FieldSpec; +import org.locationtech.jts.geom.Coordinate; +import org.locationtech.jts.geom.Geometry; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Creates the H3 index for each configured non-dictionary-encoded column of a segment, skipping columns that already have one. */ +@SuppressWarnings({"rawtypes", "unchecked"}) +public class H3IndexHandler { + private static final Logger LOGGER = LoggerFactory.getLogger(H3IndexHandler.class); + + private final File _indexDir; + private final SegmentDirectory.Writer _segmentWriter; + private final String _segmentName; + private final SegmentVersion _segmentVersion; + 
private final Set<ColumnMetadata> _h3IndexColumns = new HashSet<>(); + + public H3IndexHandler(File indexDir, SegmentMetadataImpl segmentMetadata, IndexLoadingConfig indexLoadingConfig, + SegmentDirectory.Writer segmentWriter) { + _indexDir = indexDir; + _segmentWriter = segmentWriter; + _segmentName = segmentMetadata.getName(); + _segmentVersion = SegmentVersion.valueOf(segmentMetadata.getVersion()); + + // Only create H3 index on non-dictionary-encoded columns; configured columns without metadata are skipped + for (String column : indexLoadingConfig.getH3IndexColumns()) { + ColumnMetadata columnMetadata = segmentMetadata.getColumnMetadataFor(column); + if (columnMetadata != null && !columnMetadata.hasDictionary()) { + _h3IndexColumns.add(columnMetadata); + } + } + } + + public void createH3Indices() + throws Exception { + for (ColumnMetadata columnMetadata : _h3IndexColumns) { + createH3IndexForColumn(columnMetadata); + } + } + + private void createH3IndexForColumn(ColumnMetadata columnMetadata) + throws Exception { + String column = columnMetadata.getColumnName(); + File inProgress = new File(_indexDir, column + ".h3.inprogress"); + File h3IndexFile = new File(_indexDir, column + V1Constants.Indexes.BITMAP_H3_INDEX_FILE_EXTENSION); + + if (!inProgress.exists()) { + // Marker file does not exist, which means last run ended normally. + + if (_segmentWriter.hasIndexFor(column, ColumnIndexType.H3_INDEX)) { + // Skip creating H3 index if already exists. + + LOGGER.info("Found h3 index for segment: {}, column: {}", _segmentName, column); + return; + } + + // Create a marker file. + FileUtils.touch(inProgress); + } else { + // Marker file exists, which means last run gets interrupted. + // Remove H3 index if exists. + // For v1 and v2, it's the actual H3 index. For v3, it's the temporary H3 index. + FileUtils.deleteQuietly(h3IndexFile); + } + + // Create new H3 index for the column. 
+ LOGGER.info("Creating new h3 index for segment: {}, column: {}", _segmentName, column); + if (columnMetadata.hasDictionary()) { +// handleDictionaryBasedColumn(columnMetadata); + } else { + handleNonDictionaryBasedColumn(columnMetadata); + } + + // For v3, write the generated H3 index file into the single file and remove it. + if (_segmentVersion == SegmentVersion.v3) { + LoaderUtils.writeIndexToV3Format(_segmentWriter, column, h3IndexFile, ColumnIndexType.H3_INDEX); + } + + // Delete the marker file. + FileUtils.deleteQuietly(inProgress); + + LOGGER.info("Created h3 index for segment: {}, column: {}", _segmentName, column); + } + + //TODO: add later - dictionary-encoded columns are not supported yet, so this method is currently a no-op + private void handleDictionaryBasedColumn(ColumnMetadata columnMetadata) + throws IOException { +// int numDocs = columnMetadata.getTotalDocs(); +// try (ForwardIndexReader forwardIndexReader = getForwardIndexReader(columnMetadata, _segmentWriter); +// ForwardIndexReaderContext readerContext = forwardIndexReader.createContext(); +// H3IndexCreator h3IndexCreator = new H3IndexCreator(_indexDir, columnMetadata.getFieldSpec(), 5)) { +// if (columnMetadata.isSingleValue()) { +// // Single-value column +// for (int i = 0; i < numDocs; i++) { +// forwardIndexReader.getDictId(i, readerContext); +//// h3IndexCreator.add(); +// } +// } else { +// // Multi-value column +//// int[] dictIds = new int[columnMetadata.getMaxNumberOfMultiValues()]; +//// for (int i = 0; i < numDocs; i++) { +//// int length = forwardIndexReader.getDictIdMV(i, dictIds, readerContext); +//// rangeIndexCreator.add(dictIds, length); +//// } +// } +// h3IndexCreator.seal(); +// } + } + + private void handleNonDictionaryBasedColumn(ColumnMetadata columnMetadata) + throws Exception { + int numDocs = columnMetadata.getTotalDocs(); + try (ForwardIndexReader forwardIndexReader = getForwardIndexReader(columnMetadata, _segmentWriter); + ForwardIndexReaderContext readerContext = forwardIndexReader.createContext(); + H3IndexCreator h3IndexCreator = new 
H3IndexCreator(_indexDir, columnMetadata.getFieldSpec(), 5)) { + if (columnMetadata.isSingleValue()) { + // Single-value column: only BYTES values, deserialized as geometries, are supported. + switch (columnMetadata.getDataType()) { + case BYTES: + for (int i = 0; i < numDocs; i++) { + byte[] bytes = forwardIndexReader.getBytes(i, readerContext); + Geometry geometry = GeometrySerializer.deserialize(bytes); + Coordinate coordinate = geometry.getCoordinate(); + h3IndexCreator.add(i, coordinate.x, coordinate.y); // NOTE(review): confirm the (x, y) order H3IndexCreator.add expects + } + break; + default: + throw new IllegalStateException("Unsupported data type: " + columnMetadata.getDataType()); + } + } else { + // Multi-value column + //TODO + throw new IllegalStateException( + "H3 indexing is not supported for Multivalue column : " + columnMetadata.getDataType()); + } + h3IndexCreator.seal(); + } + } + + private ForwardIndexReader<?> getForwardIndexReader(ColumnMetadata columnMetadata, + SegmentDirectory.Writer segmentWriter) + throws IOException { + PinotDataBuffer buffer = segmentWriter.getIndexFor(columnMetadata.getColumnName(), ColumnIndexType.FORWARD_INDEX); + int numRows = columnMetadata.getTotalDocs(); + int numBitsPerValue = columnMetadata.getBitsPerElement(); + if (columnMetadata.isSingleValue()) { + if (columnMetadata.hasDictionary()) { + return new FixedBitSVForwardIndexReaderV2(buffer, numRows, numBitsPerValue); + } else { + return new FixedByteChunkSVForwardIndexReader(buffer, columnMetadata.getDataType()); // NOTE(review): callers read BYTES via getBytes(); confirm a fixed-byte chunk reader supports that + } + } else { + if (columnMetadata.hasDictionary()) { + return new FixedBitMVForwardIndexReader(buffer, numRows, columnMetadata.getTotalNumberOfEntries(), + numBitsPerValue); + } else { + throw new IllegalStateException("Raw index on multi-value column is not supported"); + } + } + } +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/geospatial/H3IndexReader.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/geospatial/H3IndexReader.java index 96fa586..b8a6a6a 100644 --- 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/geospatial/H3IndexReader.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/geospatial/H3IndexReader.java @@ -78,7 +78,6 @@ public class H3IndexReader implements Closeable { } } - //todo: fix this private synchronized ImmutableRoaringBitmap buildRoaringBitmapForIndex(final int index) { int currentOffset = getOffset(index); int bufferLength; diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/store/ColumnIndexType.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/store/ColumnIndexType.java index dcd21df..7cb285f 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/store/ColumnIndexType.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/store/ColumnIndexType.java @@ -25,7 +25,8 @@ public enum ColumnIndexType { BLOOM_FILTER("bloom_filter"), NULLVALUE_VECTOR("nullvalue_vector"), TEXT_INDEX("text_index"), - RANGE_INDEX("range_index"); + RANGE_INDEX("range_index"), + H3_INDEX("h3_index"); private final String indexName; diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/virtualcolumn/VirtualColumnIndexContainer.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/virtualcolumn/VirtualColumnIndexContainer.java index dc67771..3750bfb 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/virtualcolumn/VirtualColumnIndexContainer.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/virtualcolumn/VirtualColumnIndexContainer.java @@ -26,6 +26,7 @@ import org.apache.pinot.core.segment.index.readers.ForwardIndexReader; import org.apache.pinot.core.segment.index.readers.InvertedIndexReader; import org.apache.pinot.core.segment.index.readers.NullValueVectorReaderImpl; import org.apache.pinot.core.segment.index.readers.TextIndexReader; +import org.apache.pinot.core.segment.index.readers.geospatial.H3IndexReader; /** @@ -79,6 +80,11 @@ public class VirtualColumnIndexContainer 
implements ColumnIndexContainer { } @Override + public H3IndexReader getH3Index() { + return null; + } + + @Override public void close() throws IOException { _forwardIndex.close(); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/startree/v2/store/StarTreeDataSource.java b/pinot-core/src/main/java/org/apache/pinot/core/startree/v2/store/StarTreeDataSource.java index c24fb04..868cb47 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/startree/v2/store/StarTreeDataSource.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/startree/v2/store/StarTreeDataSource.java @@ -32,7 +32,8 @@ public class StarTreeDataSource extends BaseDataSource { public StarTreeDataSource(FieldSpec fieldSpec, int numDocs, ForwardIndexReader<?> forwardIndex, @Nullable Dictionary dictionary) { - super(new StarTreeDataSourceMetadata(fieldSpec, numDocs), forwardIndex, dictionary, null, null, null, null, null); + super(new StarTreeDataSourceMetadata(fieldSpec, numDocs), forwardIndex, dictionary, null, null, null, null, null, + null); } private static final class StarTreeDataSourceMetadata implements DataSourceMetadata { diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/IndexingConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/IndexingConfig.java index 184d00a..92fc97f 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/IndexingConfig.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/IndexingConfig.java @@ -28,6 +28,7 @@ import org.apache.pinot.spi.config.table.ingestion.IngestionConfig; public class IndexingConfig extends BaseJsonConfig { private List<String> _invertedIndexColumns; private List<String> _rangeIndexColumns; + private List<String> _h3IndexColumns; private boolean _autoGeneratedInvertedIndex; private boolean _createInvertedIndexDuringSegmentGeneration; private List<String> _sortedColumn; @@ -65,6 +66,14 @@ public class IndexingConfig extends BaseJsonConfig { _invertedIndexColumns 
= invertedIndexColumns; } + public List<String> getH3IndexColumns() { + return _h3IndexColumns; + } + + public void setH3IndexColumns(List<String> h3IndexColumns) { + _h3IndexColumns = h3IndexColumns; + } + public List<String> getRangeIndexColumns() { return _rangeIndexColumns; } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org For additional commands, e-mail: commits-help@pinot.apache.org