This is an automated email from the ASF dual-hosted git repository.
richardstartin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new d3fe951 Make realtime indexes pluggable (#8279)
d3fe951 is described below
commit d3fe9510ba89f9d4781cce19ee081df56bdf57b3
Author: Richard Startin <[email protected]>
AuthorDate: Mon Mar 7 21:10:50 2022 +0000
Make realtime indexes pluggable (#8279)
* abstract mutable inverted, text, json indexes in order to swap them out
* make realtime JSON, forward, inverted indexes pluggable
* review comments
---
.../pinot/queries/TextSearchQueriesTest.java | 3 +-
.../mutable/DefaultMutableIndexProvider.java | 135 +++++++++++
.../indexsegment/mutable/MutableSegmentImpl.java | 138 ++++--------
...IndexReader.java => RealtimeInvertedIndex.java} | 7 +-
.../RealtimeLuceneIndexReaderRefreshThread.java | 4 +-
.../RealtimeLuceneIndexRefreshState.java | 6 +-
...dexReader.java => RealtimeLuceneTextIndex.java} | 9 +-
...bleJsonIndex.java => MutableJsonIndexImpl.java} | 7 +-
.../creator/impl/text/LuceneTextIndexCreator.java | 4 +-
.../RealtimeInvertedIndexReaderTest.java | 2 +-
.../segment/local/segment/index/JsonIndexTest.java | 6 +-
.../pinot/segment/spi/index/IndexingOverrides.java | 63 +++++-
.../spi/index/mutable/MutableInvertedIndex.java | 32 +++
.../spi/index/mutable/MutableJsonIndex.java | 32 +++
.../spi/index/mutable/MutableTextIndex.java | 30 +++
.../provider/MutableDictionaryProvider.java | 26 +++
.../provider/MutableForwardIndexProvider.java | 26 +++
.../mutable/provider/MutableIndexContext.java | 249 +++++++++++++++++++++
.../mutable/provider/MutableIndexProvider.java | 24 ++
.../provider/MutableInvertedIndexProvider.java | 26 +++
.../mutable/provider/MutableJsonIndexProvider.java | 26 +++
.../provider/MutableTextIndexReaderProvider.java | 26 +++
22 files changed, 761 insertions(+), 120 deletions(-)
diff --git
a/pinot-core/src/test/java/org/apache/pinot/queries/TextSearchQueriesTest.java
b/pinot-core/src/test/java/org/apache/pinot/queries/TextSearchQueriesTest.java
index 98f3bc5..46b8878 100644
---
a/pinot-core/src/test/java/org/apache/pinot/queries/TextSearchQueriesTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/queries/TextSearchQueriesTest.java
@@ -60,6 +60,7 @@ import
org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
import org.apache.pinot.core.operator.query.AggregationOperator;
import org.apache.pinot.core.operator.query.SelectionOnlyOperator;
import
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
+import
org.apache.pinot.segment.local.realtime.impl.invertedindex.RealtimeLuceneTextIndex;
import
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
@@ -1299,7 +1300,7 @@ public class TextSearchQueriesTest extends
BaseQueriesTest {
/**
* Test the reference counting mechanism of {@link SearcherManager}
- * used by {@link
org.apache.pinot.segment.local.realtime.impl.invertedindex.RealtimeLuceneTextIndexReader}
+ * used by {@link RealtimeLuceneTextIndex}
* for near realtime text search.
*/
@Test
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/DefaultMutableIndexProvider.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/DefaultMutableIndexProvider.java
new file mode 100644
index 0000000..bbe9a7f
--- /dev/null
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/DefaultMutableIndexProvider.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.indexsegment.mutable;
+
+import
org.apache.pinot.segment.local.realtime.impl.dictionary.MutableDictionaryFactory;
+import
org.apache.pinot.segment.local.realtime.impl.forward.FixedByteMVMutableForwardIndex;
+import
org.apache.pinot.segment.local.realtime.impl.forward.FixedByteSVMutableForwardIndex;
+import
org.apache.pinot.segment.local.realtime.impl.forward.VarByteSVMutableForwardIndex;
+import
org.apache.pinot.segment.local.realtime.impl.invertedindex.RealtimeInvertedIndex;
+import org.apache.pinot.segment.local.realtime.impl.json.MutableJsonIndexImpl;
+import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.index.mutable.MutableDictionary;
+import org.apache.pinot.segment.spi.index.mutable.MutableForwardIndex;
+import org.apache.pinot.segment.spi.index.mutable.MutableInvertedIndex;
+import org.apache.pinot.segment.spi.index.mutable.MutableJsonIndex;
+import org.apache.pinot.segment.spi.index.mutable.MutableTextIndex;
+import org.apache.pinot.segment.spi.index.mutable.provider.MutableIndexContext;
+import
org.apache.pinot.segment.spi.index.mutable.provider.MutableIndexProvider;
+import org.apache.pinot.spi.data.FieldSpec;
+
+import static org.apache.pinot.spi.data.FieldSpec.DataType.INT;
+
+
+public class DefaultMutableIndexProvider implements MutableIndexProvider {
+ // For multi-valued column, forward-index.
+ // Maximum number of multi-values per row. We assert on this.
+ private static final int MAX_MULTI_VALUES_PER_ROW = 1000;
+ private static final int
NODICT_VARIABLE_WIDTH_ESTIMATED_AVERAGE_VALUE_LENGTH_DEFAULT = 100;
+ private static final int
NODICT_VARIABLE_WIDTH_ESTIMATED_NUMBER_OF_VALUES_DEFAULT = 100_000;
+
+ @Override
+ public MutableForwardIndex newForwardIndex(MutableIndexContext.Forward
context) {
+ String column = context.getFieldSpec().getName();
+ String segmentName = context.getSegmentName();
+ FieldSpec.DataType storedType =
context.getFieldSpec().getDataType().getStoredType();
+ boolean isSingleValue = context.getFieldSpec().isSingleValueField();
+ if (!context.hasDictionary()) {
+ // No dictionary column must be single-valued
+ assert isSingleValue;
+ String allocationContext =
+ buildAllocationContext(context.getSegmentName(),
context.getFieldSpec().getName(),
+ V1Constants.Indexes.RAW_SV_FORWARD_INDEX_FILE_EXTENSION);
+ if (storedType.isFixedWidth()) {
+ return new FixedByteSVMutableForwardIndex(false, storedType,
context.getCapacity(), context.getMemoryManager(),
+ allocationContext);
+ } else {
+ // RealtimeSegmentStatsHistory does not have the stats for
no-dictionary columns from previous consuming
+ // segments
+ // TODO: Add support for updating RealtimeSegmentStatsHistory with
average column value size for no dictionary
+ // columns as well
+ // TODO: Use the stats to get estimated average length
+ // Use a smaller capacity as opposed to segment flush size
+ int initialCapacity = Math.min(context.getCapacity(),
NODICT_VARIABLE_WIDTH_ESTIMATED_NUMBER_OF_VALUES_DEFAULT);
+ return new VarByteSVMutableForwardIndex(storedType,
context.getMemoryManager(), allocationContext,
+ initialCapacity,
NODICT_VARIABLE_WIDTH_ESTIMATED_AVERAGE_VALUE_LENGTH_DEFAULT);
+ }
+ } else {
+ if (isSingleValue) {
+ String allocationContext = buildAllocationContext(segmentName, column,
+ V1Constants.Indexes.UNSORTED_SV_FORWARD_INDEX_FILE_EXTENSION);
+ return new FixedByteSVMutableForwardIndex(true, INT,
context.getCapacity(), context.getMemoryManager(),
+ allocationContext);
+ } else {
+ String allocationContext = buildAllocationContext(segmentName, column,
+ V1Constants.Indexes.UNSORTED_MV_FORWARD_INDEX_FILE_EXTENSION);
+ // TODO: Start with a smaller capacity on
FixedByteMVForwardIndexReaderWriter and let it expand
+ return new FixedByteMVMutableForwardIndex(MAX_MULTI_VALUES_PER_ROW,
context.getAvgNumMultiValues(),
+ context.getCapacity(), Integer.BYTES,
+ context.getMemoryManager(), allocationContext);
+ }
+ }
+ }
+
+ @Override
+ public MutableInvertedIndex newInvertedIndex(MutableIndexContext.Inverted
context) {
+ return new RealtimeInvertedIndex();
+ }
+
+ @Override
+ public MutableJsonIndex newJsonIndex(MutableIndexContext.Json context) {
+ return new MutableJsonIndexImpl();
+ }
+
+ @Override
+ public MutableTextIndex newTextIndex(MutableIndexContext.Text context) {
+ return null;
+ }
+
+ @Override
+ public MutableDictionary newDictionary(MutableIndexContext.Dictionary
context) {
+ String column = context.getFieldSpec().getName();
+ String segmentName = context.getSegmentName();
+ FieldSpec.DataType storedType =
context.getFieldSpec().getDataType().getStoredType();
+ int dictionaryColumnSize;
+ if (storedType.isFixedWidth()) {
+ dictionaryColumnSize = storedType.size();
+ } else {
+ dictionaryColumnSize = context.getEstimatedColSize();
+ }
+ // NOTE: preserve 10% buffer for cardinality to reduce the chance of
re-sizing the dictionary
+ int estimatedCardinality = (int) (context.getEstimatedCardinality() * 1.1);
+ String dictionaryAllocationContext =
+ buildAllocationContext(segmentName, column,
V1Constants.Dict.FILE_EXTENSION);
+ return MutableDictionaryFactory.getMutableDictionary(storedType,
context.isOffHeap(), context.getMemoryManager(),
+ dictionaryColumnSize, Math.min(estimatedCardinality,
context.getCapacity()), dictionaryAllocationContext);
+ }
+
+ /**
+ * Helper method that builds allocation context that includes segment name,
column name, and index type.
+ *
+ * @param segmentName Name of segment.
+ * @param columnName Name of column.
+ * @param indexType Index type.
+ * @return Allocation context built from segment name, column name and index
type.
+ */
+ private static String buildAllocationContext(String segmentName, String
columnName, String indexType) {
+ return segmentName + ":" + columnName + indexType;
+ }
+}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
index c2c520c..902a9cc 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
@@ -40,15 +40,9 @@ import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentConfig;
import
org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentStatsHistory;
import
org.apache.pinot.segment.local.realtime.impl.dictionary.BaseOffHeapMutableDictionary;
-import
org.apache.pinot.segment.local.realtime.impl.dictionary.MutableDictionaryFactory;
-import
org.apache.pinot.segment.local.realtime.impl.forward.FixedByteMVMutableForwardIndex;
-import
org.apache.pinot.segment.local.realtime.impl.forward.FixedByteSVMutableForwardIndex;
-import
org.apache.pinot.segment.local.realtime.impl.forward.VarByteSVMutableForwardIndex;
import org.apache.pinot.segment.local.realtime.impl.geospatial.MutableH3Index;
-import
org.apache.pinot.segment.local.realtime.impl.invertedindex.RealtimeInvertedIndexReader;
import
org.apache.pinot.segment.local.realtime.impl.invertedindex.RealtimeLuceneIndexRefreshState;
-import
org.apache.pinot.segment.local.realtime.impl.invertedindex.RealtimeLuceneTextIndexReader;
-import org.apache.pinot.segment.local.realtime.impl.json.MutableJsonIndex;
+import
org.apache.pinot.segment.local.realtime.impl.invertedindex.RealtimeLuceneTextIndex;
import
org.apache.pinot.segment.local.realtime.impl.nullvalue.MutableNullValueVector;
import
org.apache.pinot.segment.local.segment.index.datasource.ImmutableDataSource;
import
org.apache.pinot.segment.local.segment.index.datasource.MutableDataSource;
@@ -62,13 +56,18 @@ import org.apache.pinot.segment.local.utils.IdMap;
import org.apache.pinot.segment.local.utils.IngestionUtils;
import org.apache.pinot.segment.spi.MutableSegment;
import org.apache.pinot.segment.spi.SegmentMetadata;
-import org.apache.pinot.segment.spi.V1Constants;
import org.apache.pinot.segment.spi.datasource.DataSource;
+import org.apache.pinot.segment.spi.index.IndexingOverrides;
import org.apache.pinot.segment.spi.index.creator.H3IndexConfig;
import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
import org.apache.pinot.segment.spi.index.mutable.MutableDictionary;
import org.apache.pinot.segment.spi.index.mutable.MutableForwardIndex;
+import org.apache.pinot.segment.spi.index.mutable.MutableInvertedIndex;
+import org.apache.pinot.segment.spi.index.mutable.MutableJsonIndex;
+import org.apache.pinot.segment.spi.index.mutable.MutableTextIndex;
import
org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap;
+import org.apache.pinot.segment.spi.index.mutable.provider.MutableIndexContext;
+import
org.apache.pinot.segment.spi.index.mutable.provider.MutableIndexProvider;
import org.apache.pinot.segment.spi.index.reader.BloomFilterReader;
import org.apache.pinot.segment.spi.index.reader.RangeIndexReader;
import org.apache.pinot.segment.spi.index.startree.StarTreeV2;
@@ -95,23 +94,17 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.pinot.spi.data.FieldSpec.DataType.BYTES;
-import static org.apache.pinot.spi.data.FieldSpec.DataType.INT;
import static org.apache.pinot.spi.data.FieldSpec.DataType.STRING;
@SuppressWarnings({"rawtypes", "unchecked"})
public class MutableSegmentImpl implements MutableSegment {
- // For multi-valued column, forward-index.
- // Maximum number of multi-values per row. We assert on this.
- private static final int MAX_MULTI_VALUES_PER_ROW = 1000;
+
private static final String RECORD_ID_MAP = "__recordIdMap__";
private static final int EXPECTED_COMPRESSION = 1000;
private static final int MIN_ROWS_TO_INDEX = 1000_000; // Min size of
recordIdMap for updatable metrics.
private static final int MIN_RECORD_ID_MAP_CACHE_SIZE = 10000; // Min
overflow map size for updatable metrics.
- private static final int
NODICT_VARIABLE_WIDTH_ESTIMATED_AVERAGE_VALUE_LENGTH_DEFAULT = 100;
- private static final int
NODICT_VARIABLE_WIDTH_ESTIMATED_NUMBER_OF_VALUES_DEFAULT = 100_000;
-
private final Logger _logger;
private final long _startTimeMillis = System.currentTimeMillis();
private final ServerMetrics _serverMetrics;
@@ -242,6 +235,11 @@ public class MutableSegmentImpl implements MutableSegment {
// Initialize for each column
for (FieldSpec fieldSpec : _physicalFieldSpecs) {
String column = fieldSpec.getName();
+ boolean isDictionary = !isNoDictionaryColumn(noDictionaryColumns,
invertedIndexColumns, fieldSpec, column);
+ MutableIndexContext.Common context =
+
MutableIndexContext.builder().withFieldSpec(fieldSpec).withMemoryManager(_memoryManager)
+
.withDictionary(isDictionary).withCapacity(_capacity).offHeap(_offHeap).withSegmentName(_segmentName)
+ .build();
// Partition info
PartitionFunction partitionFunction = null;
@@ -262,75 +260,33 @@ public class MutableSegmentImpl implements MutableSegment
{
// consuming. After consumption completes and the segment is built, all
single-value columns can have raw index
DataType storedType = fieldSpec.getDataType().getStoredType();
boolean isFixedWidthColumn = storedType.isFixedWidth();
- MutableForwardIndex forwardIndex;
- MutableDictionary dictionary;
- if (isNoDictionaryColumn(noDictionaryColumns, invertedIndexColumns,
fieldSpec, column)) {
- // No dictionary column (always single-valued)
- assert fieldSpec.isSingleValueField();
-
- dictionary = null;
- String allocationContext =
- buildAllocationContext(_segmentName, column,
V1Constants.Indexes.RAW_SV_FORWARD_INDEX_FILE_EXTENSION);
- if (isFixedWidthColumn) {
- forwardIndex =
- new FixedByteSVMutableForwardIndex(false, storedType, _capacity,
_memoryManager, allocationContext);
- } else {
- // RealtimeSegmentStatsHistory does not have the stats for
no-dictionary columns from previous consuming
- // segments
- // TODO: Add support for updating RealtimeSegmentStatsHistory with
average column value size for no dictionary
- // columns as well
- // TODO: Use the stats to get estimated average length
- // Use a smaller capacity as opposed to segment flush size
- int initialCapacity = Math.min(_capacity,
NODICT_VARIABLE_WIDTH_ESTIMATED_NUMBER_OF_VALUES_DEFAULT);
- forwardIndex =
- new VarByteSVMutableForwardIndex(storedType, _memoryManager,
allocationContext, initialCapacity,
-
NODICT_VARIABLE_WIDTH_ESTIMATED_AVERAGE_VALUE_LENGTH_DEFAULT);
- }
- } else {
- // Dictionary-encoded column
-
- int dictionaryColumnSize;
- if (isFixedWidthColumn) {
- dictionaryColumnSize = storedType.size();
- } else {
- dictionaryColumnSize = _statsHistory.getEstimatedAvgColSize(column);
- }
+ MutableIndexProvider indexProvider =
IndexingOverrides.getMutableIndexProvider();
+ MutableForwardIndex forwardIndex =
indexProvider.newForwardIndex(context.forForwardIndex(avgNumMultiValues));
+
+ // Dictionary-encoded column
+ MutableDictionary dictionary = null;
+ if (isDictionary) {
+ int dictionaryColumnSize =
+ isFixedWidthColumn ? storedType.size() :
_statsHistory.getEstimatedAvgColSize(column);
// NOTE: preserve 10% buffer for cardinality to reduce the chance of
re-sizing the dictionary
int estimatedCardinality = (int)
(_statsHistory.getEstimatedCardinality(column) * 1.1);
- String dictionaryAllocationContext =
- buildAllocationContext(_segmentName, column,
V1Constants.Dict.FILE_EXTENSION);
- dictionary =
- MutableDictionaryFactory.getMutableDictionary(storedType,
_offHeap, _memoryManager, dictionaryColumnSize,
- Math.min(estimatedCardinality, _capacity),
dictionaryAllocationContext);
-
- if (fieldSpec.isSingleValueField()) {
- // Single-value dictionary-encoded forward index
- String allocationContext = buildAllocationContext(_segmentName,
column,
- V1Constants.Indexes.UNSORTED_SV_FORWARD_INDEX_FILE_EXTENSION);
- forwardIndex = new FixedByteSVMutableForwardIndex(true, INT,
_capacity, _memoryManager, allocationContext);
- } else {
- // Multi-value dictionary-encoded forward index
- String allocationContext = buildAllocationContext(_segmentName,
column,
- V1Constants.Indexes.UNSORTED_MV_FORWARD_INDEX_FILE_EXTENSION);
- // TODO: Start with a smaller capacity on
FixedByteMVForwardIndexReaderWriter and let it expand
- forwardIndex =
- new FixedByteMVMutableForwardIndex(MAX_MULTI_VALUES_PER_ROW,
avgNumMultiValues, _capacity, Integer.BYTES,
- _memoryManager, allocationContext);
- }
-
+ dictionary =
indexProvider.newDictionary(context.forDictionary(dictionaryColumnSize,
estimatedCardinality));
// Even though the column is defined as 'no-dictionary' in the config,
we did create dictionary for consuming
// segment.
noDictionaryColumns.remove(column);
}
// Inverted index
- RealtimeInvertedIndexReader invertedIndexReader =
- invertedIndexColumns.contains(column) ? new
RealtimeInvertedIndexReader() : null;
+ MutableInvertedIndex invertedIndexReader =
+ invertedIndexColumns.contains(column) ?
indexProvider.newInvertedIndex(context.forInvertedIndex()) : null;
// Text index
- RealtimeLuceneTextIndexReader textIndex;
+ RealtimeLuceneTextIndex textIndex;
if (textIndexColumns.contains(column)) {
- textIndex = new RealtimeLuceneTextIndexReader(column, new
File(config.getConsumerDir()), _segmentName);
+ // TODO - this logic is in the wrong place and belongs in a
Lucene-specific submodule,
+ // it is beyond the scope of realtime index pluggability to do this
refactoring, so realtime
+ // text indexes remain statically defined. Revisit this after this
refactoring has been done.
+ textIndex = new RealtimeLuceneTextIndex(column, new
File(config.getConsumerDir()), _segmentName);
if (_realtimeLuceneReaders == null) {
_realtimeLuceneReaders = new
RealtimeLuceneIndexRefreshState.RealtimeLuceneReaders(_segmentName);
}
@@ -340,9 +296,11 @@ public class MutableSegmentImpl implements MutableSegment {
}
// Json index
- MutableJsonIndex jsonIndex = jsonIndexColumns.contains(column) ? new
MutableJsonIndex() : null;
+ MutableJsonIndex jsonIndex =
+ jsonIndexColumns.contains(column) ?
indexProvider.newJsonIndex(context.forJsonIndex()) : null;
// H3 index
+ // TODO consider making this overridable
MutableH3Index h3Index;
try {
H3IndexConfig h3IndexConfig = h3IndexConfigs.get(column);
@@ -360,6 +318,7 @@ public class MutableSegmentImpl implements MutableSegment {
invertedIndexReader, null, textIndex, jsonIndex, h3Index, null,
nullValueVector));
}
+ // TODO separate concerns: this logic does not belong here
if (_realtimeLuceneReaders != null) {
// add the realtime lucene index readers to the global queue for refresh
task to pick up
RealtimeLuceneIndexRefreshState realtimeLuceneIndexRefreshState =
RealtimeLuceneIndexRefreshState.getInstance();
@@ -590,7 +549,7 @@ public class MutableSegmentImpl implements MutableSegment {
forwardIndex.setDictId(docId, dictId);
// Update inverted index
- RealtimeInvertedIndexReader invertedIndex =
indexContainer._invertedIndex;
+ MutableInvertedIndex invertedIndex = indexContainer._invertedIndex;
if (invertedIndex != null) {
try {
invertedIndex.add(dictId, docId);
@@ -651,7 +610,7 @@ public class MutableSegmentImpl implements MutableSegment {
}
// Update text index
- RealtimeLuceneTextIndexReader textIndex = indexContainer._textIndex;
+ MutableTextIndex textIndex = indexContainer._textIndex;
if (textIndex != null) {
try {
textIndex.add((String) value);
@@ -691,7 +650,7 @@ public class MutableSegmentImpl implements MutableSegment {
indexContainer._forwardIndex.setDictIdMV(docId, dictIds);
// Update inverted index
- RealtimeInvertedIndexReader invertedIndex =
indexContainer._invertedIndex;
+ MutableInvertedIndex invertedIndex = indexContainer._invertedIndex;
if (invertedIndex != null) {
for (int dictId : dictIds) {
try {
@@ -954,7 +913,7 @@ public class MutableSegmentImpl implements MutableSegment {
IntArrays.quickSort(dictIds, dictionary::compare);
// Re-order documents using the inverted index
- RealtimeInvertedIndexReader invertedIndex = indexContainer._invertedIndex;
+ MutableInvertedIndex invertedIndex = indexContainer._invertedIndex;
int[] docIds = new int[numDocsIndexed];
int[] batch = new int[256];
int docIdIndex = 0;
@@ -977,18 +936,6 @@ public class MutableSegmentImpl implements MutableSegment {
}
/**
- * Helper method that builds allocation context that includes segment name,
column name, and index type.
- *
- * @param segmentName Name of segment.
- * @param columnName Name of column.
- * @param indexType Index type.
- * @return Allocation context built from segment name, column name and index
type.
- */
- private String buildAllocationContext(String segmentName, String columnName,
String indexType) {
- return segmentName + ":" + columnName + indexType;
- }
-
- /**
* Helper function that returns docId, depends on the following scenarios.
* <ul>
* <li> If metrics aggregation is enabled and if the dimension values were
already seen, return existing docIds
@@ -1134,10 +1081,10 @@ public class MutableSegmentImpl implements
MutableSegment {
final NumValuesInfo _numValuesInfo;
final MutableForwardIndex _forwardIndex;
final MutableDictionary _dictionary;
- final RealtimeInvertedIndexReader _invertedIndex;
+ final MutableInvertedIndex _invertedIndex;
final RangeIndexReader _rangeIndex;
final MutableH3Index _h3Index;
- final RealtimeLuceneTextIndexReader _textIndex;
+ final MutableTextIndex _textIndex;
final MutableJsonIndex _jsonIndex;
final BloomFilterReader _bloomFilter;
final MutableNullValueVector _nullValueVector;
@@ -1151,9 +1098,10 @@ public class MutableSegmentImpl implements
MutableSegment {
IndexContainer(FieldSpec fieldSpec, @Nullable PartitionFunction
partitionFunction,
@Nullable Set<Integer> partitions, NumValuesInfo numValuesInfo,
MutableForwardIndex forwardIndex,
- @Nullable MutableDictionary dictionary, @Nullable
RealtimeInvertedIndexReader invertedIndex,
- @Nullable RangeIndexReader rangeIndex, @Nullable
RealtimeLuceneTextIndexReader textIndex,
- @Nullable MutableJsonIndex jsonIndex, @Nullable MutableH3Index
h3Index, @Nullable BloomFilterReader bloomFilter,
+ @Nullable MutableDictionary dictionary, @Nullable MutableInvertedIndex
invertedIndex,
+ @Nullable RangeIndexReader rangeIndex, @Nullable MutableTextIndex
textIndex,
+ @Nullable MutableJsonIndex jsonIndex, @Nullable MutableH3Index h3Index,
+ @Nullable BloomFilterReader bloomFilter,
@Nullable MutableNullValueVector nullValueVector) {
_fieldSpec = fieldSpec;
_partitionFunction = partitionFunction;
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeInvertedIndexReader.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeInvertedIndex.java
similarity index 93%
rename from
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeInvertedIndexReader.java
rename to
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeInvertedIndex.java
index d74151c..7aa5f6b 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeInvertedIndexReader.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeInvertedIndex.java
@@ -21,8 +21,8 @@ package
org.apache.pinot.segment.local.realtime.impl.invertedindex;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.pinot.segment.spi.index.mutable.MutableInvertedIndex;
import
org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap;
-import org.apache.pinot.segment.spi.index.reader.InvertedIndexReader;
import org.roaringbitmap.buffer.MutableRoaringBitmap;
@@ -30,12 +30,12 @@ import org.roaringbitmap.buffer.MutableRoaringBitmap;
* Real-time bitmap based inverted index reader which allows adding values on
the fly.
* <p>This class is thread-safe for single writer multiple readers.
*/
-public class RealtimeInvertedIndexReader implements
InvertedIndexReader<MutableRoaringBitmap> {
+public class RealtimeInvertedIndex implements MutableInvertedIndex {
private final List<ThreadSafeMutableRoaringBitmap> _bitmaps = new
ArrayList<>();
private final ReentrantReadWriteLock.ReadLock _readLock;
private final ReentrantReadWriteLock.WriteLock _writeLock;
- public RealtimeInvertedIndexReader() {
+ public RealtimeInvertedIndex() {
ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
_readLock = readWriteLock.readLock();
_writeLock = readWriteLock.writeLock();
@@ -44,6 +44,7 @@ public class RealtimeInvertedIndexReader implements
InvertedIndexReader<MutableR
/**
* Adds the document id to the bitmap of the given dictionary id.
*/
+ @Override
public void add(int dictId, int docId) {
if (_bitmaps.size() == dictId) {
// Bitmap for the dictionary id does not exist, add a new bitmap into
the list
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneIndexReaderRefreshThread.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneIndexReaderRefreshThread.java
index c21e11d..a352f37 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneIndexReaderRefreshThread.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneIndexReaderRefreshThread.java
@@ -114,9 +114,9 @@ public class RealtimeLuceneIndexReaderRefreshThread
implements Runnable {
// if the segment hasn't yet been destroyed, refresh each
// realtime reader (one per column with text index enabled)
// for this segment.
- List<RealtimeLuceneTextIndexReader> realtimeLuceneReaders =
+ List<RealtimeLuceneTextIndex> realtimeLuceneReaders =
realtimeReadersForSegment.getRealtimeLuceneReaders();
- for (RealtimeLuceneTextIndexReader realtimeReader :
realtimeLuceneReaders) {
+ for (RealtimeLuceneTextIndex realtimeReader :
realtimeLuceneReaders) {
if (_stopped) {
// exit
break;
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneIndexRefreshState.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneIndexRefreshState.java
index 90ef96f..e2330a6 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneIndexRefreshState.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneIndexRefreshState.java
@@ -99,7 +99,7 @@ public class RealtimeLuceneIndexRefreshState {
private final String _segmentName;
private final Lock _lock;
private boolean _segmentDestroyed;
- private final List<RealtimeLuceneTextIndexReader> _realtimeLuceneReaders;
+ private final List<RealtimeLuceneTextIndex> _realtimeLuceneReaders;
public RealtimeLuceneReaders(String segmentName) {
_segmentName = segmentName;
@@ -108,7 +108,7 @@ public class RealtimeLuceneIndexRefreshState {
_realtimeLuceneReaders = new LinkedList<>();
}
- public void addReader(RealtimeLuceneTextIndexReader
realtimeLuceneTextIndexReader) {
+ public void addReader(RealtimeLuceneTextIndex
realtimeLuceneTextIndexReader) {
_realtimeLuceneReaders.add(realtimeLuceneTextIndexReader);
}
@@ -124,7 +124,7 @@ public class RealtimeLuceneIndexRefreshState {
return _segmentName;
}
- public List<RealtimeLuceneTextIndexReader> getRealtimeLuceneReaders() {
+ public List<RealtimeLuceneTextIndex> getRealtimeLuceneReaders() {
return _realtimeLuceneReaders;
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneTextIndexReader.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneTextIndex.java
similarity index 95%
rename from
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneTextIndexReader.java
rename to
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneTextIndex.java
index 47958cd..60e7b36 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneTextIndexReader.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneTextIndex.java
@@ -29,7 +29,7 @@ import org.apache.lucene.search.Query;
import org.apache.lucene.search.SearcherManager;
import org.apache.pinot.segment.local.indexsegment.mutable.MutableSegmentImpl;
import
org.apache.pinot.segment.local.segment.creator.impl.text.LuceneTextIndexCreator;
-import org.apache.pinot.segment.spi.index.reader.TextIndexReader;
+import org.apache.pinot.segment.spi.index.mutable.MutableTextIndex;
import org.roaringbitmap.IntIterator;
import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
import org.roaringbitmap.buffer.MutableRoaringBitmap;
@@ -42,8 +42,8 @@ import org.slf4j.LoggerFactory;
* Internally it uses {@link LuceneTextIndexCreator} for adding documents to
the lucene index
* as and when they are indexed by the consuming segment.
*/
-public class RealtimeLuceneTextIndexReader implements TextIndexReader {
- private static final org.slf4j.Logger LOGGER =
LoggerFactory.getLogger(RealtimeLuceneTextIndexReader.class);
+public class RealtimeLuceneTextIndex implements MutableTextIndex {
+ private static final org.slf4j.Logger LOGGER =
LoggerFactory.getLogger(RealtimeLuceneTextIndex.class);
private final QueryParser _queryParser;
private final LuceneTextIndexCreator _indexCreator;
@@ -58,7 +58,7 @@ public class RealtimeLuceneTextIndexReader implements
TextIndexReader {
* @param segmentIndexDir realtime segment consumer dir
* @param segmentName realtime segment name
*/
- public RealtimeLuceneTextIndexReader(String column, File segmentIndexDir,
String segmentName) {
+ public RealtimeLuceneTextIndex(String column, File segmentIndexDir, String
segmentName) {
_column = column;
_segmentName = segmentName;
try {
@@ -87,6 +87,7 @@ public class RealtimeLuceneTextIndexReader implements
TextIndexReader {
/**
* Adds a new document.
*/
+ @Override
public void add(String document) {
_indexCreator.add(document);
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/json/MutableJsonIndex.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/json/MutableJsonIndexImpl.java
similarity index 98%
rename from
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/json/MutableJsonIndex.java
rename to
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/json/MutableJsonIndexImpl.java
index 136fcd7..2eb37ea 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/json/MutableJsonIndex.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/json/MutableJsonIndexImpl.java
@@ -36,7 +36,7 @@ import
org.apache.pinot.common.request.context.predicate.NotInPredicate;
import org.apache.pinot.common.request.context.predicate.Predicate;
import
org.apache.pinot.segment.local.segment.creator.impl.inv.json.BaseJsonIndexCreator;
import org.apache.pinot.segment.spi.index.creator.JsonIndexCreator;
-import org.apache.pinot.segment.spi.index.reader.JsonIndexReader;
+import org.apache.pinot.segment.spi.index.mutable.MutableJsonIndex;
import org.apache.pinot.spi.exception.BadQueryRequestException;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.sql.parsers.CalciteSqlParser;
@@ -50,7 +50,7 @@ import static
org.apache.pinot.common.request.context.FilterContext.Type.PREDICA
/**
* Json index for mutable segment.
*/
-public class MutableJsonIndex implements JsonIndexReader {
+public class MutableJsonIndexImpl implements MutableJsonIndex {
private final Map<String, RoaringBitmap> _postingListMap;
private final IntList _docIdMapping;
private final ReentrantReadWriteLock.ReadLock _readLock;
@@ -59,7 +59,7 @@ public class MutableJsonIndex implements JsonIndexReader {
private int _nextDocId;
private int _nextFlattenedDocId;
- public MutableJsonIndex() {
+ public MutableJsonIndexImpl() {
_postingListMap = new HashMap<>();
_docIdMapping = new IntArrayList();
@@ -71,6 +71,7 @@ public class MutableJsonIndex implements JsonIndexReader {
/**
* Adds the next json value.
*/
+ @Override
public void add(String jsonString)
throws IOException {
try {
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/text/LuceneTextIndexCreator.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/text/LuceneTextIndexCreator.java
index a46c481..b3b3cd0 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/text/LuceneTextIndexCreator.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/text/LuceneTextIndexCreator.java
@@ -31,7 +31,7 @@ import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
-import
org.apache.pinot.segment.local.realtime.impl.invertedindex.RealtimeLuceneTextIndexReader;
+import
org.apache.pinot.segment.local.realtime.impl.invertedindex.RealtimeLuceneTextIndex;
import
org.apache.pinot.segment.local.segment.creator.impl.SegmentColumnarIndexCreator;
import org.apache.pinot.segment.spi.V1Constants;
import
org.apache.pinot.segment.spi.index.creator.DictionaryBasedInvertedIndexCreator;
@@ -41,7 +41,7 @@ import
org.apache.pinot.segment.spi.index.creator.TextIndexCreator;
/**
* This is used to create Lucene based text index.
* Used for both offline from {@link SegmentColumnarIndexCreator}
- * and realtime from {@link RealtimeLuceneTextIndexReader}
+ * and realtime from {@link RealtimeLuceneTextIndex}
*/
public class LuceneTextIndexCreator implements TextIndexCreator {
// TODO: make buffer size configurable choosing a default value based on the
heap usage results in design doc
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeInvertedIndexReaderTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeInvertedIndexReaderTest.java
index 6a78f54..94528c9 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeInvertedIndexReaderTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeInvertedIndexReaderTest.java
@@ -30,7 +30,7 @@ public class RealtimeInvertedIndexReaderTest {
@Test
public void testRealtimeInvertedIndexReader() {
- RealtimeInvertedIndexReader realtimeInvertedIndexReader = new
RealtimeInvertedIndexReader();
+ RealtimeInvertedIndex realtimeInvertedIndexReader = new
RealtimeInvertedIndex();
// Add dictionary id 0, document id 0 to the inverted index (single-value
dictionary id not added yet)
// Before adding
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/JsonIndexTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/JsonIndexTest.java
index 3f7b2a2..c62ddbc 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/JsonIndexTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/JsonIndexTest.java
@@ -21,7 +21,7 @@ package org.apache.pinot.segment.local.segment.index;
import java.io.File;
import java.io.IOException;
import org.apache.commons.io.FileUtils;
-import org.apache.pinot.segment.local.realtime.impl.json.MutableJsonIndex;
+import org.apache.pinot.segment.local.realtime.impl.json.MutableJsonIndexImpl;
import
org.apache.pinot.segment.local.segment.creator.impl.inv.json.OffHeapJsonIndexCreator;
import
org.apache.pinot.segment.local.segment.creator.impl.inv.json.OnHeapJsonIndexCreator;
import
org.apache.pinot.segment.local.segment.index.readers.json.ImmutableJsonIndexReader;
@@ -101,7 +101,7 @@ public class JsonIndexTest {
PinotDataBuffer offHeapDataBuffer =
PinotDataBuffer.mapReadOnlyBigEndianFile(offHeapIndexFile);
JsonIndexReader onHeapIndexReader = new
ImmutableJsonIndexReader(onHeapDataBuffer, records.length);
JsonIndexReader offHeapIndexReader = new
ImmutableJsonIndexReader(offHeapDataBuffer, records.length);
- MutableJsonIndex mutableJsonIndex = new MutableJsonIndex()) {
+ MutableJsonIndexImpl mutableJsonIndex = new MutableJsonIndexImpl()) {
for (String record : records) {
mutableJsonIndex.add(record);
}
@@ -186,7 +186,7 @@ public class JsonIndexTest {
PinotDataBuffer offHeapDataBuffer =
PinotDataBuffer.mapReadOnlyBigEndianFile(offHeapIndexFile);
JsonIndexReader onHeapIndexReader = new
ImmutableJsonIndexReader(onHeapDataBuffer, records.length);
JsonIndexReader offHeapIndexReader = new
ImmutableJsonIndexReader(offHeapDataBuffer, records.length);
- MutableJsonIndex mutableJsonIndex = new MutableJsonIndex()) {
+ MutableJsonIndexImpl mutableJsonIndex = new MutableJsonIndexImpl()) {
for (String record : records) {
mutableJsonIndex.add(record);
}
diff --git
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/IndexingOverrides.java
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/IndexingOverrides.java
index 273a4e9..8fc65d9 100644
---
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/IndexingOverrides.java
+++
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/IndexingOverrides.java
@@ -36,6 +36,13 @@ import
org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator;
import org.apache.pinot.segment.spi.index.creator.GeoSpatialIndexCreator;
import org.apache.pinot.segment.spi.index.creator.JsonIndexCreator;
import org.apache.pinot.segment.spi.index.creator.TextIndexCreator;
+import org.apache.pinot.segment.spi.index.mutable.MutableDictionary;
+import org.apache.pinot.segment.spi.index.mutable.MutableForwardIndex;
+import org.apache.pinot.segment.spi.index.mutable.MutableInvertedIndex;
+import org.apache.pinot.segment.spi.index.mutable.MutableJsonIndex;
+import org.apache.pinot.segment.spi.index.mutable.MutableTextIndex;
+import org.apache.pinot.segment.spi.index.mutable.provider.MutableIndexContext;
+import
org.apache.pinot.segment.spi.index.mutable.provider.MutableIndexProvider;
import org.apache.pinot.segment.spi.index.reader.BloomFilterReader;
import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
import org.apache.pinot.segment.spi.index.reader.H3IndexReader;
@@ -52,15 +59,16 @@ import org.slf4j.LoggerFactory;
public class IndexingOverrides {
- public interface IndexingOverride extends IndexCreatorProvider,
IndexReaderProvider {
+ public interface IndexingOverride extends IndexCreatorProvider,
IndexReaderProvider, MutableIndexProvider {
}
private static final Logger LOGGER =
LoggerFactory.getLogger(IndexingOverrides.class);
private static final IndexCreatorProvider CREATOR_DEFAULTS =
createDefaultCreatorProvider();
private static final IndexReaderProvider READER_DEFAULTS =
createDefaultReaderProvider();
- private static final AtomicReference<IndexingOverride> REGISTRATION = new
AtomicReference<>(null);
+ private static final MutableIndexProvider MUTABLE_INDEX_DEFAULTS =
createDefaultMutableIndexProvider();
+ private static final AtomicReference<IndexingOverride> REGISTRATION = new
AtomicReference<>(null);
private IndexingOverrides() {
}
@@ -77,13 +85,21 @@ public class IndexingOverrides {
/**
* Gets the registered {@see IndexReaderProvider} or the default if none was
registered yet.
- * @return an index reader provier.
+ * @return an index reader provider.
*/
public static IndexReaderProvider getIndexReaderProvider() {
return Holder.PROVIDER;
}
/**
+ * Gets the registered {@link MutableIndexProvider} or the default if none
was registered yet.
+ * @return a mutable index provider.
+ */
+ public static MutableIndexProvider getMutableIndexProvider() {
+ return Holder.PROVIDER;
+ }
+
+ /**
* Obtain the registered index creator provider. If the user has provided an
override, then it will be used instead.
* If the user has not provided an override yet, then this action will
prevent them from doing so.
* @return the global index provision logic.
@@ -104,6 +120,11 @@ public class IndexingOverrides {
return
invokeDefaultConstructor("org.apache.pinot.segment.local.segment.index.readers.DefaultIndexReaderProvider");
}
+ private static MutableIndexProvider createDefaultMutableIndexProvider() {
+ return invokeDefaultConstructor(
+
"org.apache.pinot.segment.local.indexsegment.mutable.DefaultMutableIndexProvider");
+ }
+
@SuppressWarnings("unchecked")
private static <T> T invokeDefaultConstructor(String className) {
try {
@@ -233,12 +254,48 @@ public class IndexingOverrides {
return READER_DEFAULTS.newTextIndexReader(file, columnMetadata,
textIndexProperties);
}
+ @Override
+ public MutableForwardIndex newForwardIndex(MutableIndexContext.Forward
context) {
+ ensureMutableReaderPresent();
+ return MUTABLE_INDEX_DEFAULTS.newForwardIndex(context);
+ }
+
+ @Override
+ public MutableInvertedIndex newInvertedIndex(MutableIndexContext.Inverted
context) {
+ ensureMutableReaderPresent();
+ return MUTABLE_INDEX_DEFAULTS.newInvertedIndex(context);
+ }
+
+ @Override
+ public MutableTextIndex newTextIndex(MutableIndexContext.Text context) {
+ ensureMutableReaderPresent();
+ return MUTABLE_INDEX_DEFAULTS.newTextIndex(context);
+ }
+
+ @Override
+ public MutableJsonIndex newJsonIndex(MutableIndexContext.Json context) {
+ ensureMutableReaderPresent();
+ return MUTABLE_INDEX_DEFAULTS.newJsonIndex(context);
+ }
+
+ @Override
+ public MutableDictionary newDictionary(MutableIndexContext.Dictionary
context) {
+ ensureMutableReaderPresent();
+ return MUTABLE_INDEX_DEFAULTS.newDictionary(context);
+ }
+
private void ensureReaderPresent() {
if (READER_DEFAULTS == null) {
throw new UnsupportedOperationException("default implementation not
present on classpath");
}
}
+ private void ensureMutableReaderPresent() {
+ if (MUTABLE_INDEX_DEFAULTS == null) {
+ throw new UnsupportedOperationException("default implementation not
present on classpath");
+ }
+ }
+
private void ensureCreatorPresent() {
if (CREATOR_DEFAULTS == null) {
throw new UnsupportedOperationException("default implementation not
present on classpath");
diff --git
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/mutable/MutableInvertedIndex.java
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/mutable/MutableInvertedIndex.java
new file mode 100644
index 0000000..c88499e
--- /dev/null
+++
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/mutable/MutableInvertedIndex.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.spi.index.mutable;
+
+import org.apache.pinot.segment.spi.index.reader.InvertedIndexReader;
+import org.roaringbitmap.buffer.MutableRoaringBitmap;
+
+
+public interface MutableInvertedIndex extends
InvertedIndexReader<MutableRoaringBitmap> {
+ /**
+ * Add the docId to the posting list for the dictionary id.
+ * @param dictId dictionary id
+ * @param docId document id
+ */
+ void add(int dictId, int docId);
+}
diff --git
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/mutable/MutableJsonIndex.java
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/mutable/MutableJsonIndex.java
new file mode 100644
index 0000000..b5e34f7
--- /dev/null
+++
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/mutable/MutableJsonIndex.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.spi.index.mutable;
+
+import java.io.IOException;
+import org.apache.pinot.segment.spi.index.reader.JsonIndexReader;
+
+
+public interface MutableJsonIndex extends JsonIndexReader {
+ /**
+ * Index a JSON document
+ * @param jsonString the JSON
+ */
+ void add(String jsonString)
+ throws IOException;
+}
diff --git
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/mutable/MutableTextIndex.java
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/mutable/MutableTextIndex.java
new file mode 100644
index 0000000..00e894f
--- /dev/null
+++
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/mutable/MutableTextIndex.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.spi.index.mutable;
+
+import org.apache.pinot.segment.spi.index.reader.TextIndexReader;
+
+
+public interface MutableTextIndex extends TextIndexReader {
+ /**
+ * Index the document
+ * @param document the document as a string
+ */
+ void add(String document);
+}
diff --git
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/mutable/provider/MutableDictionaryProvider.java
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/mutable/provider/MutableDictionaryProvider.java
new file mode 100644
index 0000000..e42f4a9
--- /dev/null
+++
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/mutable/provider/MutableDictionaryProvider.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.spi.index.mutable.provider;
+
+import org.apache.pinot.segment.spi.index.mutable.MutableDictionary;
+
+
+public interface MutableDictionaryProvider {
+ MutableDictionary newDictionary(MutableIndexContext.Dictionary context);
+}
diff --git
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/mutable/provider/MutableForwardIndexProvider.java
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/mutable/provider/MutableForwardIndexProvider.java
new file mode 100644
index 0000000..948336f
--- /dev/null
+++
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/mutable/provider/MutableForwardIndexProvider.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.spi.index.mutable.provider;
+
+import org.apache.pinot.segment.spi.index.mutable.MutableForwardIndex;
+
+
+public interface MutableForwardIndexProvider {
+ MutableForwardIndex newForwardIndex(MutableIndexContext.Forward context);
+}
diff --git
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/mutable/provider/MutableIndexContext.java
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/mutable/provider/MutableIndexContext.java
new file mode 100644
index 0000000..7d86dae
--- /dev/null
+++
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/mutable/provider/MutableIndexContext.java
@@ -0,0 +1,249 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.spi.index.mutable.provider;
+
+import java.util.Objects;
+import org.apache.pinot.segment.spi.memory.PinotDataBufferMemoryManager;
+import org.apache.pinot.spi.data.FieldSpec;
+
+
+public interface MutableIndexContext {
+ PinotDataBufferMemoryManager getMemoryManager();
+
+ FieldSpec getFieldSpec();
+
+ String getSegmentName();
+
+ boolean hasDictionary();
+
+ int getCapacity();
+
+ boolean isOffHeap();
+
+ static Builder builder() {
+ return new Builder();
+ }
+
+ class Builder {
+ private FieldSpec _fieldSpec;
+ private String _segmentName;
+ private boolean _hasDictionary = true;
+ private boolean _offHeap = true;
+ private int _capacity;
+ private PinotDataBufferMemoryManager _memoryManager;
+
+ public Builder withMemoryManager(PinotDataBufferMemoryManager
memoryManager) {
+ _memoryManager = memoryManager;
+ return this;
+ }
+
+ public Builder withFieldSpec(FieldSpec fieldSpec) {
+ _fieldSpec = fieldSpec;
+ return this;
+ }
+
+ public Builder withSegmentName(String segmentName) {
+ _segmentName = segmentName;
+ return this;
+ }
+
+ public Builder withDictionary(boolean hasDictionary) {
+ _hasDictionary = hasDictionary;
+ return this;
+ }
+
+ public Builder offHeap(boolean offHeap) {
+ _offHeap = offHeap;
+ return this;
+ }
+
+ public Builder withCapacity(int capacity) {
+ _capacity = capacity;
+ return this;
+ }
+
+ public Common build() {
+ return new Common(Objects.requireNonNull(_fieldSpec), _hasDictionary,
Objects.requireNonNull(_segmentName),
+ Objects.requireNonNull(_memoryManager), _capacity, _offHeap);
+ }
+ }
+
+ final class Common implements MutableIndexContext {
+ private final int _capacity;
+ private final FieldSpec _fieldSpec;
+ private final boolean _hasDictionary;
+ private final boolean _offHeap;
+ private final String _segmentName;
+ private final PinotDataBufferMemoryManager _memoryManager;
+
+ public Common(FieldSpec fieldSpec, boolean hasDictionary, String
segmentName,
+ PinotDataBufferMemoryManager memoryManager, int capacity, boolean
offHeap) {
+ _fieldSpec = fieldSpec;
+ _hasDictionary = hasDictionary;
+ _segmentName = segmentName;
+ _memoryManager = memoryManager;
+ _capacity = capacity;
+ _offHeap = offHeap;
+ }
+
+ @Override
+ public PinotDataBufferMemoryManager getMemoryManager() {
+ return _memoryManager;
+ }
+
+ @Override
+ public String getSegmentName() {
+ return _segmentName;
+ }
+
+ @Override
+ public FieldSpec getFieldSpec() {
+ return _fieldSpec;
+ }
+
+ @Override
+ public boolean hasDictionary() {
+ return _hasDictionary;
+ }
+
+ @Override
+ public int getCapacity() {
+ return _capacity;
+ }
+
+ @Override
+ public boolean isOffHeap() {
+ return _offHeap;
+ }
+
+ public Dictionary forDictionary(int estimatedColSize, int
estimatedCardinality) {
+ return new Dictionary(this, estimatedColSize, estimatedCardinality);
+ }
+
+ public Forward forForwardIndex(int avgNumMultiValues) {
+ return new Forward(this, avgNumMultiValues);
+ }
+
+ public Inverted forInvertedIndex() {
+ return new Inverted(this);
+ }
+
+ public Json forJsonIndex() {
+ return new Json(this);
+ }
+
+ public Text forTextIndex() {
+ return new Text(this);
+ }
+ }
+
+ class Wrapper implements MutableIndexContext {
+
+ private final MutableIndexContext _wrapped;
+
+ public Wrapper(MutableIndexContext wrapped) {
+ _wrapped = wrapped;
+ }
+
+ @Override
+ public PinotDataBufferMemoryManager getMemoryManager() {
+ return _wrapped.getMemoryManager();
+ }
+
+ @Override
+ public FieldSpec getFieldSpec() {
+ return _wrapped.getFieldSpec();
+ }
+
+ @Override
+ public String getSegmentName() {
+ return _wrapped.getSegmentName();
+ }
+
+ @Override
+ public boolean hasDictionary() {
+ return _wrapped.hasDictionary();
+ }
+
+ @Override
+ public int getCapacity() {
+ return _wrapped.getCapacity();
+ }
+
+ @Override
+ public boolean isOffHeap() {
+ return _wrapped.isOffHeap();
+ }
+ }
+
+ class Dictionary extends Wrapper {
+
+ private final int _estimatedColSize;
+ private final int _estimatedCardinality;
+
+ public Dictionary(MutableIndexContext wrapped, int estimatedColSize, int
estimatedCardinality) {
+ super(wrapped);
+ _estimatedColSize = estimatedColSize;
+ _estimatedCardinality = estimatedCardinality;
+ }
+
+ public int getEstimatedColSize() {
+ return _estimatedColSize;
+ }
+
+ public int getEstimatedCardinality() {
+ return _estimatedCardinality;
+ }
+ }
+
+ class Forward extends Wrapper {
+
+ private final int _avgNumMultiValues;
+
+ public Forward(MutableIndexContext wrapped, int avgNumMultiValues) {
+ super(wrapped);
+ _avgNumMultiValues = avgNumMultiValues;
+ }
+
+ public int getAvgNumMultiValues() {
+ return _avgNumMultiValues;
+ }
+ }
+
+ class Inverted extends Wrapper {
+
+ public Inverted(MutableIndexContext wrapped) {
+ super(wrapped);
+ }
+ }
+
+ class Json extends Wrapper {
+
+ public Json(MutableIndexContext wrapped) {
+ super(wrapped);
+ }
+ }
+
+ class Text extends Wrapper {
+
+ public Text(MutableIndexContext wrapped) {
+ super(wrapped);
+ }
+ }
+}
diff --git
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/mutable/provider/MutableIndexProvider.java
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/mutable/provider/MutableIndexProvider.java
new file mode 100644
index 0000000..60f07c5
--- /dev/null
+++
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/mutable/provider/MutableIndexProvider.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.spi.index.mutable.provider;
+
+public interface MutableIndexProvider
+ extends MutableForwardIndexProvider, MutableInvertedIndexProvider,
MutableTextIndexReaderProvider,
+ MutableJsonIndexProvider, MutableDictionaryProvider {
+}
diff --git
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/mutable/provider/MutableInvertedIndexProvider.java
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/mutable/provider/MutableInvertedIndexProvider.java
new file mode 100644
index 0000000..d67d5aa
--- /dev/null
+++
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/mutable/provider/MutableInvertedIndexProvider.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.spi.index.mutable.provider;
+
+import org.apache.pinot.segment.spi.index.mutable.MutableInvertedIndex;
+
+
+public interface MutableInvertedIndexProvider {
+ MutableInvertedIndex newInvertedIndex(MutableIndexContext.Inverted context);
+}
diff --git
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/mutable/provider/MutableJsonIndexProvider.java
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/mutable/provider/MutableJsonIndexProvider.java
new file mode 100644
index 0000000..fb518c3
--- /dev/null
+++
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/mutable/provider/MutableJsonIndexProvider.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.spi.index.mutable.provider;
+
+import org.apache.pinot.segment.spi.index.mutable.MutableJsonIndex;
+
+
+public interface MutableJsonIndexProvider {
+ MutableJsonIndex newJsonIndex(MutableIndexContext.Json context);
+}
diff --git
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/mutable/provider/MutableTextIndexReaderProvider.java
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/mutable/provider/MutableTextIndexReaderProvider.java
new file mode 100644
index 0000000..de30926
--- /dev/null
+++
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/mutable/provider/MutableTextIndexReaderProvider.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.spi.index.mutable.provider;
+
+import org.apache.pinot.segment.spi.index.mutable.MutableTextIndex;
+
+
+public interface MutableTextIndexReaderProvider {
+ MutableTextIndex newTextIndex(MutableIndexContext.Text context);
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]