This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new f9ab252 make index creator provision pluggable (#7885)
f9ab252 is described below
commit f9ab252980e4f973d60b9db2a0f5e7d5764bdaf2
Author: Richard Startin <[email protected]>
AuthorDate: Thu Dec 16 20:21:48 2021 +0000
make index creator provision pluggable (#7885)
This allows index creation to be intercepted, so that the current static
logic in SegmentColumnarIndexCreator can be extended or overridden. This is achieved by
introducing a new interface IndexCreatorProvider which provides various new
index creators from an IndexCreationContext which bundles all information about
index creation. External users can register a decorator which can enhance or
entirely replace the default index creator provision logic. Typically, a
registered decorator should pa [...]
---
.../pinot/core/minion/RawIndexConverter.java | 19 +-
.../org/apache/pinot/perf/BenchmarkRangeIndex.java | 11 +-
.../ConvertToRawIndexTaskExecutor.java | 4 +-
pinot-segment-local/pom.xml | 3 +-
.../creator/impl/DefaultIndexCreatorProvider.java | 274 ++++++++++++
.../creator/impl/SegmentColumnarIndexCreator.java | 290 +++----------
.../impl/inv/BitSlicedRangeIndexCreator.java | 55 ++-
.../segment/index/loader/IndexHandlerFactory.java | 20 +-
.../loader/bloomfilter/BloomFilterHandler.java | 12 +-
.../loader/invertedindex/FSTIndexHandler.java | 17 +-
.../index/loader/invertedindex/H3IndexHandler.java | 12 +-
.../loader/invertedindex/InvertedIndexHandler.java | 14 +-
.../loader/invertedindex/JsonIndexHandler.java | 16 +-
.../loader/invertedindex/RangeIndexHandler.java | 25 +-
.../loader/invertedindex/TextIndexHandler.java | 10 +-
.../creator/impl/IndexCreatorOverrideTest.java | 88 ++++
.../index/creator/BitSlicedIndexCreatorTest.java | 20 +-
pinot-segment-spi/pom.xml | 6 +
.../spi/creator/BloomFilterCreatorProvider.java | 36 ++
.../spi/creator/ForwardIndexCreatorProvider.java | 35 ++
.../creator/GeoSpatialIndexCreatorProvider.java | 37 ++
.../segment/spi/creator/IndexCreationContext.java | 467 +++++++++++++++++++++
.../segment/spi/creator/IndexCreatorProvider.java | 28 ++
.../segment/spi/creator/IndexCreatorProviders.java | 159 +++++++
.../spi/creator/InvertedIndexCreatorProvider.java | 36 ++
.../spi/creator/JsonIndexCreatorProvider.java | 36 ++
.../spi/creator/RangeIndexCreatorProvider.java | 36 ++
.../spi/creator/TextIndexCreatorProvider.java | 37 ++
.../spi/index/metadata/ColumnMetadataImpl.java | 4 +
.../spi/creator/IndexCreatorProvidersTest.java | 53 +++
.../converter/DictionaryToRawIndexConverter.java | 4 +-
31 files changed, 1538 insertions(+), 326 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/minion/RawIndexConverter.java
b/pinot-core/src/main/java/org/apache/pinot/core/minion/RawIndexConverter.java
index 78552dd..f6d2a7d 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/minion/RawIndexConverter.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/minion/RawIndexConverter.java
@@ -26,8 +26,6 @@ import
org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.StringUtils;
import
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
-import
org.apache.pinot.segment.local.io.writer.impl.BaseChunkSVForwardIndexWriter;
-import
org.apache.pinot.segment.local.segment.creator.impl.SegmentColumnarIndexCreator;
import
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.segment.local.utils.CrcUtils;
@@ -36,12 +34,15 @@ import org.apache.pinot.segment.spi.ImmutableSegment;
import org.apache.pinot.segment.spi.SegmentMetadata;
import org.apache.pinot.segment.spi.V1Constants;
import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
+import org.apache.pinot.segment.spi.creator.ForwardIndexCreatorProvider;
+import org.apache.pinot.segment.spi.creator.IndexCreationContext;
import org.apache.pinot.segment.spi.creator.SegmentVersion;
import org.apache.pinot.segment.spi.datasource.DataSource;
import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator;
import org.apache.pinot.segment.spi.index.reader.Dictionary;
import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext;
+import org.apache.pinot.spi.data.DimensionFieldSpec;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.apache.pinot.spi.data.MetricFieldSpec;
@@ -82,13 +83,14 @@ public class RawIndexConverter {
private final File _convertedIndexDir;
private final PropertiesConfiguration _convertedProperties;
private final String _columnsToConvert;
+ private final ForwardIndexCreatorProvider _indexCreatorProvider;
/**
* NOTE: original segment should be in V1 format.
* TODO: support V3 format
*/
public RawIndexConverter(String rawTableName, File originalIndexDir, File
convertedIndexDir,
- @Nullable String columnsToConvert)
+ @Nullable String columnsToConvert, ForwardIndexCreatorProvider
indexCreatorProvider)
throws Exception {
FileUtils.copyDirectory(originalIndexDir, convertedIndexDir);
IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig();
@@ -101,6 +103,7 @@ public class RawIndexConverter {
_convertedProperties =
new PropertiesConfiguration(new File(_convertedIndexDir,
V1Constants.MetadataKeys.METADATA_FILE_NAME));
_columnsToConvert = columnsToConvert;
+ _indexCreatorProvider = indexCreatorProvider;
}
public boolean convert()
@@ -205,11 +208,11 @@ public class RawIndexConverter {
assert dictionary != null;
DataType storedType = dictionary.getValueType();
int numDocs = _originalSegmentMetadata.getTotalDocs();
- int lengthOfLongestEntry =
_originalSegmentMetadata.getColumnMetadataFor(columnName).getColumnMaxLength();
- try (ForwardIndexCreator rawIndexCreator = SegmentColumnarIndexCreator
- .getRawIndexCreatorForSVColumn(_convertedIndexDir,
ChunkCompressionType.LZ4, columnName,
- storedType, numDocs, lengthOfLongestEntry, false,
- BaseChunkSVForwardIndexWriter.DEFAULT_VERSION);
+ ColumnMetadata columnMetadata =
_originalSegmentMetadata.getColumnMetadataFor(columnName);
+ try (ForwardIndexCreator rawIndexCreator =
_indexCreatorProvider.newForwardIndexCreator(
+
IndexCreationContext.builder().withIndexDir(_convertedIndexDir).withColumnMetadata(columnMetadata)
+ .withFieldSpec(new DimensionFieldSpec(columnName, storedType,
columnMetadata.isSingleValue()))
+
.withDictionary(false).build().forForwardIndex(ChunkCompressionType.LZ4, null));
ForwardIndexReaderContext readerContext = reader.createContext()) {
switch (storedType) {
case INT:
diff --git
a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRangeIndex.java
b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRangeIndex.java
index 60db2a3..e250eb6 100644
--- a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRangeIndex.java
+++ b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRangeIndex.java
@@ -191,14 +191,7 @@ public class BenchmarkRangeIndex {
public void setup()
throws IOException {
super.setup();
- ColumnMetadata metadata = new ColumnMetadataImpl.Builder()
- .setFieldSpec(_fieldSpec)
- .setTotalDocs(_numDocs)
- .setHasDictionary(false)
- .setMaxValue(max())
- .setMinValue(min())
- .build();
- _creator = new BitSlicedRangeIndexCreator(_indexDir, metadata);
+ _creator = new BitSlicedRangeIndexCreator(_indexDir, _fieldSpec, min(),
max());
}
}
@@ -328,7 +321,7 @@ public class BenchmarkRangeIndex {
@Override
protected RawValueBasedInvertedIndexCreator newCreator() {
- return new BitSlicedRangeIndexCreator(_indexDir, metadata());
+ return new BitSlicedRangeIndexCreator(_indexDir, _fieldSpec, min(),
max());
}
}
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/converttorawindex/ConvertToRawIndexTaskExecutor.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/converttorawindex/ConvertToRawIndexTaskExecutor.java
index 7a5a33f..ba3e610 100644
---
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/converttorawindex/ConvertToRawIndexTaskExecutor.java
+++
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/converttorawindex/ConvertToRawIndexTaskExecutor.java
@@ -27,6 +27,7 @@ import org.apache.pinot.core.minion.PinotTaskConfig;
import org.apache.pinot.core.minion.RawIndexConverter;
import
org.apache.pinot.plugin.minion.tasks.BaseSingleSegmentConversionExecutor;
import org.apache.pinot.plugin.minion.tasks.SegmentConversionResult;
+import org.apache.pinot.segment.spi.creator.IndexCreatorProviders;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
@@ -39,7 +40,8 @@ public class ConvertToRawIndexTaskExecutor extends
BaseSingleSegmentConversionEx
String tableNameWithType = configs.get(MinionConstants.TABLE_NAME_KEY);
String rawTableName =
TableNameBuilder.extractRawTableName(tableNameWithType);
new RawIndexConverter(rawTableName, indexDir, workingDir,
-
configs.get(MinionConstants.ConvertToRawIndexTask.COLUMNS_TO_CONVERT_KEY)).convert();
+
configs.get(MinionConstants.ConvertToRawIndexTask.COLUMNS_TO_CONVERT_KEY),
+ IndexCreatorProviders.getIndexCreatorProvider()).convert();
return new SegmentConversionResult.Builder().setFile(workingDir)
.setTableNameWithType(configs.get(MinionConstants.TABLE_NAME_KEY))
.setSegmentName(configs.get(MinionConstants.SEGMENT_NAME_KEY)).build();
diff --git a/pinot-segment-local/pom.xml b/pinot-segment-local/pom.xml
index c60da9c..e1bab72 100644
--- a/pinot-segment-local/pom.xml
+++ b/pinot-segment-local/pom.xml
@@ -122,9 +122,10 @@
<artifactId>testng</artifactId>
<scope>test</scope>
</dependency>
+ <!-- required for static mock in IndexCreatorOverrideTest -->
<dependency>
<groupId>org.mockito</groupId>
- <artifactId>mockito-core</artifactId>
+ <artifactId>mockito-inline</artifactId>
<scope>test</scope>
</dependency>
<dependency>
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/DefaultIndexCreatorProvider.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/DefaultIndexCreatorProvider.java
new file mode 100644
index 0000000..2753459
--- /dev/null
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/DefaultIndexCreatorProvider.java
@@ -0,0 +1,274 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.creator.impl;
+
+import com.google.common.base.Preconditions;
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Objects;
+import
org.apache.pinot.segment.local.io.writer.impl.BaseChunkSVForwardIndexWriter;
+import
org.apache.pinot.segment.local.segment.creator.impl.bloom.OnHeapGuavaBloomFilterCreator;
+import
org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueFixedByteRawIndexCreator;
+import
org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueUnsortedForwardIndexCreator;
+import
org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueVarByteRawIndexCreator;
+import
org.apache.pinot.segment.local.segment.creator.impl.fwd.SingleValueFixedByteRawIndexCreator;
+import
org.apache.pinot.segment.local.segment.creator.impl.fwd.SingleValueSortedForwardIndexCreator;
+import
org.apache.pinot.segment.local.segment.creator.impl.fwd.SingleValueUnsortedForwardIndexCreator;
+import
org.apache.pinot.segment.local.segment.creator.impl.fwd.SingleValueVarByteRawIndexCreator;
+import
org.apache.pinot.segment.local.segment.creator.impl.inv.BitSlicedRangeIndexCreator;
+import
org.apache.pinot.segment.local.segment.creator.impl.inv.OffHeapBitmapInvertedIndexCreator;
+import
org.apache.pinot.segment.local.segment.creator.impl.inv.OnHeapBitmapInvertedIndexCreator;
+import
org.apache.pinot.segment.local.segment.creator.impl.inv.RangeIndexCreator;
+import
org.apache.pinot.segment.local.segment.creator.impl.inv.geospatial.OffHeapH3IndexCreator;
+import
org.apache.pinot.segment.local.segment.creator.impl.inv.geospatial.OnHeapH3IndexCreator;
+import
org.apache.pinot.segment.local.segment.creator.impl.inv.json.OffHeapJsonIndexCreator;
+import
org.apache.pinot.segment.local.segment.creator.impl.inv.json.OnHeapJsonIndexCreator;
+import
org.apache.pinot.segment.local.segment.creator.impl.inv.text.LuceneFSTIndexCreator;
+import
org.apache.pinot.segment.local.segment.creator.impl.text.LuceneTextIndexCreator;
+import org.apache.pinot.segment.local.utils.nativefst.NativeFSTIndexCreator;
+import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
+import org.apache.pinot.segment.spi.creator.IndexCreationContext;
+import org.apache.pinot.segment.spi.creator.IndexCreatorProvider;
+import org.apache.pinot.segment.spi.index.creator.BloomFilterCreator;
+import org.apache.pinot.segment.spi.index.creator.CombinedInvertedIndexCreator;
+import
org.apache.pinot.segment.spi.index.creator.DictionaryBasedInvertedIndexCreator;
+import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator;
+import org.apache.pinot.segment.spi.index.creator.GeoSpatialIndexCreator;
+import org.apache.pinot.segment.spi.index.creator.JsonIndexCreator;
+import org.apache.pinot.segment.spi.index.creator.TextIndexCreator;
+import org.apache.pinot.segment.spi.index.reader.H3IndexResolution;
+import org.apache.pinot.spi.config.table.FSTType;
+import org.apache.pinot.spi.config.table.FieldConfig;
+import org.apache.pinot.spi.data.FieldSpec;
+
+
+/**
+ * This class centralizes logic for how to create indexes. It can be overridden
+ * by SPI {@see IndexCreatorProviders} and should not be constructed directly,
but
+ * accessed only via {@see IndexCreatorProviders#getIndexCreatorProvider}.
Unless
+ * a user provides an override, this is the logic which will be used to create
+ * each index type.
+ */
+public final class DefaultIndexCreatorProvider implements IndexCreatorProvider
{
+
+ @Override
+ public ForwardIndexCreator
newForwardIndexCreator(IndexCreationContext.Forward context)
+ throws Exception {
+ if (!context.hasDictionary()) {
+ boolean deriveNumDocsPerChunk =
+ shouldDeriveNumDocsPerChunk(context.getFieldSpec().getName(),
context.getColumnProperties());
+ int writerVersion =
getRawIndexWriterVersion(context.getFieldSpec().getName(),
context.getColumnProperties());
+ if (context.getFieldSpec().isSingleValueField()) {
+ return getRawIndexCreatorForSVColumn(context.getIndexDir(),
context.getChunkCompressionType(),
+ context.getFieldSpec().getName(),
context.getFieldSpec().getDataType().getStoredType(),
+ context.getTotalDocs(), context.getLengthOfLongestEntry(),
deriveNumDocsPerChunk, writerVersion);
+ } else {
+ return getRawIndexCreatorForMVColumn(context.getIndexDir(),
context.getChunkCompressionType(),
+ context.getFieldSpec().getName(),
context.getFieldSpec().getDataType().getStoredType(),
+ context.getTotalDocs(),
context.getMaxNumberOfMultiValueElements(), deriveNumDocsPerChunk,
writerVersion,
+ context.getMaxRowLengthInBytes());
+ }
+ } else {
+ if (context.getFieldSpec().isSingleValueField()) {
+ if (context.isSorted()) {
+ return new
SingleValueSortedForwardIndexCreator(context.getIndexDir(),
context.getFieldSpec().getName(),
+ context.getCardinality());
+ } else {
+ return new
SingleValueUnsortedForwardIndexCreator(context.getIndexDir(),
context.getFieldSpec().getName(),
+ context.getCardinality(), context.getTotalDocs());
+ }
+ } else {
+ return new
MultiValueUnsortedForwardIndexCreator(context.getIndexDir(),
context.getFieldSpec().getName(),
+ context.getCardinality(), context.getTotalDocs(),
context.getTotalNumberOfEntries());
+ }
+ }
+ }
+
+ @Override
+ public DictionaryBasedInvertedIndexCreator
newInvertedIndexCreator(IndexCreationContext.Inverted context)
+ throws IOException {
+ if (context.isOnHeap()) {
+ return new OnHeapBitmapInvertedIndexCreator(context.getIndexDir(),
context.getFieldSpec().getName(),
+ context.getCardinality());
+ } else {
+ return new OffHeapBitmapInvertedIndexCreator(context.getIndexDir(),
context.getFieldSpec(),
+ context.getCardinality(), context.getTotalDocs(),
context.getTotalNumberOfEntries());
+ }
+ }
+
+ @Override
+ public JsonIndexCreator newJsonIndexCreator(IndexCreationContext.Json
context)
+ throws IOException {
+ Preconditions.checkState(context.getFieldSpec().isSingleValueField(),
+ "Json index is currently only supported on single-value columns");
+
Preconditions.checkState(context.getFieldSpec().getDataType().getStoredType()
== FieldSpec.DataType.STRING,
+ "Json index is currently only supported on STRING columns");
+ return context.isOnHeap() ? new
OnHeapJsonIndexCreator(context.getIndexDir(), context.getFieldSpec().getName())
+ : new OffHeapJsonIndexCreator(context.getIndexDir(),
context.getFieldSpec().getName());
+ }
+
+ @Override
+ public TextIndexCreator newTextIndexCreator(IndexCreationContext.Text
context)
+ throws IOException {
+ if (context.isFst()) {
+ Preconditions.checkState(context.getFieldSpec().isSingleValueField(),
+ "FST index is currently only supported on single-value columns");
+
Preconditions.checkState(context.getFieldSpec().getDataType().getStoredType()
== FieldSpec.DataType.STRING,
+ "FST index is currently only supported on STRING type columns");
+ Preconditions.checkState(context.hasDictionary(),
+ "FST index is currently only supported on dictionary-encoded
columns");
+ String[] sortedValues = context.getSortedUniqueElementsArray();
+ if (context.getFstType() == FSTType.NATIVE) {
+ return new NativeFSTIndexCreator(context.getIndexDir(),
context.getFieldSpec().getName(), sortedValues);
+ } else {
+ return new LuceneFSTIndexCreator(context.getIndexDir(),
context.getFieldSpec().getName(), sortedValues);
+ }
+ } else {
+
Preconditions.checkState(context.getFieldSpec().getDataType().getStoredType()
== FieldSpec.DataType.STRING,
+ "Text index is currently only supported on STRING type columns");
+ return new LuceneTextIndexCreator(context.getFieldSpec().getName(),
context.getIndexDir(),
+ context.isCommitOnClose());
+ }
+ }
+
+ @Override
+ public GeoSpatialIndexCreator
newGeoSpatialIndexCreator(IndexCreationContext.Geospatial context)
+ throws IOException {
+ Preconditions.checkState(context.getFieldSpec().isSingleValueField(),
+ "H3 index is currently only supported on single-value columns");
+
Preconditions.checkState(context.getFieldSpec().getDataType().getStoredType()
== FieldSpec.DataType.BYTES,
+ "H3 index is currently only supported on BYTES columns");
+ H3IndexResolution resolution =
Objects.requireNonNull(context.getH3IndexConfig()).getResolution();
+ return context.isOnHeap() ? new
OnHeapH3IndexCreator(context.getIndexDir(), context.getFieldSpec().getName(),
+ resolution) : new OffHeapH3IndexCreator(context.getIndexDir(),
context.getFieldSpec().getName(), resolution);
+ }
+
+ public static boolean shouldDeriveNumDocsPerChunk(String columnName,
+ Map<String, Map<String, String>> columnProperties) {
+ if (columnProperties != null) {
+ Map<String, String> properties = columnProperties.get(columnName);
+ return properties != null && Boolean.parseBoolean(
+ properties.get(FieldConfig.DERIVE_NUM_DOCS_PER_CHUNK_RAW_INDEX_KEY));
+ }
+ return false;
+ }
+
+ public static int getRawIndexWriterVersion(String columnName, Map<String,
Map<String, String>> columnProperties) {
+ if (columnProperties != null && columnProperties.get(columnName) != null) {
+ Map<String, String> properties = columnProperties.get(columnName);
+ String version = properties.get(FieldConfig.RAW_INDEX_WRITER_VERSION);
+ if (version == null) {
+ return BaseChunkSVForwardIndexWriter.DEFAULT_VERSION;
+ }
+ return Integer.parseInt(version);
+ }
+ return BaseChunkSVForwardIndexWriter.DEFAULT_VERSION;
+ }
+
+ /**
+ * Helper method to build the raw index creator for the column.
+ * Assumes that column to be indexed is single valued.
+ *
+ * @param file Output index file
+ * @param column Column name
+ * @param totalDocs Total number of documents to index
+ * @param lengthOfLongestEntry Length of longest entry
+ * @param deriveNumDocsPerChunk true if varbyte writer should auto-derive
the number of rows per chunk
+ * @param writerVersion version to use for the raw index writer
+ * @return raw index creator
+ */
+ public static ForwardIndexCreator getRawIndexCreatorForSVColumn(File file,
ChunkCompressionType compressionType,
+ String column, FieldSpec.DataType dataType, int totalDocs, int
lengthOfLongestEntry,
+ boolean deriveNumDocsPerChunk,
+ int writerVersion)
+ throws IOException {
+ switch (dataType.getStoredType()) {
+ case INT:
+ case LONG:
+ case FLOAT:
+ case DOUBLE:
+ return new SingleValueFixedByteRawIndexCreator(file, compressionType,
column, totalDocs, dataType,
+ writerVersion);
+ case STRING:
+ case BYTES:
+ return new SingleValueVarByteRawIndexCreator(file, compressionType,
column, totalDocs, dataType,
+ lengthOfLongestEntry, deriveNumDocsPerChunk, writerVersion);
+ default:
+ throw new UnsupportedOperationException("Data type not supported for
raw indexing: " + dataType);
+ }
+ }
+
+ /**
+ * Helper method to build the raw index creator for the column.
+ * Assumes that column to be indexed is single valued.
+ *
+ * @param file Output index file
+ * @param column Column name
+ * @param totalDocs Total number of documents to index
+ * @param deriveNumDocsPerChunk true if varbyte writer should auto-derive
the number of rows
+ * per chunk
+ * @param writerVersion version to use for the raw index writer
+ * @param maxRowLengthInBytes the length of the longest row in bytes
+ * @return raw index creator
+ */
+ public static ForwardIndexCreator getRawIndexCreatorForMVColumn(File file,
ChunkCompressionType compressionType,
+ String column, FieldSpec.DataType dataType, final int totalDocs, int
maxNumberOfMultiValueElements,
+ boolean deriveNumDocsPerChunk, int writerVersion, int
maxRowLengthInBytes)
+ throws IOException {
+ switch (dataType.getStoredType()) {
+ case INT:
+ case LONG:
+ case FLOAT:
+ case DOUBLE:
+ return new MultiValueFixedByteRawIndexCreator(file, compressionType,
column, totalDocs, dataType,
+ maxNumberOfMultiValueElements, deriveNumDocsPerChunk,
writerVersion);
+ case STRING:
+ case BYTES:
+ return new MultiValueVarByteRawIndexCreator(file, compressionType,
column, totalDocs, dataType, writerVersion,
+ maxRowLengthInBytes, maxNumberOfMultiValueElements);
+ default:
+ throw new UnsupportedOperationException("Data type not supported for
raw indexing: " + dataType);
+ }
+ }
+
+ @Override
+ public BloomFilterCreator
newBloomFilterCreator(IndexCreationContext.BloomFilter context)
+ throws IOException {
+ return new OnHeapGuavaBloomFilterCreator(context.getIndexDir(),
context.getFieldSpec().getName(),
+ context.getCardinality(),
Objects.requireNonNull(context.getBloomFilterConfig()));
+ }
+
+ @Override
+ public CombinedInvertedIndexCreator
newRangeIndexCreator(IndexCreationContext.Range context)
+ throws IOException {
+ if (context.getRangeIndexVersion() == BitSlicedRangeIndexCreator.VERSION
&& context.getFieldSpec()
+ .isSingleValueField()) {
+ if (context.hasDictionary()) {
+ return new BitSlicedRangeIndexCreator(context.getIndexDir(),
context.getFieldSpec(), context.getCardinality());
+ }
+ return new BitSlicedRangeIndexCreator(context.getIndexDir(),
context.getFieldSpec(), context.getMin(),
+ context.getMax());
+ }
+ // default to RangeIndexCreator for the time being
+ return new RangeIndexCreator(context.getIndexDir(), context.getFieldSpec(),
+ context.hasDictionary() ? FieldSpec.DataType.INT :
context.getFieldSpec().getDataType(), -1,
+ -1, context.getTotalDocs(), context.getTotalNumberOfEntries());
+ }
+}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java
index 764d36f..b01dd3f 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java
@@ -33,28 +33,14 @@ import
org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.pinot.common.utils.FileUtils;
import org.apache.pinot.segment.local.io.util.PinotDataBitSet;
-import
org.apache.pinot.segment.local.io.writer.impl.BaseChunkSVForwardIndexWriter;
-import
org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueFixedByteRawIndexCreator;
-import
org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueUnsortedForwardIndexCreator;
-import
org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueVarByteRawIndexCreator;
-import
org.apache.pinot.segment.local.segment.creator.impl.fwd.SingleValueFixedByteRawIndexCreator;
-import
org.apache.pinot.segment.local.segment.creator.impl.fwd.SingleValueSortedForwardIndexCreator;
-import
org.apache.pinot.segment.local.segment.creator.impl.fwd.SingleValueUnsortedForwardIndexCreator;
-import
org.apache.pinot.segment.local.segment.creator.impl.fwd.SingleValueVarByteRawIndexCreator;
-import
org.apache.pinot.segment.local.segment.creator.impl.inv.OffHeapBitmapInvertedIndexCreator;
-import
org.apache.pinot.segment.local.segment.creator.impl.inv.OnHeapBitmapInvertedIndexCreator;
-import
org.apache.pinot.segment.local.segment.creator.impl.inv.geospatial.OffHeapH3IndexCreator;
-import
org.apache.pinot.segment.local.segment.creator.impl.inv.geospatial.OnHeapH3IndexCreator;
-import
org.apache.pinot.segment.local.segment.creator.impl.inv.json.OffHeapJsonIndexCreator;
-import
org.apache.pinot.segment.local.segment.creator.impl.inv.json.OnHeapJsonIndexCreator;
-import
org.apache.pinot.segment.local.segment.creator.impl.inv.text.LuceneFSTIndexCreator;
import
org.apache.pinot.segment.local.segment.creator.impl.nullvalue.NullValueVectorCreator;
-import
org.apache.pinot.segment.local.segment.creator.impl.text.LuceneTextIndexCreator;
import org.apache.pinot.segment.local.utils.GeometrySerializer;
-import org.apache.pinot.segment.local.utils.nativefst.NativeFSTIndexCreator;
import org.apache.pinot.segment.spi.V1Constants;
import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
import org.apache.pinot.segment.spi.creator.ColumnIndexCreationInfo;
+import org.apache.pinot.segment.spi.creator.IndexCreationContext;
+import org.apache.pinot.segment.spi.creator.IndexCreatorProvider;
+import org.apache.pinot.segment.spi.creator.IndexCreatorProviders;
import org.apache.pinot.segment.spi.creator.SegmentCreator;
import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
import
org.apache.pinot.segment.spi.index.creator.DictionaryBasedInvertedIndexCreator;
@@ -64,9 +50,7 @@ import
org.apache.pinot.segment.spi.index.creator.H3IndexConfig;
import org.apache.pinot.segment.spi.index.creator.JsonIndexCreator;
import org.apache.pinot.segment.spi.index.creator.SegmentIndexCreationInfo;
import org.apache.pinot.segment.spi.index.creator.TextIndexCreator;
-import org.apache.pinot.segment.spi.index.reader.H3IndexResolution;
import org.apache.pinot.segment.spi.partition.PartitionFunction;
-import org.apache.pinot.spi.config.table.FSTType;
import org.apache.pinot.spi.config.table.FieldConfig;
import org.apache.pinot.spi.data.DateTimeFieldSpec;
import org.apache.pinot.spi.data.DateTimeFormatSpec;
@@ -99,6 +83,7 @@ public class SegmentColumnarIndexCreator implements
SegmentCreator {
private SegmentGeneratorConfig _config;
private Map<String, ColumnIndexCreationInfo> _indexCreationInfoMap;
+ private final IndexCreatorProvider _indexCreatorProvider =
IndexCreatorProviders.getIndexCreatorProvider();
private final Map<String, SegmentDictionaryCreator> _dictionaryCreatorMap =
new HashMap<>();
private final Map<String, ForwardIndexCreator> _forwardIndexCreatorMap = new
HashMap<>();
private final Map<String, DictionaryBasedInvertedIndexCreator>
_invertedIndexCreatorMap = new HashMap<>();
@@ -179,126 +164,70 @@ public class SegmentColumnarIndexCreator implements
SegmentCreator {
}
String columnName = fieldSpec.getName();
- DataType storedType = fieldSpec.getDataType().getStoredType();
- ColumnIndexCreationInfo indexCreationInfo =
indexCreationInfoMap.get(columnName);
- Preconditions.checkNotNull(indexCreationInfo, "Missing index creation
info for column: %s", columnName);
- boolean dictEnabledColumn = createDictionaryForColumn(indexCreationInfo,
segmentCreationSpec, fieldSpec);
-
+ ColumnIndexCreationInfo columnIndexCreationInfo =
indexCreationInfoMap.get(columnName);
+ Preconditions.checkNotNull(columnIndexCreationInfo, "Missing index
creation info for column: %s", columnName);
+ boolean dictEnabledColumn =
createDictionaryForColumn(columnIndexCreationInfo, segmentCreationSpec,
fieldSpec);
+ Preconditions.checkState(dictEnabledColumn ||
!invertedIndexColumns.contains(columnName),
+ "Cannot create inverted index for raw index column: %s", columnName);
+
+ IndexCreationContext.Common context = IndexCreationContext.builder()
+ .withIndexDir(_indexDir)
+ .withCardinality(columnIndexCreationInfo.getDistinctValueCount())
+ .withDictionary(dictEnabledColumn)
+ .withFieldSpec(fieldSpec)
+ .withTotalDocs(segmentIndexCreationInfo.getTotalDocs())
+
.withTotalNumberOfEntries(columnIndexCreationInfo.getTotalNumberOfEntries())
+ .withColumnIndexCreationInfo(columnIndexCreationInfo)
+ .sorted(columnIndexCreationInfo.isSorted())
+ .onHeap(segmentCreationSpec.isOnHeap())
+ .build();
+ // Initialize forward index creator
+ ChunkCompressionType chunkCompressionType =
+ dictEnabledColumn ? null :
getColumnCompressionType(segmentCreationSpec, fieldSpec);
+ _forwardIndexCreatorMap.put(columnName,
_indexCreatorProvider.newForwardIndexCreator(
+ context.forForwardIndex(chunkCompressionType,
segmentCreationSpec.getColumnProperties())));
+
+ // Initialize inverted index creator; skip creating inverted index if
sorted
+ if (invertedIndexColumns.contains(columnName) &&
!columnIndexCreationInfo.isSorted()) {
+ _invertedIndexCreatorMap.put(columnName,
+
_indexCreatorProvider.newInvertedIndexCreator(context.forInvertedIndex()));
+ }
if (dictEnabledColumn) {
// Create dictionary-encoded index
-
// Initialize dictionary creator
SegmentDictionaryCreator dictionaryCreator =
- new
SegmentDictionaryCreator(indexCreationInfo.getSortedUniqueElementsArray(),
fieldSpec, _indexDir,
- indexCreationInfo.isUseVarLengthDictionary());
+ new
SegmentDictionaryCreator(columnIndexCreationInfo.getSortedUniqueElementsArray(),
fieldSpec, _indexDir,
+ columnIndexCreationInfo.isUseVarLengthDictionary());
_dictionaryCreatorMap.put(columnName, dictionaryCreator);
-
// Create dictionary
try {
dictionaryCreator.build();
} catch (Exception e) {
LOGGER.error("Error building dictionary for field: {}, cardinality:
{}, number of bytes per entry: {}",
- fieldSpec.getName(), indexCreationInfo.getDistinctValueCount(),
dictionaryCreator.getNumBytesPerEntry());
+ fieldSpec.getName(),
columnIndexCreationInfo.getDistinctValueCount(),
+ dictionaryCreator.getNumBytesPerEntry());
throw e;
}
-
- // Initialize forward index creator
- int cardinality = indexCreationInfo.getDistinctValueCount();
- if (fieldSpec.isSingleValueField()) {
- if (indexCreationInfo.isSorted()) {
- _forwardIndexCreatorMap.put(columnName,
- new SingleValueSortedForwardIndexCreator(_indexDir,
columnName, cardinality));
- } else {
- _forwardIndexCreatorMap.put(columnName,
- new SingleValueUnsortedForwardIndexCreator(_indexDir,
columnName, cardinality, _totalDocs));
- }
- } else {
- _forwardIndexCreatorMap.put(columnName,
- new MultiValueUnsortedForwardIndexCreator(_indexDir, columnName,
cardinality, _totalDocs,
- indexCreationInfo.getTotalNumberOfEntries()));
- }
-
- // Initialize inverted index creator; skip creating inverted index if
sorted
- if (invertedIndexColumns.contains(columnName) &&
!indexCreationInfo.isSorted()) {
- if (segmentCreationSpec.isOnHeap()) {
- _invertedIndexCreatorMap.put(columnName,
- new OnHeapBitmapInvertedIndexCreator(_indexDir, columnName,
cardinality));
- } else {
- _invertedIndexCreatorMap.put(columnName,
- new OffHeapBitmapInvertedIndexCreator(_indexDir, fieldSpec,
cardinality, _totalDocs,
- indexCreationInfo.getTotalNumberOfEntries()));
- }
- }
- } else {
- // Create raw index
- Preconditions.checkState(!invertedIndexColumns.contains(columnName),
- "Cannot create inverted index for raw index column: %s",
columnName);
-
- ChunkCompressionType compressionType =
getColumnCompressionType(segmentCreationSpec, fieldSpec);
-
- // Initialize forward index creator
- boolean deriveNumDocsPerChunk =
- shouldDeriveNumDocsPerChunk(columnName,
segmentCreationSpec.getColumnProperties());
- int writerVersion = rawIndexWriterVersion(columnName,
segmentCreationSpec.getColumnProperties());
- if (fieldSpec.isSingleValueField()) {
- _forwardIndexCreatorMap.put(columnName,
- getRawIndexCreatorForSVColumn(_indexDir, compressionType,
columnName, storedType, _totalDocs,
- indexCreationInfo.getLengthOfLongestEntry(),
deriveNumDocsPerChunk, writerVersion));
- } else {
- _forwardIndexCreatorMap.put(columnName,
- getRawIndexCreatorForMVColumn(_indexDir, compressionType,
columnName, storedType, _totalDocs,
- indexCreationInfo.getMaxNumberOfMultiValueElements(),
deriveNumDocsPerChunk, writerVersion,
- indexCreationInfo.getMaxRowLengthInBytes()));
- }
}
if (textIndexColumns.contains(columnName)) {
- // Initialize text index creator
- Preconditions.checkState(storedType == DataType.STRING,
- "Text index is currently only supported on STRING type columns");
- _textIndexCreatorMap.put(columnName,
- new LuceneTextIndexCreator(columnName, _indexDir, true /*
commitOnClose */));
+ _textIndexCreatorMap.put(columnName,
_indexCreatorProvider.newTextIndexCreator(context.forTextIndex(true)));
}
if (fstIndexColumns.contains(columnName)) {
- Preconditions.checkState(fieldSpec.isSingleValueField(),
- "FST index is currently only supported on single-value columns");
- Preconditions.checkState(storedType == DataType.STRING,
- "FST index is currently only supported on STRING type columns");
- Preconditions.checkState(dictEnabledColumn,
- "FST index is currently only supported on dictionary-encoded
columns");
- String[] sortedValues = (String[])
indexCreationInfo.getSortedUniqueElementsArray();
- TextIndexCreator textIndexCreator;
- if (_config.getFSTIndexType() == FSTType.NATIVE) {
- textIndexCreator = new NativeFSTIndexCreator(_indexDir, columnName,
sortedValues);
- } else {
- textIndexCreator = new LuceneFSTIndexCreator(_indexDir, columnName,
sortedValues);
- }
-
- _fstIndexCreatorMap.put(columnName, textIndexCreator);
+ _fstIndexCreatorMap.put(columnName,
_indexCreatorProvider.newTextIndexCreator(
+ context.forFSTIndex(_config.getFSTIndexType(),
+ (String[])
columnIndexCreationInfo.getSortedUniqueElementsArray())));
}
if (jsonIndexColumns.contains(columnName)) {
- Preconditions.checkState(fieldSpec.isSingleValueField(),
- "Json index is currently only supported on single-value columns");
- Preconditions.checkState(storedType == DataType.STRING,
- "Json index is currently only supported on STRING columns");
- JsonIndexCreator jsonIndexCreator =
- segmentCreationSpec.isOnHeap() ? new
OnHeapJsonIndexCreator(_indexDir, columnName)
- : new OffHeapJsonIndexCreator(_indexDir, columnName);
- _jsonIndexCreatorMap.put(columnName, jsonIndexCreator);
+ _jsonIndexCreatorMap.put(columnName,
_indexCreatorProvider.newJsonIndexCreator(context.forJsonIndex()));
}
H3IndexConfig h3IndexConfig = h3IndexConfigs.get(columnName);
if (h3IndexConfig != null) {
- Preconditions.checkState(fieldSpec.isSingleValueField(),
- "H3 index is currently only supported on single-value columns");
- Preconditions.checkState(storedType == DataType.BYTES, "H3 index is
currently only supported on BYTES columns");
- H3IndexResolution resolution = h3IndexConfig.getResolution();
- GeoSpatialIndexCreator h3IndexCreator =
- segmentCreationSpec.isOnHeap() ? new
OnHeapH3IndexCreator(_indexDir, columnName, resolution)
- : new OffHeapH3IndexCreator(_indexDir, columnName, resolution);
- _h3IndexCreatorMap.put(columnName, h3IndexCreator);
+ _h3IndexCreatorMap.put(columnName,
+
_indexCreatorProvider.newGeoSpatialIndexCreator(context.forGeospatialIndex(h3IndexConfig)));
}
_nullHandlingEnabled = _config.isNullHandlingEnabled();
@@ -309,26 +238,29 @@ public class SegmentColumnarIndexCreator implements
SegmentCreator {
}
}
- public static boolean shouldDeriveNumDocsPerChunk(String columnName,
- Map<String, Map<String, String>> columnProperties) {
- if (columnProperties != null) {
- Map<String, String> properties = columnProperties.get(columnName);
- return properties != null && Boolean.parseBoolean(
- properties.get(FieldConfig.DERIVE_NUM_DOCS_PER_CHUNK_RAW_INDEX_KEY));
- }
- return false;
- }
-
- public static int rawIndexWriterVersion(String columnName, Map<String,
Map<String, String>> columnProperties) {
- if (columnProperties != null && columnProperties.get(columnName) != null) {
- Map<String, String> properties = columnProperties.get(columnName);
- String version = properties.get(FieldConfig.RAW_INDEX_WRITER_VERSION);
- if (version == null) {
- return BaseChunkSVForwardIndexWriter.DEFAULT_VERSION;
- }
- return Integer.parseInt(version);
+ /**
+ * Returns true if dictionary should be created for a column, false
otherwise.
+ * Currently there are two sources for this config:
+ * <ul>
+ * <li> ColumnIndexCreationInfo (this is currently hard-coded to always
return dictionary). </li>
+ * <li> SegmentGeneratorConfig</li>
+ * </ul>
+ *
+ * This method gives preference to the SegmentGeneratorConfig first.
+ *
+ * @param info Column index creation info
+ * @param config Segment generation config
+ * @param spec Field spec for the column
+ * @return True if dictionary should be created for the column, false
otherwise
+ */
+ private boolean createDictionaryForColumn(ColumnIndexCreationInfo info,
SegmentGeneratorConfig config,
+ FieldSpec spec) {
+ String column = spec.getName();
+ if (config.getRawIndexCreationColumns().contains(column) ||
config.getRawIndexCompressionType()
+ .containsKey(column)) {
+ return false;
}
- return BaseChunkSVForwardIndexWriter.DEFAULT_VERSION;
+ return info.isCreateDictionary();
}
/**
@@ -347,7 +279,7 @@ public class SegmentColumnarIndexCreator implements
SegmentCreator {
FieldSpec fieldSpec) {
ChunkCompressionType compressionType =
segmentCreationSpec.getRawIndexCompressionType().get(fieldSpec.getName());
if (compressionType == null) {
- if (fieldSpec.getFieldType() == FieldType.METRIC) {
+ if (fieldSpec.getFieldType() == FieldSpec.FieldType.METRIC) {
return ChunkCompressionType.PASS_THROUGH;
} else {
return ChunkCompressionType.LZ4;
@@ -357,31 +289,6 @@ public class SegmentColumnarIndexCreator implements
SegmentCreator {
}
}
- /**
- * Returns true if dictionary should be created for a column, false
otherwise.
- * Currently there are two sources for this config:
- * <ul>
- * <li> ColumnIndexCreationInfo (this is currently hard-coded to always
return dictionary). </li>
- * <li> SegmentGeneratorConfig</li>
- * </ul>
- *
- * This method gives preference to the SegmentGeneratorConfig first.
- *
- * @param info Column index creation info
- * @param config Segment generation config
- * @param spec Field spec for the column
- * @return True if dictionary should be created for the column, false
otherwise
- */
- private boolean createDictionaryForColumn(ColumnIndexCreationInfo info,
SegmentGeneratorConfig config,
- FieldSpec spec) {
- String column = spec.getName();
- if (config.getRawIndexCreationColumns().contains(column) ||
config.getRawIndexCompressionType()
- .containsKey(column)) {
- return false;
- }
- return info.isCreateDictionary();
- }
-
@Override
public void indexRow(GenericRow row)
throws IOException {
@@ -795,71 +702,6 @@ public class SegmentColumnarIndexCreator implements
SegmentCreator {
properties.subset(COLUMN_PROPS_KEY_PREFIX + column).clear();
}
- /**
- * Helper method to build the raw index creator for the column.
- * Assumes that column to be indexed is single valued.
- *
- * @param file Output index file
- * @param column Column name
- * @param totalDocs Total number of documents to index
- * @param lengthOfLongestEntry Length of longest entry
- * @param deriveNumDocsPerChunk true if varbyte writer should auto-derive
the number of rows per chunk
- * @param writerVersion version to use for the raw index writer
- * @return raw index creator
- */
- public static ForwardIndexCreator getRawIndexCreatorForSVColumn(File file,
ChunkCompressionType compressionType,
- String column, DataType dataType, int totalDocs, int
lengthOfLongestEntry, boolean deriveNumDocsPerChunk,
- int writerVersion)
- throws IOException {
- switch (dataType.getStoredType()) {
- case INT:
- case LONG:
- case FLOAT:
- case DOUBLE:
- return new SingleValueFixedByteRawIndexCreator(file, compressionType,
column, totalDocs, dataType,
- writerVersion);
- case STRING:
- case BYTES:
- return new SingleValueVarByteRawIndexCreator(file, compressionType,
column, totalDocs, dataType,
- lengthOfLongestEntry, deriveNumDocsPerChunk, writerVersion);
- default:
- throw new UnsupportedOperationException("Data type not supported for
raw indexing: " + dataType);
- }
- }
-
- /**
- * Helper method to build the raw index creator for the column.
- * Assumes that column to be indexed is single valued.
- *
- * @param file Output index file
- * @param column Column name
- * @param totalDocs Total number of documents to index
- * @param deriveNumDocsPerChunk true if varbyte writer should auto-derive
the number of rows
- * per chunk
- * @param writerVersion version to use for the raw index writer
- * @param maxRowLengthInBytes the length of the longest row in bytes
- * @return raw index creator
- */
- public static ForwardIndexCreator getRawIndexCreatorForMVColumn(File file,
ChunkCompressionType compressionType,
- String column, DataType dataType, final int totalDocs, int
maxNumberOfMultiValueElements,
- boolean deriveNumDocsPerChunk, int writerVersion, int
maxRowLengthInBytes)
- throws IOException {
- switch (dataType.getStoredType()) {
- case INT:
- case LONG:
- case FLOAT:
- case DOUBLE:
- return new MultiValueFixedByteRawIndexCreator(file, compressionType,
column, totalDocs, dataType,
- maxNumberOfMultiValueElements, deriveNumDocsPerChunk,
writerVersion);
- case STRING:
- case BYTES:
- return new MultiValueVarByteRawIndexCreator(file, compressionType,
column, totalDocs, dataType, writerVersion,
- maxRowLengthInBytes, maxNumberOfMultiValueElements);
- default:
- throw new UnsupportedOperationException("Data type not supported for
raw indexing: " + dataType);
- }
- }
-
@Override
public void close()
throws IOException {
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/BitSlicedRangeIndexCreator.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/BitSlicedRangeIndexCreator.java
index 4c0b98b..0f96bf4 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/BitSlicedRangeIndexCreator.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/BitSlicedRangeIndexCreator.java
@@ -18,10 +18,10 @@
*/
package org.apache.pinot.segment.local.segment.creator.impl.inv;
+import com.google.common.base.Preconditions;
import java.io.File;
import java.io.IOException;
import org.apache.pinot.segment.local.utils.FPOrdering;
-import org.apache.pinot.segment.spi.ColumnMetadata;
import org.apache.pinot.segment.spi.index.creator.CombinedInvertedIndexCreator;
import org.apache.pinot.spi.data.FieldSpec;
import org.roaringbitmap.RangeBitmap;
@@ -41,13 +41,33 @@ public class BitSlicedRangeIndexCreator implements
CombinedInvertedIndexCreator
private final File _rangeIndexFile;
private final long _minValue;
- public BitSlicedRangeIndexCreator(File indexDir, ColumnMetadata metadata) {
- if (!metadata.isSingleValue()) {
- throw new IllegalArgumentException("MV columns not supported");
- }
- _appender = RangeBitmap.appender(maxValue(metadata));
- _rangeIndexFile = new File(indexDir, metadata.getColumnName() +
BITMAP_RANGE_INDEX_FILE_EXTENSION);
- _minValue = minValue(metadata);
+ private BitSlicedRangeIndexCreator(File indexDir, FieldSpec fieldSpec, long
minValue, long maxValue) {
+ Preconditions.checkArgument(fieldSpec.isSingleValueField(), "MV columns
not supported");
+ _rangeIndexFile = new File(indexDir, fieldSpec.getName() +
BITMAP_RANGE_INDEX_FILE_EXTENSION);
+ _appender = RangeBitmap.appender(maxValue);
+ _minValue = minValue;
+ }
+
+ /**
+ * For dictionarized columns
+ * @param indexDir the directory for the index
+ * @param fieldSpec the specification of the field
+ * @param cardinality the cardinality of the dictionary
+ */
+ public BitSlicedRangeIndexCreator(File indexDir, FieldSpec fieldSpec, int
cardinality) {
+ this(indexDir, fieldSpec, 0, cardinality - 1);
+ }
+
+ /**
+ * For raw columns
+ * @param indexDir the directory for the index
+ * @param fieldSpec the specification of the field
+ * @param minValue the minimum value
+ * @param maxValue the maximum value
+ */
+ public BitSlicedRangeIndexCreator(File indexDir, FieldSpec fieldSpec,
Comparable<?> minValue,
+ Comparable<?> maxValue) {
+ this(indexDir, fieldSpec, minValue(fieldSpec, minValue),
maxValue(fieldSpec, minValue, maxValue));
}
@Override
@@ -110,13 +130,8 @@ public class BitSlicedRangeIndexCreator implements
CombinedInvertedIndexCreator
throws IOException {
}
- private static long maxValue(ColumnMetadata metadata) {
- if (metadata.hasDictionary()) {
- return metadata.getCardinality() - 1;
- }
- FieldSpec.DataType storedType = metadata.getDataType().getStoredType();
- Comparable<?> minValue = metadata.getMinValue();
- Comparable<?> maxValue = metadata.getMaxValue();
+ private static long maxValue(FieldSpec fieldSpec, Comparable<?> minValue,
Comparable<?> maxValue) {
+ FieldSpec.DataType storedType = fieldSpec.getDataType().getStoredType();
if (storedType == INT || storedType == LONG) {
return ((Number) maxValue).longValue() - ((Number) minValue).longValue();
}
@@ -126,15 +141,11 @@ public class BitSlicedRangeIndexCreator implements
CombinedInvertedIndexCreator
if (storedType == DOUBLE) {
return 0xFFFFFFFFFFFFFFFFL;
}
- throw new IllegalArgumentException("Unsupported data type: " +
metadata.getDataType());
+ throw new IllegalArgumentException("Unsupported data type: " +
fieldSpec.getDataType());
}
- private static long minValue(ColumnMetadata metadata) {
- if (metadata.hasDictionary()) {
- return 0;
- }
- FieldSpec.DataType storedType = metadata.getDataType().getStoredType();
- Comparable<?> minValue = metadata.getMinValue();
+ private static long minValue(FieldSpec fieldSpec, Comparable<?> minValue) {
+ FieldSpec.DataType storedType = fieldSpec.getDataType().getStoredType();
if (storedType == INT || storedType == LONG) {
return ((Number) minValue).longValue();
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexHandlerFactory.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexHandlerFactory.java
index 9dd9ecb..a6fe925 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexHandlerFactory.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexHandlerFactory.java
@@ -26,6 +26,8 @@ import
org.apache.pinot.segment.local.segment.index.loader.invertedindex.Inverte
import
org.apache.pinot.segment.local.segment.index.loader.invertedindex.JsonIndexHandler;
import
org.apache.pinot.segment.local.segment.index.loader.invertedindex.RangeIndexHandler;
import
org.apache.pinot.segment.local.segment.index.loader.invertedindex.TextIndexHandler;
+import org.apache.pinot.segment.spi.creator.IndexCreatorProvider;
+import org.apache.pinot.segment.spi.creator.IndexCreatorProviders;
import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
import org.apache.pinot.segment.spi.store.ColumnIndexType;
import org.apache.pinot.segment.spi.store.SegmentDirectory;
@@ -40,22 +42,26 @@ public class IndexHandlerFactory {
public static IndexHandler getIndexHandler(ColumnIndexType type, File
indexDir, SegmentMetadataImpl segmentMetadata,
IndexLoadingConfig indexLoadingConfig, SegmentDirectory.Writer
segmentWriter) {
+ IndexCreatorProvider indexCreatorProvider =
IndexCreatorProviders.getIndexCreatorProvider();
switch (type) {
case INVERTED_INDEX:
- return new InvertedIndexHandler(indexDir, segmentMetadata,
indexLoadingConfig, segmentWriter);
+ return new InvertedIndexHandler(indexDir, segmentMetadata,
indexLoadingConfig, segmentWriter,
+ indexCreatorProvider);
case RANGE_INDEX:
- return new RangeIndexHandler(indexDir, segmentMetadata,
indexLoadingConfig, segmentWriter);
+ return new RangeIndexHandler(indexDir, segmentMetadata,
indexLoadingConfig, segmentWriter,
+ indexCreatorProvider);
case TEXT_INDEX:
- return new TextIndexHandler(indexDir, segmentMetadata,
indexLoadingConfig, segmentWriter);
+ return new TextIndexHandler(indexDir, segmentMetadata,
indexLoadingConfig, segmentWriter, indexCreatorProvider);
case FST_INDEX:
return new FSTIndexHandler(indexDir, segmentMetadata,
indexLoadingConfig, segmentWriter,
- indexLoadingConfig.getFSTIndexType());
+ indexLoadingConfig.getFSTIndexType(), indexCreatorProvider);
case JSON_INDEX:
- return new JsonIndexHandler(indexDir, segmentMetadata,
indexLoadingConfig, segmentWriter);
+ return new JsonIndexHandler(indexDir, segmentMetadata,
indexLoadingConfig, segmentWriter, indexCreatorProvider);
case H3_INDEX:
- return new H3IndexHandler(indexDir, segmentMetadata,
indexLoadingConfig, segmentWriter);
+ return new H3IndexHandler(indexDir, segmentMetadata,
indexLoadingConfig, segmentWriter, indexCreatorProvider);
case BLOOM_FILTER:
- return new BloomFilterHandler(indexDir, segmentMetadata,
indexLoadingConfig, segmentWriter);
+ return new BloomFilterHandler(indexDir, segmentMetadata,
indexLoadingConfig, segmentWriter,
+ indexCreatorProvider);
default:
return NO_OP_HANDLER;
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/bloomfilter/BloomFilterHandler.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/bloomfilter/BloomFilterHandler.java
index b8a3264..f992344 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/bloomfilter/BloomFilterHandler.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/bloomfilter/BloomFilterHandler.java
@@ -24,7 +24,6 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.commons.io.FileUtils;
-import
org.apache.pinot.segment.local.segment.creator.impl.bloom.OnHeapGuavaBloomFilterCreator;
import org.apache.pinot.segment.local.segment.index.loader.IndexHandler;
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.segment.local.segment.index.loader.LoaderUtils;
@@ -37,6 +36,8 @@ import
org.apache.pinot.segment.local.segment.index.readers.LongDictionary;
import org.apache.pinot.segment.local.segment.index.readers.StringDictionary;
import org.apache.pinot.segment.spi.ColumnMetadata;
import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.creator.BloomFilterCreatorProvider;
+import org.apache.pinot.segment.spi.creator.IndexCreationContext;
import org.apache.pinot.segment.spi.creator.SegmentVersion;
import org.apache.pinot.segment.spi.index.creator.BloomFilterCreator;
import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
@@ -57,13 +58,15 @@ public class BloomFilterHandler implements IndexHandler {
private final SegmentMetadataImpl _segmentMetadata;
private final SegmentDirectory.Writer _segmentWriter;
private final Map<String, BloomFilterConfig> _bloomFilterConfigs;
+ private final BloomFilterCreatorProvider _indexCreatorProvider;
public BloomFilterHandler(File indexDir, SegmentMetadataImpl
segmentMetadata, IndexLoadingConfig indexLoadingConfig,
- SegmentDirectory.Writer segmentWriter) {
+ SegmentDirectory.Writer segmentWriter, BloomFilterCreatorProvider
indexCreatorProvider) {
_indexDir = indexDir;
_segmentWriter = segmentWriter;
_segmentMetadata = segmentMetadata;
_bloomFilterConfigs = indexLoadingConfig.getBloomFilterConfigs();
+ _indexCreatorProvider = indexCreatorProvider;
}
@Override
@@ -112,8 +115,9 @@ public class BloomFilterHandler implements IndexHandler {
BloomFilterConfig bloomFilterConfig = _bloomFilterConfigs.get(columnName);
LOGGER.info("Creating new bloom filter for segment: {}, column: {} with
config: {}", segmentName, columnName,
bloomFilterConfig);
- try (BloomFilterCreator bloomFilterCreator = new
OnHeapGuavaBloomFilterCreator(_indexDir, columnName,
- columnMetadata.getCardinality(), bloomFilterConfig);
+ try (BloomFilterCreator bloomFilterCreator =
_indexCreatorProvider.newBloomFilterCreator(
+
IndexCreationContext.builder().withIndexDir(_indexDir).withColumnMetadata(columnMetadata)
+ .build().forBloomFilter(bloomFilterConfig));
Dictionary dictionary = getDictionaryReader(columnMetadata,
_segmentWriter)) {
int length = dictionary.length();
for (int i = 0; i < length; i++) {
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/FSTIndexHandler.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/FSTIndexHandler.java
index 50f2b67..292bf0c 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/FSTIndexHandler.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/FSTIndexHandler.java
@@ -24,15 +24,15 @@ import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import org.apache.commons.io.FileUtils;
-import
org.apache.pinot.segment.local.segment.creator.impl.inv.text.LuceneFSTIndexCreator;
import org.apache.pinot.segment.local.segment.index.loader.IndexHandler;
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.segment.local.segment.index.loader.LoaderUtils;
import org.apache.pinot.segment.local.segment.index.loader.SegmentPreProcessor;
-import org.apache.pinot.segment.local.utils.nativefst.NativeFSTIndexCreator;
import org.apache.pinot.segment.spi.ColumnMetadata;
import org.apache.pinot.segment.spi.SegmentMetadata;
+import org.apache.pinot.segment.spi.creator.IndexCreationContext;
import org.apache.pinot.segment.spi.creator.SegmentVersion;
+import org.apache.pinot.segment.spi.creator.TextIndexCreatorProvider;
import org.apache.pinot.segment.spi.index.creator.TextIndexCreator;
import org.apache.pinot.segment.spi.index.reader.Dictionary;
import org.apache.pinot.segment.spi.store.ColumnIndexType;
@@ -71,14 +71,16 @@ public class FSTIndexHandler implements IndexHandler {
private final SegmentDirectory.Writer _segmentWriter;
private final Set<String> _columnsToAddIdx;
private final FSTType _fstType;
+ private final TextIndexCreatorProvider _indexCreatorProvider;
public FSTIndexHandler(File indexDir, SegmentMetadata segmentMetadata,
IndexLoadingConfig indexLoadingConfig,
- SegmentDirectory.Writer segmentWriter, FSTType fstType) {
+ SegmentDirectory.Writer segmentWriter, FSTType fstType,
TextIndexCreatorProvider indexCreatorProvider) {
_indexDir = indexDir;
_segmentMetadata = segmentMetadata;
_segmentWriter = segmentWriter;
_columnsToAddIdx = new HashSet<>(indexLoadingConfig.getFSTIndexColumns());
_fstType = fstType;
+ _indexCreatorProvider = indexCreatorProvider;
}
@Override
@@ -136,12 +138,9 @@ public class FSTIndexHandler implements IndexHandler {
LOGGER.info("Creating new FST index for column: {} in segment: {},
cardinality: {}", column, segmentName,
columnMetadata.getCardinality());
- TextIndexCreator fstIndexCreator;
- if (_fstType == FSTType.LUCENE) {
- fstIndexCreator = new LuceneFSTIndexCreator(_indexDir, column, null);
- } else {
- fstIndexCreator = new NativeFSTIndexCreator(_indexDir, column, null);
- }
+ TextIndexCreator fstIndexCreator =
_indexCreatorProvider.newTextIndexCreator(
+
IndexCreationContext.builder().withIndexDir(_indexDir).withColumnMetadata(columnMetadata)
+ .build().forFSTIndex(_fstType, null));
try (Dictionary dictionary = LoaderUtils.getDictionary(_segmentWriter,
columnMetadata)) {
for (int dictId = 0; dictId < dictionary.length(); dictId++) {
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/H3IndexHandler.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/H3IndexHandler.java
index ef96114..d2fceaa 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/H3IndexHandler.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/H3IndexHandler.java
@@ -32,7 +32,10 @@ import
org.apache.pinot.segment.local.segment.index.loader.LoaderUtils;
import org.apache.pinot.segment.local.utils.GeometrySerializer;
import org.apache.pinot.segment.spi.ColumnMetadata;
import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.creator.IndexCreationContext;
+import org.apache.pinot.segment.spi.creator.IndexCreatorProvider;
import org.apache.pinot.segment.spi.creator.SegmentVersion;
+import org.apache.pinot.segment.spi.index.creator.GeoSpatialIndexCreator;
import org.apache.pinot.segment.spi.index.creator.H3IndexConfig;
import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
import org.apache.pinot.segment.spi.index.reader.Dictionary;
@@ -53,13 +56,15 @@ public class H3IndexHandler implements IndexHandler {
private final SegmentMetadataImpl _segmentMetadata;
private final SegmentDirectory.Writer _segmentWriter;
private final Map<String, H3IndexConfig> _h3Configs;
+ private final IndexCreatorProvider _indexCreatorProvider;
public H3IndexHandler(File indexDir, SegmentMetadataImpl segmentMetadata,
IndexLoadingConfig indexLoadingConfig,
- SegmentDirectory.Writer segmentWriter) {
+ SegmentDirectory.Writer segmentWriter, IndexCreatorProvider
indexCreatorProvider) {
_indexDir = indexDir;
_segmentMetadata = segmentMetadata;
_segmentWriter = segmentWriter;
_h3Configs = indexLoadingConfig.getH3IndexConfigs();
+ _indexCreatorProvider = indexCreatorProvider;
}
@Override
@@ -129,8 +134,9 @@ public class H3IndexHandler implements IndexHandler {
try (ForwardIndexReader forwardIndexReader =
LoaderUtils.getForwardIndexReader(_segmentWriter, columnMetadata);
ForwardIndexReaderContext readerContext =
forwardIndexReader.createContext();
Dictionary dictionary = LoaderUtils.getDictionary(_segmentWriter,
columnMetadata);
- OffHeapH3IndexCreator h3IndexCreator = new
OffHeapH3IndexCreator(_indexDir, columnName,
- _h3Configs.get(columnName).getResolution())) {
+ GeoSpatialIndexCreator h3IndexCreator =
_indexCreatorProvider.newGeoSpatialIndexCreator(
+
IndexCreationContext.builder().withIndexDir(_indexDir).withColumnMetadata(columnMetadata)
+ .build().forGeospatialIndex(_h3Configs.get(columnName)))) {
int numDocs = columnMetadata.getTotalDocs();
for (int i = 0; i < numDocs; i++) {
int dictId = forwardIndexReader.getDictId(i, readerContext);
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/InvertedIndexHandler.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/InvertedIndexHandler.java
index ee9d78f..693ca32 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/InvertedIndexHandler.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/InvertedIndexHandler.java
@@ -23,14 +23,16 @@ import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import org.apache.commons.io.FileUtils;
-import
org.apache.pinot.segment.local.segment.creator.impl.inv.OffHeapBitmapInvertedIndexCreator;
import org.apache.pinot.segment.local.segment.index.loader.IndexHandler;
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.segment.local.segment.index.loader.LoaderUtils;
import org.apache.pinot.segment.spi.ColumnMetadata;
import org.apache.pinot.segment.spi.SegmentMetadata;
import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.creator.IndexCreationContext;
+import org.apache.pinot.segment.spi.creator.InvertedIndexCreatorProvider;
import org.apache.pinot.segment.spi.creator.SegmentVersion;
+import
org.apache.pinot.segment.spi.index.creator.DictionaryBasedInvertedIndexCreator;
import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext;
import org.apache.pinot.segment.spi.store.ColumnIndexType;
@@ -47,13 +49,15 @@ public class InvertedIndexHandler implements IndexHandler {
private final SegmentMetadata _segmentMetadata;
private final SegmentDirectory.Writer _segmentWriter;
private final HashSet<String> _columnsToAddIdx;
+ private final InvertedIndexCreatorProvider _indexCreatorProvider;
public InvertedIndexHandler(File indexDir, SegmentMetadata segmentMetadata,
IndexLoadingConfig indexLoadingConfig,
- SegmentDirectory.Writer segmentWriter) {
+ SegmentDirectory.Writer segmentWriter, InvertedIndexCreatorProvider
indexCreatorProvider) {
_indexDir = indexDir;
_segmentMetadata = segmentMetadata;
_segmentWriter = segmentWriter;
_columnsToAddIdx = new
HashSet<>(indexLoadingConfig.getInvertedIndexColumns());
+ _indexCreatorProvider = indexCreatorProvider;
}
@Override
@@ -100,9 +104,9 @@ public class InvertedIndexHandler implements IndexHandler {
// Create new inverted index for the column.
LOGGER.info("Creating new inverted index for segment: {}, column: {}",
segmentName, column);
int numDocs = columnMetadata.getTotalDocs();
- try (OffHeapBitmapInvertedIndexCreator creator = new
OffHeapBitmapInvertedIndexCreator(_indexDir,
- columnMetadata.getFieldSpec(), columnMetadata.getCardinality(),
numDocs,
- columnMetadata.getTotalNumberOfEntries())) {
+ try (DictionaryBasedInvertedIndexCreator creator =
_indexCreatorProvider.newInvertedIndexCreator(
+
IndexCreationContext.builder().withIndexDir(_indexDir).withColumnMetadata(columnMetadata).build()
+ .forInvertedIndex())) {
try (ForwardIndexReader forwardIndexReader =
LoaderUtils.getForwardIndexReader(_segmentWriter, columnMetadata);
ForwardIndexReaderContext readerContext =
forwardIndexReader.createContext()) {
if (columnMetadata.isSingleValue()) {
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/JsonIndexHandler.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/JsonIndexHandler.java
index 93d52e8..27b8ade 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/JsonIndexHandler.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/JsonIndexHandler.java
@@ -24,14 +24,16 @@ import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import org.apache.commons.io.FileUtils;
-import
org.apache.pinot.segment.local.segment.creator.impl.inv.json.OffHeapJsonIndexCreator;
import org.apache.pinot.segment.local.segment.index.loader.IndexHandler;
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.segment.local.segment.index.loader.LoaderUtils;
import org.apache.pinot.segment.spi.ColumnMetadata;
import org.apache.pinot.segment.spi.SegmentMetadata;
import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.creator.IndexCreationContext;
+import org.apache.pinot.segment.spi.creator.JsonIndexCreatorProvider;
import org.apache.pinot.segment.spi.creator.SegmentVersion;
+import org.apache.pinot.segment.spi.index.creator.JsonIndexCreator;
import org.apache.pinot.segment.spi.index.reader.Dictionary;
import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext;
@@ -50,13 +52,15 @@ public class JsonIndexHandler implements IndexHandler {
private final SegmentMetadata _segmentMetadata;
private final SegmentDirectory.Writer _segmentWriter;
private final HashSet<String> _columnsToAddIdx;
+ private final JsonIndexCreatorProvider _indexCreatorProvider;
public JsonIndexHandler(File indexDir, SegmentMetadata segmentMetadata,
IndexLoadingConfig indexLoadingConfig,
- SegmentDirectory.Writer segmentWriter) {
+ SegmentDirectory.Writer segmentWriter, JsonIndexCreatorProvider
indexCreatorProvider) {
_indexDir = indexDir;
_segmentMetadata = segmentMetadata;
_segmentWriter = segmentWriter;
_columnsToAddIdx = new HashSet<>(indexLoadingConfig.getJsonIndexColumns());
+ _indexCreatorProvider = indexCreatorProvider;
}
@Override
@@ -122,11 +126,11 @@ public class JsonIndexHandler implements IndexHandler {
private void handleDictionaryBasedColumn(ColumnMetadata columnMetadata)
throws IOException {
- String columnName = columnMetadata.getColumnName();
try (ForwardIndexReader forwardIndexReader =
LoaderUtils.getForwardIndexReader(_segmentWriter, columnMetadata);
ForwardIndexReaderContext readerContext =
forwardIndexReader.createContext();
Dictionary dictionary = LoaderUtils.getDictionary(_segmentWriter,
columnMetadata);
- OffHeapJsonIndexCreator jsonIndexCreator = new
OffHeapJsonIndexCreator(_indexDir, columnName)) {
+ JsonIndexCreator jsonIndexCreator =
_indexCreatorProvider.newJsonIndexCreator(IndexCreationContext.builder()
+
.withIndexDir(_indexDir).withColumnMetadata(columnMetadata).build().forJsonIndex()))
{
int numDocs = columnMetadata.getTotalDocs();
for (int i = 0; i < numDocs; i++) {
int dictId = forwardIndexReader.getDictId(i, readerContext);
@@ -138,10 +142,10 @@ public class JsonIndexHandler implements IndexHandler {
private void handleNonDictionaryBasedColumn(ColumnMetadata columnMetadata)
throws IOException {
- String columnName = columnMetadata.getColumnName();
try (ForwardIndexReader forwardIndexReader =
LoaderUtils.getForwardIndexReader(_segmentWriter, columnMetadata);
ForwardIndexReaderContext readerContext =
forwardIndexReader.createContext();
- OffHeapJsonIndexCreator jsonIndexCreator = new
OffHeapJsonIndexCreator(_indexDir, columnName)) {
+ JsonIndexCreator jsonIndexCreator =
_indexCreatorProvider.newJsonIndexCreator(IndexCreationContext.builder()
+
.withIndexDir(_indexDir).withColumnMetadata(columnMetadata).build().forJsonIndex()))
{
int numDocs = columnMetadata.getTotalDocs();
for (int i = 0; i < numDocs; i++) {
jsonIndexCreator.add(forwardIndexReader.getString(i, readerContext));
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/RangeIndexHandler.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/RangeIndexHandler.java
index 55c964b..3d5551d 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/RangeIndexHandler.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/RangeIndexHandler.java
@@ -23,21 +23,20 @@ import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import org.apache.commons.io.FileUtils;
-import
org.apache.pinot.segment.local.segment.creator.impl.inv.BitSlicedRangeIndexCreator;
-import
org.apache.pinot.segment.local.segment.creator.impl.inv.RangeIndexCreator;
import org.apache.pinot.segment.local.segment.index.loader.IndexHandler;
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.segment.local.segment.index.loader.LoaderUtils;
import org.apache.pinot.segment.spi.ColumnMetadata;
import org.apache.pinot.segment.spi.SegmentMetadata;
import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.creator.IndexCreationContext;
+import org.apache.pinot.segment.spi.creator.RangeIndexCreatorProvider;
import org.apache.pinot.segment.spi.creator.SegmentVersion;
import org.apache.pinot.segment.spi.index.creator.CombinedInvertedIndexCreator;
import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext;
import org.apache.pinot.segment.spi.store.ColumnIndexType;
import org.apache.pinot.segment.spi.store.SegmentDirectory;
-import org.apache.pinot.spi.data.FieldSpec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -51,14 +50,16 @@ public class RangeIndexHandler implements IndexHandler {
private final SegmentDirectory.Writer _segmentWriter;
private final Set<String> _columnsToAddIdx;
private final int _rangeIndexVersion;
+ private final RangeIndexCreatorProvider _indexCreatorProvider;
public RangeIndexHandler(File indexDir, SegmentMetadata segmentMetadata,
IndexLoadingConfig indexLoadingConfig,
- SegmentDirectory.Writer segmentWriter) {
+ SegmentDirectory.Writer segmentWriter, RangeIndexCreatorProvider
indexCreatorProvider) {
_indexDir = indexDir;
_segmentMetadata = segmentMetadata;
_segmentWriter = segmentWriter;
_columnsToAddIdx = new
HashSet<>(indexLoadingConfig.getRangeIndexColumns());
_rangeIndexVersion = indexLoadingConfig.getRangeIndexVersion();
+ _indexCreatorProvider = indexCreatorProvider;
}
@Override
@@ -125,7 +126,7 @@ public class RangeIndexHandler implements IndexHandler {
int numDocs = columnMetadata.getTotalDocs();
try (ForwardIndexReader forwardIndexReader =
LoaderUtils.getForwardIndexReader(_segmentWriter, columnMetadata);
ForwardIndexReaderContext readerContext =
forwardIndexReader.createContext();
- CombinedInvertedIndexCreator rangeIndexCreator =
newRangeIndexCreator(columnMetadata, FieldSpec.DataType.INT)) {
+ CombinedInvertedIndexCreator rangeIndexCreator =
newRangeIndexCreator(columnMetadata)) {
if (columnMetadata.isSingleValue()) {
// Single-value column
for (int i = 0; i < numDocs; i++) {
@@ -148,8 +149,7 @@ public class RangeIndexHandler implements IndexHandler {
int numDocs = columnMetadata.getTotalDocs();
try (ForwardIndexReader forwardIndexReader =
LoaderUtils.getForwardIndexReader(_segmentWriter, columnMetadata);
ForwardIndexReaderContext readerContext =
forwardIndexReader.createContext();
- CombinedInvertedIndexCreator rangeIndexCreator =
newRangeIndexCreator(columnMetadata,
- columnMetadata.getDataType())) {
+ CombinedInvertedIndexCreator rangeIndexCreator =
newRangeIndexCreator(columnMetadata)) {
if (columnMetadata.isSingleValue()) {
// Single-value column.
switch (columnMetadata.getDataType()) {
@@ -216,13 +216,10 @@ public class RangeIndexHandler implements IndexHandler {
}
}
- private CombinedInvertedIndexCreator newRangeIndexCreator(ColumnMetadata
columnMetadata, FieldSpec.DataType dataType)
+ private CombinedInvertedIndexCreator newRangeIndexCreator(ColumnMetadata
columnMetadata)
throws IOException {
- if (_rangeIndexVersion == BitSlicedRangeIndexCreator.VERSION &&
columnMetadata.isSingleValue()) {
- return new BitSlicedRangeIndexCreator(_indexDir, columnMetadata);
- }
- // default to RangeIndexCreator for the time being
- return new RangeIndexCreator(_indexDir, columnMetadata.getFieldSpec(),
dataType, -1, -1,
- columnMetadata.getTotalDocs(),
columnMetadata.getTotalNumberOfEntries());
+ return _indexCreatorProvider.newRangeIndexCreator(
+
IndexCreationContext.builder().withIndexDir(_indexDir).withColumnMetadata(columnMetadata).build()
+ .forRangeIndex(_rangeIndexVersion, columnMetadata.getMinValue(),
columnMetadata.getMaxValue()));
}
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/TextIndexHandler.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/TextIndexHandler.java
index 19e3a91..6e7c64e 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/TextIndexHandler.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/TextIndexHandler.java
@@ -40,13 +40,14 @@ import java.io.File;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
-import
org.apache.pinot.segment.local.segment.creator.impl.text.LuceneTextIndexCreator;
import org.apache.pinot.segment.local.segment.index.loader.IndexHandler;
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.segment.local.segment.index.loader.LoaderUtils;
import org.apache.pinot.segment.local.segment.index.loader.SegmentPreProcessor;
import org.apache.pinot.segment.spi.ColumnMetadata;
import org.apache.pinot.segment.spi.SegmentMetadata;
+import org.apache.pinot.segment.spi.creator.IndexCreationContext;
+import org.apache.pinot.segment.spi.creator.TextIndexCreatorProvider;
import org.apache.pinot.segment.spi.index.creator.TextIndexCreator;
import org.apache.pinot.segment.spi.index.reader.Dictionary;
import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
@@ -85,13 +86,15 @@ public class TextIndexHandler implements IndexHandler {
private final SegmentMetadata _segmentMetadata;
private final SegmentDirectory.Writer _segmentWriter;
private final Set<String> _columnsToAddIdx;
+ private final TextIndexCreatorProvider _textIndexCreatorProvider;
public TextIndexHandler(File indexDir, SegmentMetadata segmentMetadata,
IndexLoadingConfig indexLoadingConfig,
- SegmentDirectory.Writer segmentWriter) {
+ SegmentDirectory.Writer segmentWriter, TextIndexCreatorProvider
textIndexCreatorProvider) {
_indexDir = indexDir;
_segmentMetadata = segmentMetadata;
_segmentWriter = segmentWriter;
_columnsToAddIdx = new HashSet<>(indexLoadingConfig.getTextIndexColumns());
+ _textIndexCreatorProvider = textIndexCreatorProvider;
}
@Override
@@ -144,7 +147,8 @@ public class TextIndexHandler implements IndexHandler {
// based on segmentVersion.
try (ForwardIndexReader forwardIndexReader =
LoaderUtils.getForwardIndexReader(_segmentWriter, columnMetadata);
ForwardIndexReaderContext readerContext =
forwardIndexReader.createContext();
- LuceneTextIndexCreator textIndexCreator = new
LuceneTextIndexCreator(column, segmentDirectory, true)) {
+ TextIndexCreator textIndexCreator =
_textIndexCreatorProvider.newTextIndexCreator(IndexCreationContext.builder()
+
.withColumnMetadata(columnMetadata).withIndexDir(segmentDirectory).build().forTextIndex(true)))
{
if (columnMetadata.isSingleValue()) {
processSVField(hasDictionary, forwardIndexReader, readerContext,
textIndexCreator, numDocs, columnMetadata);
} else {
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/creator/impl/IndexCreatorOverrideTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/creator/impl/IndexCreatorOverrideTest.java
new file mode 100644
index 0000000..6e32884
--- /dev/null
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/creator/impl/IndexCreatorOverrideTest.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.creator.impl;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.UUID;
+import org.apache.pinot.segment.local.segment.creator.impl.inv.OffHeapBitmapInvertedIndexCreator;
+import org.apache.pinot.segment.spi.creator.IndexCreationContext;
+import org.apache.pinot.segment.spi.creator.IndexCreatorProvider;
+import org.apache.pinot.segment.spi.creator.IndexCreatorProviders;
+import org.apache.pinot.segment.spi.index.creator.DictionaryBasedInvertedIndexCreator;
+import org.apache.pinot.segment.spi.index.metadata.ColumnMetadataImpl;
+import org.apache.pinot.spi.data.DimensionFieldSpec;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.mockito.MockedStatic;
+import org.testng.annotations.AfterTest;
+import org.testng.annotations.BeforeTest;
+import org.testng.annotations.Test;
+
+import static org.apache.commons.io.FileUtils.deleteQuietly;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+
+/**
+ * Verifies that a custom {@link IndexCreatorProvider} returned from
+ * {@link IndexCreatorProviders#getIndexCreatorProvider()} can intercept inverted index creation
+ * and fall back to the default provision logic for the cases it does not handle.
+ */
+public class IndexCreatorOverrideTest {
+
+  private File _file;
+
+  @BeforeTest
+  public void before()
+      throws IOException {
+    // Throwaway path handed to the index creators as their index directory.
+    _file = Files.createTempFile("IndexCreatorOverrideTest", UUID.randomUUID().toString()).toFile();
+  }
+
+  @AfterTest
+  public void cleanup() {
+    deleteQuietly(_file);
+  }
+
+  @Test
+  public void testOverrideInvertedIndexCreation()
+      throws IOException {
+    DictionaryBasedInvertedIndexCreator highCardinalityInvertedIndex = mock(DictionaryBasedInvertedIndexCreator.class);
+    // Intercept only high-cardinality columns; everything else is delegated to the default provider.
+    IndexCreatorProvider provider = new IndexCreatorProviders.Default() {
+      @Override
+      public DictionaryBasedInvertedIndexCreator newInvertedIndexCreator(IndexCreationContext.Inverted context)
+          throws IOException {
+        if (context.getCardinality() >= 10000) {
+          return highCardinalityInvertedIndex;
+        }
+        return super.newInvertedIndexCreator(context);
+      }
+    };
+    // A static mock stays active on this thread until it is closed, so scope it with try-with-resources
+    // to keep it from leaking into other tests that run on the same thread.
+    try (MockedStatic<IndexCreatorProviders> providers = mockStatic(IndexCreatorProviders.class)) {
+      providers.when(IndexCreatorProviders::getIndexCreatorProvider).thenReturn(provider);
+      IndexCreationContext.Inverted highCardinalityContext = newContext(Integer.MAX_VALUE);
+      assertEquals(IndexCreatorProviders.getIndexCreatorProvider().newInvertedIndexCreator(highCardinalityContext),
+          highCardinalityInvertedIndex);
+      IndexCreationContext.Inverted lowCardinalityContext = newContext(1);
+      // The default creator is closeable; release whatever it holds once its type has been checked.
+      try (DictionaryBasedInvertedIndexCreator lowCardinalityCreator =
+          IndexCreatorProviders.getIndexCreatorProvider().newInvertedIndexCreator(lowCardinalityContext)) {
+        assertTrue(lowCardinalityCreator instanceof OffHeapBitmapInvertedIndexCreator);
+      }
+    }
+  }
+
+  private IndexCreationContext.Inverted newContext(int cardinality) {
+    FieldSpec fieldSpec = new DimensionFieldSpec("test", FieldSpec.DataType.INT, true);
+    return IndexCreationContext.builder().withIndexDir(_file)
+        .withColumnMetadata(ColumnMetadataImpl.builder().setFieldSpec(fieldSpec).setCardinality(cardinality).build())
+        .build().forInvertedIndex();
+  }
+}
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/BitSlicedIndexCreatorTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/BitSlicedIndexCreatorTest.java
index c06ebee..0ad213c 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/BitSlicedIndexCreatorTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/BitSlicedIndexCreatorTest.java
@@ -65,14 +65,12 @@ public class BitSlicedIndexCreatorTest {
@Test(expectedExceptions = IllegalArgumentException.class)
public void testFailToCreateRawString() {
- new BitSlicedRangeIndexCreator(INDEX_DIR, new ColumnMetadataImpl.Builder()
- .setFieldSpec(new DimensionFieldSpec("foo", STRING, true)).build());
+ new BitSlicedRangeIndexCreator(INDEX_DIR, new DimensionFieldSpec("foo",
STRING, true), null, null);
}
@Test(expectedExceptions = IllegalArgumentException.class)
public void testFailToCreateMV() {
- new BitSlicedRangeIndexCreator(INDEX_DIR, new ColumnMetadataImpl.Builder()
- .setFieldSpec(new DimensionFieldSpec("foo", INT, false)).build());
+ new BitSlicedRangeIndexCreator(INDEX_DIR, new DimensionFieldSpec("foo",
INT, false), 0, 10);
}
@Test
@@ -153,7 +151,7 @@ public class BitSlicedIndexCreatorTest {
private void testInt(Dataset<int[]> dataset)
throws IOException {
ColumnMetadata metadata = dataset.toColumnMetadata();
- try (BitSlicedRangeIndexCreator creator = new
BitSlicedRangeIndexCreator(INDEX_DIR, metadata)) {
+ try (BitSlicedRangeIndexCreator creator =
newBitslicedIndexCreator(metadata)) {
for (int value : dataset.values()) {
creator.add(value);
}
@@ -181,7 +179,7 @@ public class BitSlicedIndexCreatorTest {
private void testLong(Dataset<long[]> dataset)
throws IOException {
ColumnMetadata metadata = dataset.toColumnMetadata();
- try (BitSlicedRangeIndexCreator creator = new
BitSlicedRangeIndexCreator(INDEX_DIR, metadata)) {
+ try (BitSlicedRangeIndexCreator creator =
newBitslicedIndexCreator(metadata)) {
for (long value : dataset.values()) {
creator.add(value);
}
@@ -209,7 +207,7 @@ public class BitSlicedIndexCreatorTest {
private void testFloat(Dataset<float[]> dataset)
throws IOException {
ColumnMetadata metadata = dataset.toColumnMetadata();
- try (BitSlicedRangeIndexCreator creator = new
BitSlicedRangeIndexCreator(INDEX_DIR, metadata)) {
+ try (BitSlicedRangeIndexCreator creator =
newBitslicedIndexCreator(metadata)) {
for (float value : dataset.values()) {
creator.add(value);
}
@@ -237,7 +235,7 @@ public class BitSlicedIndexCreatorTest {
private void testDouble(Dataset<double[]> dataset)
throws IOException {
ColumnMetadata metadata = dataset.toColumnMetadata();
- try (BitSlicedRangeIndexCreator creator = new
BitSlicedRangeIndexCreator(INDEX_DIR, metadata)) {
+ try (BitSlicedRangeIndexCreator creator =
newBitslicedIndexCreator(metadata)) {
for (double value : dataset.values()) {
creator.add(value);
}
@@ -262,6 +260,12 @@ public class BitSlicedIndexCreatorTest {
}
}
+ private static BitSlicedRangeIndexCreator
newBitslicedIndexCreator(ColumnMetadata metadata) {
+ return metadata.hasDictionary() ? new BitSlicedRangeIndexCreator(INDEX_DIR,
+ metadata.getFieldSpec(), metadata.getCardinality()) : new
BitSlicedRangeIndexCreator(INDEX_DIR,
+ metadata.getFieldSpec(), metadata.getMinValue(),
metadata.getMaxValue());
+ }
+
enum Distribution {
NORMAL {
@Override
diff --git a/pinot-segment-spi/pom.xml b/pinot-segment-spi/pom.xml
index 1af8184..89a56d9 100644
--- a/pinot-segment-spi/pom.xml
+++ b/pinot-segment-spi/pom.xml
@@ -93,5 +93,11 @@
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+
</dependencies>
</project>
diff --git
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/BloomFilterCreatorProvider.java
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/BloomFilterCreatorProvider.java
new file mode 100644
index 0000000..ba23359
--- /dev/null
+++
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/BloomFilterCreatorProvider.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.spi.creator;
+
+import java.io.IOException;
+import org.apache.pinot.segment.spi.index.creator.BloomFilterCreator;
+
+
+/**
+ * Provides {@link BloomFilterCreator} instances for a given {@link IndexCreationContext.BloomFilter}.
+ */
+public interface BloomFilterCreatorProvider {
+  /**
+   * Creates a {@link BloomFilterCreator} from information about index creation.
+   * This allows a plugin to pattern match index creation information to select
+   * an appropriate implementation.
+   * @param context context about the index creation.
+   * @return a {@link BloomFilterCreator}
+   * @throws IOException whenever something goes wrong matching or constructing the creator
+   */
+  BloomFilterCreator newBloomFilterCreator(IndexCreationContext.BloomFilter context)
+      throws IOException;
+}
diff --git
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/ForwardIndexCreatorProvider.java
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/ForwardIndexCreatorProvider.java
new file mode 100644
index 0000000..8c15dbb
--- /dev/null
+++
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/ForwardIndexCreatorProvider.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.spi.creator;
+
+import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator;
+
+
+/**
+ * Provides {@link ForwardIndexCreator} instances for a given {@link IndexCreationContext.Forward}.
+ */
+public interface ForwardIndexCreatorProvider {
+  /**
+   * Creates a {@link ForwardIndexCreator} from information about index creation.
+   * This allows a plugin to pattern match index creation information to select
+   * an appropriate implementation.
+   * @param context context about the index creation.
+   * @return a {@link ForwardIndexCreator}
+   * @throws Exception whenever something goes wrong matching or constructing the creator
+   */
+  ForwardIndexCreator newForwardIndexCreator(IndexCreationContext.Forward context)
+      throws Exception;
+}
diff --git
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/GeoSpatialIndexCreatorProvider.java
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/GeoSpatialIndexCreatorProvider.java
new file mode 100644
index 0000000..34341a1
--- /dev/null
+++
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/GeoSpatialIndexCreatorProvider.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.spi.creator;
+
+import java.io.IOException;
+import org.apache.pinot.segment.spi.index.creator.GeoSpatialIndexCreator;
+
+
+/**
+ * Provides {@link GeoSpatialIndexCreator} instances for a given {@link IndexCreationContext.Geospatial}.
+ */
+public interface GeoSpatialIndexCreatorProvider {
+
+  /**
+   * Creates a {@link GeoSpatialIndexCreator} from information about index creation.
+   * This allows a plugin to pattern match index creation information to select
+   * an appropriate implementation.
+   * @param context context about the index creation.
+   * @return a {@link GeoSpatialIndexCreator}
+   * @throws IOException whenever something goes wrong matching or constructing the creator
+   */
+  GeoSpatialIndexCreator newGeoSpatialIndexCreator(IndexCreationContext.Geospatial context)
+      throws IOException;
+}
diff --git
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/IndexCreationContext.java
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/IndexCreationContext.java
new file mode 100644
index 0000000..c1a20ba
--- /dev/null
+++
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/IndexCreationContext.java
@@ -0,0 +1,467 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.spi.creator;
+
+import java.io.File;
+import java.util.Map;
+import java.util.Objects;
+import javax.annotation.Nullable;
+import org.apache.pinot.segment.spi.ColumnMetadata;
+import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
+import org.apache.pinot.segment.spi.index.creator.H3IndexConfig;
+import org.apache.pinot.spi.config.table.BloomFilterConfig;
+import org.apache.pinot.spi.config.table.FSTType;
+import org.apache.pinot.spi.data.FieldSpec;
+
+
+/**
+ * Provides parameters for constructing indexes via {@see
IndexCreatorProvider}.
+ * The responsibility for ensuring that the correct parameters for a particular
+ * index type lies with the caller.
+ */
+public interface IndexCreationContext {
+
+ FieldSpec getFieldSpec();
+
+ File getIndexDir();
+
+ boolean isOnHeap();
+
+ int getLengthOfLongestEntry();
+
+ int getMaxNumberOfMultiValueElements();
+
+ int getMaxRowLengthInBytes();
+
+ boolean isSorted();
+
+ int getCardinality();
+
+ int getTotalNumberOfEntries();
+
+ int getTotalDocs();
+
+ boolean hasDictionary();
+
+ final class Builder {
+ private File _indexDir;
+ private int _lengthOfLongestEntry;
+ private int _maxNumberOfMultiValueElements;
+ private int _maxRowLengthInBytes;
+ private boolean _onHeap = false;
+ private FieldSpec _fieldSpec;
+ private boolean _sorted;
+ private int _cardinality;
+ private int _totalNumberOfEntries;
+ private int _totalDocs;
+ private boolean _hasDictionary = true;
+
+ public Builder withColumnIndexCreationInfo(ColumnIndexCreationInfo
columnIndexCreationInfo) {
+ return
withLengthOfLongestEntry(columnIndexCreationInfo.getLengthOfLongestEntry())
+
.withMaxNumberOfMultiValueElements(columnIndexCreationInfo.getMaxNumberOfMultiValueElements())
+
.withMaxRowLengthInBytes(columnIndexCreationInfo.getMaxRowLengthInBytes());
+ }
+
+ public Builder withIndexDir(File indexDir) {
+ _indexDir = indexDir;
+ return this;
+ }
+
+ public Builder onHeap(boolean onHeap) {
+ _onHeap = onHeap;
+ return this;
+ }
+
+ public Builder withColumnMetadata(ColumnMetadata columnMetadata) {
+ return withFieldSpec(columnMetadata.getFieldSpec())
+ .sorted(columnMetadata.isSorted())
+ .withCardinality(columnMetadata.getCardinality())
+ .withTotalNumberOfEntries(columnMetadata.getTotalNumberOfEntries())
+ .withTotalDocs(columnMetadata.getTotalDocs())
+ .withDictionary(columnMetadata.hasDictionary());
+ }
+
+ public Builder withLengthOfLongestEntry(int lengthOfLongestEntry) {
+ _lengthOfLongestEntry = lengthOfLongestEntry;
+ return this;
+ }
+
+ public Builder withMaxNumberOfMultiValueElements(int
maxNumberOfMultiValueElements) {
+ _maxNumberOfMultiValueElements = maxNumberOfMultiValueElements;
+ return this;
+ }
+
+ public Builder withMaxRowLengthInBytes(int maxRowLengthInBytes) {
+ _maxRowLengthInBytes = maxRowLengthInBytes;
+ return this;
+ }
+
+ public Builder withFieldSpec(FieldSpec fieldSpec) {
+ _fieldSpec = fieldSpec;
+ return this;
+ }
+
+ public Builder sorted(boolean sorted) {
+ _sorted = sorted;
+ return this;
+ }
+
+ public Builder withCardinality(int cardinality) {
+ _cardinality = cardinality;
+ return this;
+ }
+
+ public Builder withTotalNumberOfEntries(int totalNumberOfEntries) {
+ _totalNumberOfEntries = totalNumberOfEntries;
+ return this;
+ }
+
+ public Builder withTotalDocs(int totalDocs) {
+ _totalDocs = totalDocs;
+ return this;
+ }
+
+ public Builder withDictionary(boolean hasDictionary) {
+ _hasDictionary = hasDictionary;
+ return this;
+ }
+
+ public Common build() {
+ return new Common(Objects.requireNonNull(_indexDir),
+ _lengthOfLongestEntry, _maxNumberOfMultiValueElements,
_maxRowLengthInBytes,
+ _onHeap, Objects.requireNonNull(_fieldSpec),
+ _sorted, _cardinality, _totalNumberOfEntries, _totalDocs,
_hasDictionary);
+ }
+ }
+
+ static Builder builder() {
+ return new Builder();
+ }
+
+ final class Common implements IndexCreationContext {
+
+ private final File _indexDir;
+ private final int _lengthOfLongestEntry;
+ private final int _maxNumberOfMultiValueElements;
+ private final int _maxRowLengthInBytes;
+ private final boolean _onHeap;
+ private final FieldSpec _fieldSpec;
+ private final boolean _sorted;
+ private final int _cardinality;
+ private final int _totalNumberOfEntries;
+ private final int _totalDocs;
+ private final boolean _hasDictionary;
+
+ public Common(File indexDir, int lengthOfLongestEntry,
+ int maxNumberOfMultiValueElements, int maxRowLengthInBytes, boolean
onHeap,
+ FieldSpec fieldSpec, boolean sorted, int cardinality, int
totalNumberOfEntries,
+ int totalDocs, boolean hasDictionary) {
+ _indexDir = indexDir;
+ _lengthOfLongestEntry = lengthOfLongestEntry;
+ _maxNumberOfMultiValueElements = maxNumberOfMultiValueElements;
+ _maxRowLengthInBytes = maxRowLengthInBytes;
+ _onHeap = onHeap;
+ _fieldSpec = fieldSpec;
+ _sorted = sorted;
+ _cardinality = cardinality;
+ _totalNumberOfEntries = totalNumberOfEntries;
+ _totalDocs = totalDocs;
+ _hasDictionary = hasDictionary;
+ }
+
+ public FieldSpec getFieldSpec() {
+ return _fieldSpec;
+ }
+
+ public File getIndexDir() {
+ return _indexDir;
+ }
+
+ public boolean isOnHeap() {
+ return _onHeap;
+ }
+
+ public int getLengthOfLongestEntry() {
+ return _lengthOfLongestEntry;
+ }
+
+ public int getMaxNumberOfMultiValueElements() {
+ return _maxNumberOfMultiValueElements;
+ }
+
+ public int getMaxRowLengthInBytes() {
+ return _maxRowLengthInBytes;
+ }
+
+ public boolean isSorted() {
+ return _sorted;
+ }
+
+ public int getCardinality() {
+ return _cardinality;
+ }
+
+ public int getTotalNumberOfEntries() {
+ return _totalNumberOfEntries;
+ }
+
+ public int getTotalDocs() {
+ return _totalDocs;
+ }
+
+ public boolean hasDictionary() {
+ return _hasDictionary;
+ }
+
+ public BloomFilter forBloomFilter(BloomFilterConfig bloomFilterConfig) {
+ return new BloomFilter(this, bloomFilterConfig);
+ }
+
+ public Forward forForwardIndex(ChunkCompressionType chunkCompressionType,
+ @Nullable Map<String, Map<String, String>> columnProperties) {
+ return new Forward(this, chunkCompressionType, columnProperties);
+ }
+
+ public Text forFSTIndex(FSTType fstType, String[]
sortedUniqueElementsArray) {
+ return new Text(this, fstType, sortedUniqueElementsArray);
+ }
+
+ public Geospatial forGeospatialIndex(H3IndexConfig h3IndexConfig) {
+ return new Geospatial(this, h3IndexConfig);
+ }
+
+ public Inverted forInvertedIndex() {
+ return new Inverted(this);
+ }
+
+ public Json forJsonIndex() {
+ return new Json(this);
+ }
+
+ public Range forRangeIndex(int rangeIndexVersion, Comparable<?> min,
Comparable<?> max) {
+ return new Range(this, rangeIndexVersion, min, max);
+ }
+
+ public Text forTextIndex(boolean commitOnClose) {
+ return new Text(this, commitOnClose);
+ }
+ }
+
+ class Wrapper implements IndexCreationContext {
+
+ private final IndexCreationContext _delegate;
+
+ Wrapper(IndexCreationContext delegate) {
+ _delegate = delegate;
+ }
+
+ @Override
+ public FieldSpec getFieldSpec() {
+ return _delegate.getFieldSpec();
+ }
+
+ @Override
+ public File getIndexDir() {
+ return _delegate.getIndexDir();
+ }
+
+ @Override
+ public boolean isOnHeap() {
+ return _delegate.isOnHeap();
+ }
+
+ @Override
+ public int getLengthOfLongestEntry() {
+ return _delegate.getLengthOfLongestEntry();
+ }
+
+ @Override
+ public int getMaxNumberOfMultiValueElements() {
+ return _delegate.getMaxNumberOfMultiValueElements();
+ }
+
+ @Override
+ public int getMaxRowLengthInBytes() {
+ return _delegate.getMaxRowLengthInBytes();
+ }
+
+ @Override
+ public boolean isSorted() {
+ return _delegate.isSorted();
+ }
+
+ @Override
+ public int getCardinality() {
+ return _delegate.getCardinality();
+ }
+
+ @Override
+ public int getTotalNumberOfEntries() {
+ return _delegate.getTotalNumberOfEntries();
+ }
+
+ @Override
+ public int getTotalDocs() {
+ return _delegate.getTotalDocs();
+ }
+
+ @Override
+ public boolean hasDictionary() {
+ return _delegate.hasDictionary();
+ }
+ }
+
+ class BloomFilter extends Wrapper {
+
+ private final BloomFilterConfig _bloomFilterConfig;
+
+ public BloomFilter(IndexCreationContext wrapped, BloomFilterConfig
bloomFilterConfig) {
+ super(wrapped);
+ _bloomFilterConfig = bloomFilterConfig;
+ }
+
+ @Nullable
+ public BloomFilterConfig getBloomFilterConfig() {
+ return _bloomFilterConfig;
+ }
+ }
+
+ class Forward extends Wrapper {
+
+ private final ChunkCompressionType _chunkCompressionType;
+ private final Map<String, Map<String, String>> _columnProperties;
+
+ Forward(IndexCreationContext delegate,
+ ChunkCompressionType chunkCompressionType,
+ @Nullable Map<String, Map<String, String>> columnProperties) {
+ super(delegate);
+ _chunkCompressionType = chunkCompressionType;
+ _columnProperties = columnProperties;
+ }
+
+ public ChunkCompressionType getChunkCompressionType() {
+ return _chunkCompressionType;
+ }
+
+ @Nullable
+ public Map<String, Map<String, String>> getColumnProperties() {
+ return _columnProperties;
+ }
+ }
+
+ class Geospatial extends Wrapper {
+
+ private final H3IndexConfig _h3IndexConfig;
+
+ Geospatial(IndexCreationContext delegate, H3IndexConfig h3IndexConfig) {
+ super(delegate);
+ _h3IndexConfig = h3IndexConfig;
+ }
+
+ public H3IndexConfig getH3IndexConfig() {
+ return _h3IndexConfig;
+ }
+ }
+
+ class Inverted extends Wrapper {
+
+ Inverted(IndexCreationContext delegate) {
+ super(delegate);
+ }
+ }
+
+ class Json extends Wrapper {
+
+ Json(IndexCreationContext delegate) {
+ super(delegate);
+ }
+ }
+
+ class Range extends Wrapper {
+
+ private final Comparable<?> _min;
+ private final Comparable<?> _max;
+ private final int _rangeIndexVersion;
+
+
+ Range(IndexCreationContext delegate, int rangeIndexVersion, Comparable<?>
min, Comparable<?> max) {
+ super(delegate);
+ _rangeIndexVersion = rangeIndexVersion;
+ _min = min;
+ _max = max;
+ }
+
+ public Comparable<?> getMin() {
+ return _min;
+ }
+
+ public Comparable<?> getMax() {
+ return _max;
+ }
+
+ public int getRangeIndexVersion() {
+ return _rangeIndexVersion;
+ }
+ }
+
+ class Text extends Wrapper {
+ private final boolean _commitOnClose;
+ private final boolean _isFst;
+ private final FSTType _fstType;
+ private final String[] _sortedUniqueElementsArray;
+
+ /**
+ * For text indexes
+ */
+ public Text(IndexCreationContext wrapped, boolean commitOnClose) {
+ super(wrapped);
+ _commitOnClose = commitOnClose;
+ _fstType = null;
+ _sortedUniqueElementsArray = null;
+ _isFst = false;
+ }
+
+ /**
+ * For FST indexes
+ */
+ public Text(IndexCreationContext wrapped, FSTType fstType, String[]
sortedUniqueElementsArray) {
+ super(wrapped);
+ _commitOnClose = true;
+ _fstType = fstType;
+ _sortedUniqueElementsArray = sortedUniqueElementsArray;
+ _isFst = true;
+ }
+
+ public boolean isCommitOnClose() {
+ return _commitOnClose;
+ }
+
+ public FSTType getFstType() {
+ return _fstType;
+ }
+
+ public boolean isFst() {
+ return _isFst;
+ }
+
+ public String[] getSortedUniqueElementsArray() {
+ return _sortedUniqueElementsArray;
+ }
+ }
+}
diff --git
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/IndexCreatorProvider.java
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/IndexCreatorProvider.java
new file mode 100644
index 0000000..4dd1a02
--- /dev/null
+++
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/IndexCreatorProvider.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.spi.creator;
+
+/**
+ * Plugin interface to abstract index creation.
+ */
+public interface IndexCreatorProvider
+ extends ForwardIndexCreatorProvider, InvertedIndexCreatorProvider,
JsonIndexCreatorProvider,
+ TextIndexCreatorProvider, GeoSpatialIndexCreatorProvider,
RangeIndexCreatorProvider,
+ BloomFilterCreatorProvider {
+}
diff --git
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/IndexCreatorProviders.java
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/IndexCreatorProviders.java
new file mode 100644
index 0000000..40466f1
--- /dev/null
+++
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/IndexCreatorProviders.java
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.spi.creator;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.lang.invoke.MethodType;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.pinot.segment.spi.index.creator.BloomFilterCreator;
+import org.apache.pinot.segment.spi.index.creator.CombinedInvertedIndexCreator;
+import
org.apache.pinot.segment.spi.index.creator.DictionaryBasedInvertedIndexCreator;
+import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator;
+import org.apache.pinot.segment.spi.index.creator.GeoSpatialIndexCreator;
+import org.apache.pinot.segment.spi.index.creator.JsonIndexCreator;
+import org.apache.pinot.segment.spi.index.creator.TextIndexCreator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Plugin registration point to allow index creation logic to be swapped out.
+ * Plugins should not reimplement Pinot's default index creation logic.
+ * Users provide an override to Pinot's index creation logic. This is
simplified
+ * by extending {@see IndexCreatorProviders.Default}
+ */
+public final class IndexCreatorProviders {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(IndexCreatorProviders.class);
+
+ private static final IndexCreatorProvider DEFAULT = defaultProvider();
+ private static final AtomicReference<IndexCreatorProvider> REGISTRATION =
new AtomicReference<>(DEFAULT);
+
+ private IndexCreatorProviders() {
+ }
+
+ /**
+ * The caller provides a decorator to wrap the default provider, which
allows plugins to create
+ * a delegation chain.
+ * @param provider index creation override
+ * @return true if this is the first invocation and the provider has not yet
been used.
+ */
+ public static boolean registerProvider(IndexCreatorProvider provider) {
+ return REGISTRATION.compareAndSet(DEFAULT, provider);
+ }
+
+ /**
+ * Obtain the registered index creator provider. If the user has provided an
override, then it will be used instead.
+ * If the user has not provided an override yet, then this action will
prevent them from doing so.
+ * @return the global index provision logic.
+ */
+ public static IndexCreatorProvider getIndexCreatorProvider() {
+ return Holder.PROVIDER;
+ }
+
+ private static final class Holder {
+ public static final IndexCreatorProvider PROVIDER = REGISTRATION.get();
+ }
+
+ private static IndexCreatorProvider defaultProvider() {
+ // use MethodHandle to break circular dependency and keep implementation
details encapsulated within
+ // pinot-segment-local
+ String className =
"org.apache.pinot.segment.local.segment.creator.impl.DefaultIndexCreatorProvider";
+ try {
+ Class<?> clazz = Class.forName(className, false,
IndexCreatorProviders.class.getClassLoader());
+ return (IndexCreatorProvider) MethodHandles.publicLookup()
+ .findConstructor(clazz, MethodType.methodType(void.class)).invoke();
+ } catch (Throwable missing) {
+ LOGGER.error("could not construct MethodHandle for {}", className,
missing);
+ // this means pinot-segment-local isn't on the classpath, but this means
+ // no indexes will be created, so it's ok to return null
+ return null;
+ }
+ }
+
+ /**
+ * Extend this class to override index creation
+ */
+ public static class Default implements IndexCreatorProvider {
+
+ @Override
+ public BloomFilterCreator
newBloomFilterCreator(IndexCreationContext.BloomFilter context)
+ throws IOException {
+ if (DEFAULT == null) {
+ throw new UnsupportedOperationException("default implementation not
present on classpath");
+ }
+ return DEFAULT.newBloomFilterCreator(context);
+ }
+
+ @Override
+ public ForwardIndexCreator
newForwardIndexCreator(IndexCreationContext.Forward context)
+ throws Exception {
+ if (DEFAULT == null) {
+ throw new UnsupportedOperationException("default implementation not
present on classpath");
+ }
+ return DEFAULT.newForwardIndexCreator(context);
+ }
+
+ @Override
+ public GeoSpatialIndexCreator
newGeoSpatialIndexCreator(IndexCreationContext.Geospatial context)
+ throws IOException {
+ if (DEFAULT == null) {
+ throw new UnsupportedOperationException("default implementation not
present on classpath");
+ }
+ return DEFAULT.newGeoSpatialIndexCreator(context);
+ }
+
+ @Override
+ public DictionaryBasedInvertedIndexCreator
newInvertedIndexCreator(IndexCreationContext.Inverted context)
+ throws IOException {
+ if (DEFAULT == null) {
+ throw new UnsupportedOperationException("default implementation not
present on classpath");
+ }
+ return DEFAULT.newInvertedIndexCreator(context);
+ }
+
+ @Override
+ public JsonIndexCreator newJsonIndexCreator(IndexCreationContext.Json
context)
+ throws IOException {
+ if (DEFAULT == null) {
+ throw new UnsupportedOperationException("default implementation not
present on classpath");
+ }
+ return DEFAULT.newJsonIndexCreator(context);
+ }
+
+ @Override
+ public CombinedInvertedIndexCreator
newRangeIndexCreator(IndexCreationContext.Range context)
+ throws IOException {
+ if (DEFAULT == null) {
+ throw new UnsupportedOperationException("default implementation not
present on classpath");
+ }
+ return DEFAULT.newRangeIndexCreator(context);
+ }
+
+ @Override
+ public TextIndexCreator newTextIndexCreator(IndexCreationContext.Text
context)
+ throws IOException {
+ if (DEFAULT == null) {
+ throw new UnsupportedOperationException("default implementation not
present on classpath");
+ }
+ return DEFAULT.newTextIndexCreator(context);
+ }
+ }
+}
diff --git
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/InvertedIndexCreatorProvider.java
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/InvertedIndexCreatorProvider.java
new file mode 100644
index 0000000..2bf7f1a
--- /dev/null
+++
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/InvertedIndexCreatorProvider.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.spi.creator;
+
+import java.io.IOException;
+import
org.apache.pinot.segment.spi.index.creator.DictionaryBasedInvertedIndexCreator;
+
+
+public interface InvertedIndexCreatorProvider {
+ /**
+ * Creates a {@see DictionaryBasedInvertedIndexCreator} from information
about index creation.
+ * This allows a plugin to pattern match index creation information to select
+ * an appropriate implementation.
+ * @param context context about the index creation.
+ * @return a {@see ForwardIndexCreator}
+ * @throws IOException whenever something goes wrong matching or
constructing the creator
+ */
+ DictionaryBasedInvertedIndexCreator
newInvertedIndexCreator(IndexCreationContext.Inverted context)
+ throws IOException;
+}
diff --git
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/JsonIndexCreatorProvider.java
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/JsonIndexCreatorProvider.java
new file mode 100644
index 0000000..22e7f54
--- /dev/null
+++
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/JsonIndexCreatorProvider.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.spi.creator;
+
+import java.io.IOException;
+import org.apache.pinot.segment.spi.index.creator.JsonIndexCreator;
+
+
+public interface JsonIndexCreatorProvider {
+ /**
+ * Creates a {@see JsonIndexCreator} from information about index creation.
+ * This allows a plugin to pattern match index creation information to select
+ * an appropriate implementation.
+ * @param context context about the index creation.
+ * @return a {@see ForwardIndexCreator}
+ * @throws IOException whenever something goes wrong matching or
constructing the creator
+ */
+ JsonIndexCreator newJsonIndexCreator(IndexCreationContext.Json context)
+ throws IOException;
+}
diff --git
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/RangeIndexCreatorProvider.java
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/RangeIndexCreatorProvider.java
new file mode 100644
index 0000000..a3abfc6
--- /dev/null
+++
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/RangeIndexCreatorProvider.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.spi.creator;
+
+import java.io.IOException;
+import org.apache.pinot.segment.spi.index.creator.CombinedInvertedIndexCreator;
+
+
+public interface RangeIndexCreatorProvider {
+ /**
+ * Creates a {@see CombinedInvertedIndexCreator} from information about
index creation.
+ * This allows a plugin to pattern match index creation information to select
+ * an appropriate implementation.
+ * @param context context about the index creation.
+ * @return a {@see ForwardIndexCreator}
+ * @throws IOException whenever something goes wrong matching or
constructing the creator
+ */
+ CombinedInvertedIndexCreator newRangeIndexCreator(IndexCreationContext.Range
context)
+ throws IOException;
+}
diff --git
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/TextIndexCreatorProvider.java
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/TextIndexCreatorProvider.java
new file mode 100644
index 0000000..05f53ea
--- /dev/null
+++
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/TextIndexCreatorProvider.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.spi.creator;
+
+import java.io.IOException;
+import org.apache.pinot.segment.spi.index.creator.TextIndexCreator;
+
+
+public interface TextIndexCreatorProvider {
+
+ /**
+ * Creates a {@see TextIndexCreator} from information about index creation.
+ * This allows a plugin to pattern match index creation information to select
+ * an appropriate implementation.
+ * @param context context about the index creation.
+ * @return a {@see ForwardIndexCreator}
+ * @throws IOException whenever something goes wrong matching or
constructing the creator
+ */
+ TextIndexCreator newTextIndexCreator(IndexCreationContext.Text context)
+ throws IOException;
+}
diff --git
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/metadata/ColumnMetadataImpl.java
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/metadata/ColumnMetadataImpl.java
index 19dbb2a..519f7c1 100644
---
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/metadata/ColumnMetadataImpl.java
+++
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/metadata/ColumnMetadataImpl.java
@@ -289,6 +289,10 @@ public class ColumnMetadataImpl implements ColumnMetadata {
return builder.build();
}
+ public static Builder builder() {
+ return new Builder();
+ }
+
public static class Builder {
private FieldSpec _fieldSpec;
private int _totalDocs;
diff --git
a/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/creator/IndexCreatorProvidersTest.java
b/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/creator/IndexCreatorProvidersTest.java
new file mode 100644
index 0000000..bfaebb3
--- /dev/null
+++
b/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/creator/IndexCreatorProvidersTest.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.spi.creator;
+
+import java.io.IOException;
+import org.apache.pinot.segment.spi.index.creator.BloomFilterCreator;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+
+public class IndexCreatorProvidersTest {
+
+ @Test
+ public void indexCreatorProvidersLoadableWithoutDefaultImplementation()
+ throws IOException {
+ BloomFilterCreator mockBloomFilterCreator = mock(BloomFilterCreator.class);
+ assertTrue(IndexCreatorProviders.registerProvider(new
IndexCreatorProviders.Default() {
+ @Override
+ public BloomFilterCreator
newBloomFilterCreator(IndexCreationContext.BloomFilter context) {
+ return mockBloomFilterCreator;
+ }
+ }));
+ // it's ok to load external overrides without an internal implementation
present, e.g. for testing
+ assertEquals(mockBloomFilterCreator,
IndexCreatorProviders.getIndexCreatorProvider()
+ .newBloomFilterCreator(mock(IndexCreationContext.BloomFilter.class)));
+ }
+
+ @Test(expectedExceptions = UnsupportedOperationException.class)
+ public void
whenDefaultImplementationMissingThrowUnsupportedOperationException()
+ throws IOException {
+ // the implementation is missing so no indexes will be created anyway...
+ new
IndexCreatorProviders.Default().newBloomFilterCreator(mock(IndexCreationContext.BloomFilter.class));
+ }
+}
diff --git
a/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/DictionaryToRawIndexConverter.java
b/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/DictionaryToRawIndexConverter.java
index d349da8..0367203 100644
---
a/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/DictionaryToRawIndexConverter.java
+++
b/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/DictionaryToRawIndexConverter.java
@@ -31,7 +31,7 @@ import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.utils.TarGzCompressionUtils;
import
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
import
org.apache.pinot.segment.local.io.writer.impl.BaseChunkSVForwardIndexWriter;
-import
org.apache.pinot.segment.local.segment.creator.impl.SegmentColumnarIndexCreator;
+import
org.apache.pinot.segment.local.segment.creator.impl.DefaultIndexCreatorProvider;
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.segment.spi.V1Constants;
import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
@@ -314,7 +314,7 @@ public class DictionaryToRawIndexConverter {
int numDocs = segment.getSegmentMetadata().getTotalDocs();
int lengthOfLongestEntry = (storedType == DataType.STRING) ?
getLengthOfLongestEntry(dictionary) : -1;
- try (ForwardIndexCreator rawIndexCreator = SegmentColumnarIndexCreator
+ try (ForwardIndexCreator rawIndexCreator = DefaultIndexCreatorProvider
.getRawIndexCreatorForSVColumn(newSegment, compressionType, column,
storedType, numDocs, lengthOfLongestEntry,
false, BaseChunkSVForwardIndexWriter.DEFAULT_VERSION);
ForwardIndexReaderContext readerContext = reader.createContext()) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]