gortiz commented on code in PR #10184:
URL: https://github.com/apache/pinot/pull/10184#discussion_r1153009977
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java:
##########
@@ -133,183 +120,143 @@ public void init(SegmentGeneratorConfig
segmentCreationSpec, SegmentIndexCreatio
return;
}
- Collection<FieldSpec> fieldSpecs = schema.getAllFieldSpecs();
- Set<String> invertedIndexColumns = new HashSet<>();
- for (String columnName : _config.getInvertedIndexCreationColumns()) {
- Preconditions.checkState(schema.hasColumn(columnName),
- "Cannot create inverted index for column: %s because it is not in
schema", columnName);
- invertedIndexColumns.add(columnName);
- }
+ Map<String, FieldIndexConfigs> indexConfigs =
segmentCreationSpec.getIndexConfigsByColName();
- Set<String> bloomFilterColumns = new HashSet<>();
- for (String columnName : _config.getBloomFilterCreationColumns()) {
- Preconditions.checkState(schema.hasColumn(columnName),
- "Cannot create bloom filter for column: %s because it is not in
schema", columnName);
- bloomFilterColumns.add(columnName);
- }
-
- Set<String> rangeIndexColumns = new HashSet<>();
- for (String columnName : _config.getRangeIndexCreationColumns()) {
- Preconditions.checkState(schema.hasColumn(columnName),
- "Cannot create range index for column: %s because it is not in
schema", columnName);
- rangeIndexColumns.add(columnName);
- }
-
- Set<String> textIndexColumns = new HashSet<>();
- for (String columnName : _config.getTextIndexCreationColumns()) {
- Preconditions.checkState(schema.hasColumn(columnName),
- "Cannot create text index for column: %s because it is not in
schema", columnName);
- textIndexColumns.add(columnName);
- }
-
- Set<String> fstIndexColumns = new HashSet<>();
- for (String columnName : _config.getFSTIndexCreationColumns()) {
- Preconditions.checkState(schema.hasColumn(columnName),
- "Cannot create FST index for column: %s because it is not in
schema", columnName);
- fstIndexColumns.add(columnName);
- }
-
- Map<String, JsonIndexConfig> jsonIndexConfigs =
_config.getJsonIndexConfigs();
- for (String columnName : jsonIndexConfigs.keySet()) {
- Preconditions.checkState(schema.hasColumn(columnName),
- "Cannot create json index for column: %s because it is not in
schema", columnName);
- }
-
- Set<String> forwardIndexDisabledColumns = new HashSet<>();
- for (String columnName : _config.getForwardIndexDisabledColumns()) {
- Preconditions.checkState(schema.hasColumn(columnName),
String.format("Invalid config. Can't disable "
- + "forward index creation for a column: %s that does not exist in
schema", columnName));
- forwardIndexDisabledColumns.add(columnName);
- }
-
- Map<String, H3IndexConfig> h3IndexConfigs = _config.getH3IndexConfigs();
- for (String columnName : h3IndexConfigs.keySet()) {
- Preconditions.checkState(schema.hasColumn(columnName),
- "Cannot create H3 index for column: %s because it is not in schema",
columnName);
- }
+ _creatorsByColAndIndex =
Maps.newHashMapWithExpectedSize(indexConfigs.keySet().size());
- // Initialize creators for dictionary, forward index and inverted index
- IndexingConfig indexingConfig =
_config.getTableConfig().getIndexingConfig();
- int rangeIndexVersion = indexingConfig.getRangeIndexVersion();
- for (FieldSpec fieldSpec : fieldSpecs) {
- // Ignore virtual columns
+ for (String columnName : indexConfigs.keySet()) {
+ FieldSpec fieldSpec = schema.getFieldSpecFor(columnName);
+ if (fieldSpec == null) {
+ Preconditions.checkState(schema.hasColumn(columnName),
+ "Cannot create index for column: %s because it is not in schema",
columnName);
+ }
if (fieldSpec.isVirtualColumn()) {
+ LOGGER.warn("Ignoring index creation for virtual column " +
columnName);
continue;
}
- String columnName = fieldSpec.getName();
+ FieldIndexConfigs originalConfig = indexConfigs.get(columnName);
ColumnIndexCreationInfo columnIndexCreationInfo =
indexCreationInfoMap.get(columnName);
Preconditions.checkNotNull(columnIndexCreationInfo, "Missing index
creation info for column: %s", columnName);
boolean dictEnabledColumn =
createDictionaryForColumn(columnIndexCreationInfo, segmentCreationSpec,
fieldSpec);
- Preconditions.checkState(dictEnabledColumn ||
!invertedIndexColumns.contains(columnName),
+ Preconditions.checkState(dictEnabledColumn ||
!originalConfig.getConfig(StandardIndexes.inverted()).isEnabled(),
"Cannot create inverted index for raw index column: %s", columnName);
- boolean forwardIndexDisabled =
forwardIndexDisabledColumns.contains(columnName);
+ IndexType<ForwardIndexConfig, ?, ForwardIndexCreator> forwardIdx =
StandardIndexes.forward();
+ boolean forwardIndexDisabled =
!originalConfig.getConfig(forwardIdx).isEnabled();
IndexCreationContext.Common context = IndexCreationContext.builder()
.withIndexDir(_indexDir)
- .withCardinality(columnIndexCreationInfo.getDistinctValueCount())
.withDictionary(dictEnabledColumn)
.withFieldSpec(fieldSpec)
.withTotalDocs(segmentIndexCreationInfo.getTotalDocs())
- .withMinValue((Comparable<?>) columnIndexCreationInfo.getMin())
- .withMaxValue((Comparable<?>) columnIndexCreationInfo.getMax())
-
.withTotalNumberOfEntries(columnIndexCreationInfo.getTotalNumberOfEntries())
.withColumnIndexCreationInfo(columnIndexCreationInfo)
- .sorted(columnIndexCreationInfo.isSorted())
+ .withIsOptimizedDictionary(_config.isOptimizeDictionary()
+ || _config.isOptimizeDictionaryForMetrics() &&
fieldSpec.getFieldType() == FieldSpec.FieldType.METRIC)
.onHeap(segmentCreationSpec.isOnHeap())
.withForwardIndexDisabled(forwardIndexDisabled)
+ .withTextCommitOnClose(true)
.build();
- // Initialize forward index creator
- ChunkCompressionType chunkCompressionType =
- dictEnabledColumn ? null :
getColumnCompressionType(segmentCreationSpec, fieldSpec);
- // Sorted columns treat the 'forwardIndexDisabled' flag as a no-op
- _forwardIndexCreatorMap.put(columnName, (forwardIndexDisabled &&
!columnIndexCreationInfo.isSorted())
- ? null : _indexCreatorProvider.newForwardIndexCreator(
- context.forForwardIndex(chunkCompressionType,
segmentCreationSpec.getColumnProperties())));
-
- // Initialize inverted index creator; skip creating inverted index if
sorted
- if (invertedIndexColumns.contains(columnName) &&
!columnIndexCreationInfo.isSorted()) {
- _invertedIndexCreatorMap.put(columnName,
-
_indexCreatorProvider.newInvertedIndexCreator(context.forInvertedIndex()));
- }
+
+ FieldIndexConfigs config = adaptConfig(columnName, originalConfig,
columnIndexCreationInfo, segmentCreationSpec);
+
if (dictEnabledColumn) {
// Create dictionary-encoded index
// Initialize dictionary creator
// TODO: Dictionary creator holds all unique values on heap. Consider
keeping dictionary instead of creator
// which uses off-heap memory.
- SegmentDictionaryCreator dictionaryCreator =
- new SegmentDictionaryCreator(fieldSpec, _indexDir,
columnIndexCreationInfo.isUseVarLengthDictionary());
- _dictionaryCreatorMap.put(columnName, dictionaryCreator);
- // Create dictionary
+
+ // Index conf should be present if dictEnabledColumn is true. In case
it doesn't, getConfig will throw an
+ // exception
+ DictionaryIndexConfig dictConfig =
config.getConfig(StandardIndexes.dictionary());
+ if (!dictConfig.isEnabled()) {
+ throw new IllegalArgumentException("Dictionary index should be
enabled");
+ }
+ SegmentDictionaryCreator creator = new
DictionaryIndexPlugin().getIndexType()
+ .createIndexCreator(context, dictConfig);
+
try {
-
dictionaryCreator.build(columnIndexCreationInfo.getSortedUniqueElementsArray());
+ creator.build(context.getSortedUniqueElementsArray());
} catch (Exception e) {
LOGGER.error("Error building dictionary for field: {}, cardinality:
{}, number of bytes per entry: {}",
- fieldSpec.getName(),
columnIndexCreationInfo.getDistinctValueCount(),
- dictionaryCreator.getNumBytesPerEntry());
+ context.getFieldSpec().getName(), context.getCardinality(),
creator.getNumBytesPerEntry());
throw e;
}
- }
- if (bloomFilterColumns.contains(columnName)) {
- if (indexingConfig.getBloomFilterConfigs() != null
- && indexingConfig.getBloomFilterConfigs().containsKey(columnName))
{
- _bloomFilterCreatorMap.put(columnName,
_indexCreatorProvider.newBloomFilterCreator(
-
context.forBloomFilter(indexingConfig.getBloomFilterConfigs().get(columnName))));
- } else {
- _bloomFilterCreatorMap.put(columnName,
_indexCreatorProvider.newBloomFilterCreator(
- context.forBloomFilter(new
BloomFilterConfig(BloomFilterConfig.DEFAULT_FPP, 0, false))));
- }
+ _dictionaryCreatorMap.put(columnName, creator);
}
- if (!columnIndexCreationInfo.isSorted() &&
rangeIndexColumns.contains(columnName)) {
- _rangeIndexFilterCreatorMap.put(columnName,
-
_indexCreatorProvider.newRangeIndexCreator(context.forRangeIndex(rangeIndexVersion)));
+ Map<IndexType<?, ?, ?>, IndexCreator> creatorsByIndex =
+
Maps.newHashMapWithExpectedSize(IndexService.getInstance().getAllIndexes().size());
+ for (IndexType<?, ?, ?> index :
IndexService.getInstance().getAllIndexes()) {
+ if (hasSpecialLifecycle(index)) {
+ continue;
+ }
+ tryCreateIndexCreator(creatorsByIndex, index, context, config);
}
-
- if (textIndexColumns.contains(columnName)) {
- FSTType fstType = FSTType.LUCENE;
- List<FieldConfig> fieldConfigList =
_config.getTableConfig().getFieldConfigList();
- if (fieldConfigList != null) {
- for (FieldConfig fieldConfig : fieldConfigList) {
- if (fieldConfig.getName().equals(columnName)) {
- Map<String, String> properties = fieldConfig.getProperties();
- if (TextIndexUtils.isFstTypeNative(properties)) {
- fstType = FSTType.NATIVE;
- }
- }
- }
+ // TODO: Remove this when values stored as ForwardIndex stop depending
on TextIndex config
+ IndexCreator oldFwdCreator = creatorsByIndex.get(forwardIdx);
+ if (oldFwdCreator != null) {
+ Object fakeForwardValue =
calculateRawValueForTextIndex(dictEnabledColumn, config, fieldSpec);
+ if (fakeForwardValue != null) {
+ @SuppressWarnings("unchecked")
+ ForwardIndexCreator castedOldFwdCreator = (ForwardIndexCreator)
oldFwdCreator;
+ SameValueForwardIndexCreator fakeValueFwdCreator =
+ new SameValueForwardIndexCreator(fakeForwardValue,
castedOldFwdCreator);
+ creatorsByIndex.put(forwardIdx, fakeValueFwdCreator);
}
- _textIndexCreatorMap.put(columnName,
-
_indexCreatorProvider.newTextIndexCreator(context.forTextIndex(fstType, true,
- TextIndexUtils.extractStopWordsInclude(columnName,
_columnProperties),
- TextIndexUtils.extractStopWordsExclude(columnName,
_columnProperties))));
}
+ _creatorsByColAndIndex.put(columnName, creatorsByIndex);
+ }
- if (fstIndexColumns.contains(columnName)) {
- _fstIndexCreatorMap.put(columnName,
_indexCreatorProvider.newTextIndexCreator(
- context.forFSTIndex(_config.getFSTIndexType(),
- (String[])
columnIndexCreationInfo.getSortedUniqueElementsArray())));
+ // Although NullValueVector is implemented as an index, it needs to be
treated in a different way than other indexes
+ _nullHandlingEnabled = _config.isNullHandlingEnabled();
+ if (_nullHandlingEnabled) {
+ for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) {
+ // Initialize Null value vector map
+ String columnName = fieldSpec.getName();
+ _nullValueVectorCreatorMap.put(columnName, new
NullValueVectorCreator(_indexDir, columnName));
}
+ }
+ }
- JsonIndexConfig jsonIndexConfig = jsonIndexConfigs.get(columnName);
- if (jsonIndexConfig != null) {
- _jsonIndexCreatorMap.put(columnName,
-
_indexCreatorProvider.newJsonIndexCreator(context.forJsonIndex(jsonIndexConfig)));
- }
+ private FieldIndexConfigs adaptConfig(String columnName, FieldIndexConfigs
config,
+ ColumnIndexCreationInfo columnIndexCreationInfo, SegmentGeneratorConfig
segmentCreationSpec) {
+ FieldIndexConfigs.Builder builder = new FieldIndexConfigs.Builder(config);
+ // Sorted columns treat the 'forwardIndexDisabled' flag as a no-op
+ ForwardIndexConfig fwdConfig = config.getConfig(StandardIndexes.forward());
+ if (!fwdConfig.isEnabled() && columnIndexCreationInfo.isSorted()) {
+ builder.add(StandardIndexes.forward(), new
ForwardIndexConfig.Builder(fwdConfig)
+ .withLegacyProperties(segmentCreationSpec.getColumnProperties(),
columnName)
+ .build());
+ }
+ // Initialize inverted index creator; skip creating inverted index if
sorted
+ if (columnIndexCreationInfo.isSorted()) {
+ builder.undeclare(StandardIndexes.inverted());
+ }
+ return builder.build();
+ }
- H3IndexConfig h3IndexConfig = h3IndexConfigs.get(columnName);
- if (h3IndexConfig != null) {
- _h3IndexCreatorMap.put(columnName,
-
_indexCreatorProvider.newGeoSpatialIndexCreator(context.forGeospatialIndex(h3IndexConfig)));
- }
+ /**
+ * Returns true if the given index type has their own construction lifecycle
and therefore should not be instantiated
+ * in the general index loop and shouldn't be notified of each new column.
+ */
+ private boolean hasSpecialLifecycle(IndexType<?, ?, ?> indexType) {
+ return indexType == StandardIndexes.nullValueVector() || indexType ==
StandardIndexes.dictionary();
Review Comment:
I don't think that is a good idea because `SegmentColumnarIndexCreator` has
to know not only which indexes have to be treated "in a special way" but also
_how_ to treat them in a special way. I mean, imagine that a new index type has
to be treated in a special way. It wouldn't be enough to just return true from
some new method; `SegmentColumnarIndexCreator` would also need to know how to
deal with that index.
We could design a more complex algorithm with different strategies,
but it sounds difficult to find the correct abstraction and, TBH, I don't think
new indexes will need to be special. In fact, dictionary and null vector are not
actual indexes. They are different ways to encode the information that have
historically been implemented as indexes because that was the tool we had.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]