siddharthteotia commented on code in PR #9678:
URL: https://github.com/apache/pinot/pull/9678#discussion_r1011068710
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java:
##########
@@ -271,118 +304,295 @@ private void rewriteRawSVForwardIndex(String column,
ColumnMetadata existingColM
.forForwardIndex(newCompressionType,
_indexLoadingConfig.getColumnProperties());
try (ForwardIndexCreator creator =
indexCreatorProvider.newForwardIndexCreator(context)) {
- // If creator stored type and the reader stored type do not match,
throw an exception.
if (!reader.getStoredType().equals(creator.getValueType())) {
+ // Creator stored type should match reader stored type for raw
columns. We do not support changing datatypes.
String failureMsg =
"Unsupported operation to change datatype for column=" + column
+ " from " + reader.getStoredType()
.toString() + " to " + creator.getValueType().toString();
throw new UnsupportedOperationException(failureMsg);
}
int numDocs = existingColMetadata.getTotalDocs();
- forwardIndexWriterHelper(column, reader, creator, numDocs);
+ forwardIndexWriterHelper(column, existingColMetadata, reader, creator,
numDocs, null);
}
}
}
- private void forwardIndexWriterHelper(String column, ForwardIndexReader
reader, ForwardIndexCreator creator,
- int numDocs) {
- // If creator stored type should match reader stored type. We do not
support changing datatypes.
- if (!reader.getStoredType().equals(creator.getValueType())) {
- String failureMsg =
- "Unsupported operation to change datatype for column=" + column + "
from " + reader.getStoredType().toString()
- + " to " + creator.getValueType().toString();
- throw new UnsupportedOperationException(failureMsg);
- }
-
+ private void forwardIndexWriterHelper(String column, ColumnMetadata
existingColumnMetadata, ForwardIndexReader reader,
+ ForwardIndexCreator creator, int numDocs, @Nullable
SegmentDictionaryCreator dictionaryCreator) {
ForwardIndexReaderContext readerContext = reader.createContext();
boolean isSVColumn = reader.isSingleValue();
- switch (reader.getStoredType()) {
- // JSON fields are either stored as string or bytes. No special handling
is needed because we make this
- // decision based on the storedType of the reader.
- case INT: {
- for (int i = 0; i < numDocs; i++) {
- if (isSVColumn) {
- int val = reader.getInt(i, readerContext);
- creator.putInt(val);
- } else {
- int[] ints = reader.getIntMV(i, readerContext);
- creator.putIntMV(ints);
- }
+ if (dictionaryCreator != null) {
+ int maxNumValuesPerEntry =
existingColumnMetadata.getMaxNumberOfMultiValues();
+ PinotSegmentColumnReader columnReader = new
PinotSegmentColumnReader(reader, null, null, maxNumValuesPerEntry);
+
+ for (int i = 0; i < numDocs; i++) {
+ Object obj = columnReader.getValue(i);
+
+ if (isSVColumn) {
+ int dictId = dictionaryCreator.indexOfSV(obj);
+ creator.putDictId(dictId);
+ } else {
+ int[] dictIds = dictionaryCreator.indexOfMV(obj);
+ creator.putDictIdMV(dictIds);
}
- break;
}
- case LONG: {
- for (int i = 0; i < numDocs; i++) {
- if (isSVColumn) {
- long val = reader.getLong(i, readerContext);
- creator.putLong(val);
- } else {
- long[] longs = reader.getLongMV(i, readerContext);
- creator.putLongMV(longs);
+ } else {
+ switch (reader.getStoredType()) {
+ // JSON fields are either stored as string or bytes. No special
handling is needed because we make this
+ // decision based on the storedType of the reader.
+ case INT: {
+ for (int i = 0; i < numDocs; i++) {
+ if (isSVColumn) {
+ int val = reader.getInt(i, readerContext);
+ creator.putInt(val);
+ } else {
+ int[] ints = reader.getIntMV(i, readerContext);
+ creator.putIntMV(ints);
+ }
}
+ break;
}
- break;
- }
- case FLOAT: {
- for (int i = 0; i < numDocs; i++) {
- if (isSVColumn) {
- float val = reader.getFloat(i, readerContext);
- creator.putFloat(val);
- } else {
- float[] floats = reader.getFloatMV(i, readerContext);
- creator.putFloatMV(floats);
+ case LONG: {
+ for (int i = 0; i < numDocs; i++) {
+ if (isSVColumn) {
+ long val = reader.getLong(i, readerContext);
+ creator.putLong(val);
+ } else {
+ long[] longs = reader.getLongMV(i, readerContext);
+ creator.putLongMV(longs);
+ }
}
+ break;
}
- break;
- }
- case DOUBLE: {
- for (int i = 0; i < numDocs; i++) {
- if (isSVColumn) {
- double val = reader.getDouble(i, readerContext);
- creator.putDouble(val);
- } else {
- double[] doubles = reader.getDoubleMV(i, readerContext);
- creator.putDoubleMV(doubles);
+ case FLOAT: {
+ for (int i = 0; i < numDocs; i++) {
+ if (isSVColumn) {
+ float val = reader.getFloat(i, readerContext);
+ creator.putFloat(val);
+ } else {
+ float[] floats = reader.getFloatMV(i, readerContext);
+ creator.putFloatMV(floats);
+ }
}
+ break;
}
- break;
- }
- case STRING: {
- for (int i = 0; i < numDocs; i++) {
- if (isSVColumn) {
- String val = reader.getString(i, readerContext);
- creator.putString(val);
- } else {
- String[] strings = reader.getStringMV(i, readerContext);
- creator.putStringMV(strings);
+ case DOUBLE: {
+ for (int i = 0; i < numDocs; i++) {
+ if (isSVColumn) {
+ double val = reader.getDouble(i, readerContext);
+ creator.putDouble(val);
+ } else {
+ double[] doubles = reader.getDoubleMV(i, readerContext);
+ creator.putDoubleMV(doubles);
+ }
}
+ break;
}
- break;
- }
- case BYTES: {
- for (int i = 0; i < numDocs; i++) {
- if (isSVColumn) {
- byte[] val = reader.getBytes(i, readerContext);
- creator.putBytes(val);
- } else {
- byte[][] bytesArray = reader.getBytesMV(i, readerContext);
- creator.putBytesMV(bytesArray);
+ case STRING: {
+ for (int i = 0; i < numDocs; i++) {
+ if (isSVColumn) {
+ String val = reader.getString(i, readerContext);
+ creator.putString(val);
+ } else {
+ String[] strings = reader.getStringMV(i, readerContext);
+ creator.putStringMV(strings);
+ }
}
+ break;
}
- break;
- }
- case BIG_DECIMAL: {
- for (int i = 0; i < numDocs; i++) {
+ case BYTES: {
+ for (int i = 0; i < numDocs; i++) {
+ if (isSVColumn) {
+ byte[] val = reader.getBytes(i, readerContext);
+ creator.putBytes(val);
+ } else {
+ byte[][] bytesArray = reader.getBytesMV(i, readerContext);
+ creator.putBytesMV(bytesArray);
+ }
+ }
+ break;
+ }
+ case BIG_DECIMAL: {
Preconditions.checkState(isSVColumn, "BigDecimal is not supported
for MV columns");
- BigDecimal val = reader.getBigDecimal(i, readerContext);
- creator.putBigDecimal(val);
+ for (int i = 0; i < numDocs; i++) {
+ BigDecimal val = reader.getBigDecimal(i, readerContext);
+ creator.putBigDecimal(val);
+ }
+ break;
}
- break;
+ default:
+ throw new IllegalStateException();
+ }
+ }
+ }
+
+ private void enableDictionary(String column, SegmentDirectory.Writer
segmentWriter,
+ IndexCreatorProvider indexCreatorProvider)
+ throws Exception {
+ Preconditions.checkState(_segmentMetadata.getVersion() ==
SegmentVersion.v3);
+ ColumnMetadata existingColMetadata =
_segmentMetadata.getColumnMetadataFor(column);
+ Preconditions.checkState(!existingColMetadata.hasDictionary(),
+ "Cannot rewrite dictionary enabled forward index. Dictionary already
exists for column:" + column);
+ boolean isSingleValue = existingColMetadata.isSingleValue();
+
+ File indexDir = _segmentMetadata.getIndexDir();
+ String segmentName = _segmentMetadata.getName();
+ File inProgress = new File(indexDir, column + ".dict.inprogress");
+ File dictionaryFile = new File(indexDir, column +
V1Constants.Dict.FILE_EXTENSION);
+ String fwdIndexFileExtension;
+ if (isSingleValue) {
+ fwdIndexFileExtension =
+ existingColMetadata.isSorted() ?
V1Constants.Indexes.SORTED_SV_FORWARD_INDEX_FILE_EXTENSION
+ : V1Constants.Indexes.UNSORTED_SV_FORWARD_INDEX_FILE_EXTENSION;
+ } else {
+ fwdIndexFileExtension =
V1Constants.Indexes.UNSORTED_MV_FORWARD_INDEX_FILE_EXTENSION;
+ }
+ File fwdIndexFile = new File(indexDir, column + fwdIndexFileExtension);
+
+ if (!inProgress.exists()) {
+ // Marker file does not exist, which means last run ended normally.
+ // Create a marker file.
+ FileUtils.touch(inProgress);
+ } else {
+ // Marker file exists, which means last run was interrupted.
+ // Remove forward index and dictionary files if they exist.
+ FileUtils.deleteQuietly(fwdIndexFile);
+ FileUtils.deleteQuietly(dictionaryFile);
+ }
+
+ LOGGER.info("Creating a new dictionary for segment={} and column={}",
segmentName, column);
+ SegmentDictionaryCreator dictionaryCreator = buildDictionary(column,
existingColMetadata, segmentWriter);
+ LoaderUtils.writeIndexToV3Format(segmentWriter, column, dictionaryFile,
ColumnIndexType.DICTIONARY);
+
+ LOGGER.info("Built dictionary. Rewriting dictionary enabled forward index
for segment={} and column={}",
+ segmentName, column);
+ writeDictEnabledForwardIndex(column, existingColMetadata, segmentWriter,
indexDir, indexCreatorProvider,
+ dictionaryCreator);
+ // We used the existing forward index to generate a new forward index. The
existing forward index will be in V3
+ // format and the new forward index will be in V1 format. Remove the
existing forward index as it is not needed
+ // anymore. Note that removeIndex() will only mark an index for removal
and remove the in-memory state. The
+ // actual cleanup from columns.psf file will happen when
singleFileIndexDirectory.cleanupRemovedIndices() is
+ // called during segmentWriter.close().
+ segmentWriter.removeIndex(column, ColumnIndexType.FORWARD_INDEX);
+ LoaderUtils.writeIndexToV3Format(segmentWriter, column, fwdIndexFile,
ColumnIndexType.FORWARD_INDEX);
+
+ LOGGER.info("Created forwardIndex. Updating metadata properties for
segment={} and column={}", segmentName, column);
+ Map<String, String> metadataProperties = new HashMap<>();
+ metadataProperties.put(getKeyFor(column, HAS_DICTIONARY),
String.valueOf(true));
+ metadataProperties.put(getKeyFor(column, DICTIONARY_ELEMENT_SIZE),
+ String.valueOf(dictionaryCreator.getNumBytesPerEntry()));
+ updateMetadataProperties(indexDir, metadataProperties);
+
+ // We remove indexes that have to be rewritten when a dictEnabled is
toggled. Note that the respective index
+ // handler will take care of recreating the index.
+ removeDictRelatedIndexes(column, segmentWriter);
+
+ // Delete the marker file.
+ FileUtils.deleteQuietly(inProgress);
+
+ LOGGER.info("Created dictionary based forward index for segment: {},
column: {}", segmentName, column);
+ }
+
+ private SegmentDictionaryCreator buildDictionary(String column,
ColumnMetadata existingColMetadata,
+ SegmentDirectory.Writer segmentWriter)
+ throws Exception {
+ int numDocs = existingColMetadata.getTotalDocs();
+ // SegmentPartitionConfig is not relevant for rewrites.
+ StatsCollectorConfig statsCollectorConfig =
+ new StatsCollectorConfig(_indexLoadingConfig.getTableConfig(),
_schema, null);
+ AbstractColumnStatisticsCollector statsCollector;
+
+ try (ForwardIndexReader reader =
LoaderUtils.getForwardIndexReader(segmentWriter, existingColMetadata)) {
+ boolean isSVColumn = reader.isSingleValue();
+
+ switch (reader.getStoredType()) {
+ case INT:
+ statsCollector = new IntColumnPreIndexStatsCollector(column,
statsCollectorConfig);
+ break;
+ case LONG:
+ statsCollector = new LongColumnPreIndexStatsCollector(column,
statsCollectorConfig);
+ break;
+ case FLOAT:
+ statsCollector = new FloatColumnPreIndexStatsCollector(column,
statsCollectorConfig);
+ break;
+ case DOUBLE:
+ statsCollector = new DoubleColumnPreIndexStatsCollector(column,
statsCollectorConfig);
+ break;
+ case STRING:
+ statsCollector = new StringColumnPreIndexStatsCollector(column,
statsCollectorConfig);
+ break;
+ case BYTES:
+ statsCollector = new BytesColumnPredIndexStatsCollector(column,
statsCollectorConfig);
+ break;
+ case BIG_DECIMAL:
+ Preconditions.checkState(isSVColumn, "BigDecimal is not supported
for MV columns");
+ statsCollector = new BigDecimalColumnPreIndexStatsCollector(column,
statsCollectorConfig);
+ break;
+ default:
+ throw new IllegalStateException();
+ }
+
+ Preconditions.checkState(statsCollector != null);
+ // Note: Special Null handling is not necessary here. This is because,
the existing default null value in the
+ // raw forwardIndex will be retained as such while created the
dictionary and dict-based forward index. Also,
+ // null value vectors maintain a bitmap of docIds. No handling is
necessary there.
+ PinotSegmentColumnReader columnReader =
+ new PinotSegmentColumnReader(reader, null, null,
existingColMetadata.getMaxNumberOfMultiValues());
+ for (int i = 0; i < numDocs; i++) {
+ Object obj = columnReader.getValue(i);
+ statsCollector.collect(obj);
+ }
+ statsCollector.seal();
+
+ boolean useVarLength =
SegmentIndexCreationDriverImpl.shouldUseVarLengthDictionary(column,
+ _indexLoadingConfig.getVarLengthDictionaryColumns(),
reader.getStoredType(), statsCollector);
+ SegmentDictionaryCreator dictionaryCreator =
+ new SegmentDictionaryCreator(existingColMetadata.getFieldSpec(),
_segmentMetadata.getIndexDir(),
+ useVarLength);
+
+ dictionaryCreator.build(statsCollector.getUniqueValuesSet());
+ return dictionaryCreator;
+ }
+ }
+
+ private void writeDictEnabledForwardIndex(String column, ColumnMetadata
existingColMetadata,
+ SegmentDirectory.Writer segmentWriter, File indexDir,
IndexCreatorProvider indexCreatorProvider,
+ SegmentDictionaryCreator dictionaryCreator)
+ throws Exception {
+ try (ForwardIndexReader reader =
LoaderUtils.getForwardIndexReader(segmentWriter, existingColMetadata)) {
+ int lengthOfLongestEntry = reader.getLengthOfLongestEntry();
+ IndexCreationContext.Builder builder =
+
IndexCreationContext.builder().withIndexDir(indexDir).withColumnMetadata(existingColMetadata)
+ .withLengthOfLongestEntry(lengthOfLongestEntry);
+ // existingColMetadata has dictEnable=false. Overwrite the value.
+ builder.withDictionary(true);
+ IndexCreationContext.Forward context =
+ builder.build().forForwardIndex(null,
_indexLoadingConfig.getColumnProperties());
+
+ try (ForwardIndexCreator creator =
indexCreatorProvider.newForwardIndexCreator(context)) {
+ int numDocs = existingColMetadata.getTotalDocs();
+ forwardIndexWriterHelper(column, existingColMetadata, reader, creator,
numDocs, dictionaryCreator);
}
- default:
- throw new IllegalStateException();
}
}
+
+ private void removeDictRelatedIndexes(String column, SegmentDirectory.Writer
segmentWriter) {
+ // TODO: Move this logic as a static function in each index creator.
+ segmentWriter.removeIndex(column, ColumnIndexType.RANGE_INDEX);
+ }
+
+ private void updateMetadataProperties(File indexDir, Map<String, String>
metadataProperties)
+ throws Exception {
+ File v3Dir = SegmentDirectoryPaths.segmentDirectoryFor(indexDir,
SegmentVersion.v3);
+ File metadataFile = new File(v3Dir,
V1Constants.MetadataKeys.METADATA_FILE_NAME);
+ PropertiesConfiguration properties = new
PropertiesConfiguration(metadataFile);
+
+ for (Map.Entry<String, String> entry : metadataProperties.entrySet()) {
+ properties.setProperty(entry.getKey(), entry.getValue());
+ }
+
+ properties.save();
Review Comment:
Don't we need to pass in an output stream (or target file) to `save()`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]