somandal commented on code in PR #9678:
URL: https://github.com/apache/pinot/pull/9678#discussion_r1010639412
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java:
##########
@@ -416,15 +416,11 @@ void buildIndexCreationInfo()
       String columnName = fieldSpec.getName();
       DataType storedType = fieldSpec.getDataType().getStoredType();
       ColumnStatistics columnProfile = _segmentStats.getColumnProfileFor(columnName);
-      boolean useVarLengthDictionary = varLengthDictionaryColumns.contains(columnName);
+      boolean useVarLengthDictionary = varLengthDictionaryColumns.contains(columnName) ||
+          shouldUseVarLengthDictionary(storedType, columnProfile);
Review Comment:
how about you move the first part of the OR,
`varLengthDictionaryColumns.contains(columnName)`, to the function too?
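e.g. something along these lines (just a sketch; the combined overload and its exact signature are a suggestion, not what the PR currently does):
```
public static boolean shouldUseVarLengthDictionary(String columnName, Set<String> varLengthDictionaryColumns,
    DataType storedType, ColumnStatistics columnProfile) {
  // Columns explicitly configured as var-length always qualify; otherwise fall back to the
  // type/stats based heuristic.
  if (varLengthDictionaryColumns.contains(columnName)) {
    return true;
  }
  return shouldUseVarLengthDictionary(storedType, columnProfile);
}
```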
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java:
##########
@@ -26,23 +26,43 @@
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.commons.configuration.PropertiesConfiguration;
 import org.apache.commons.io.FileUtils;
+import org.apache.pinot.segment.local.segment.creator.impl.SegmentDictionaryCreator;
+import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
 import org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueVarByteRawIndexCreator;
+import org.apache.pinot.segment.local.segment.creator.impl.stats.AbstractColumnStatisticsCollector;
+import org.apache.pinot.segment.local.segment.creator.impl.stats.BigDecimalColumnPreIndexStatsCollector;
+import org.apache.pinot.segment.local.segment.creator.impl.stats.BytesColumnPredIndexStatsCollector;
+import org.apache.pinot.segment.local.segment.creator.impl.stats.DoubleColumnPreIndexStatsCollector;
+import org.apache.pinot.segment.local.segment.creator.impl.stats.FloatColumnPreIndexStatsCollector;
+import org.apache.pinot.segment.local.segment.creator.impl.stats.IntColumnPreIndexStatsCollector;
+import org.apache.pinot.segment.local.segment.creator.impl.stats.LongColumnPreIndexStatsCollector;
+import org.apache.pinot.segment.local.segment.creator.impl.stats.StringColumnPreIndexStatsCollector;
+import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader;
 import org.apache.pinot.segment.spi.ColumnMetadata;
 import org.apache.pinot.segment.spi.SegmentMetadata;
 import org.apache.pinot.segment.spi.V1Constants;
 import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
 import org.apache.pinot.segment.spi.creator.IndexCreationContext;
 import org.apache.pinot.segment.spi.creator.IndexCreatorProvider;
 import org.apache.pinot.segment.spi.creator.SegmentVersion;
+import org.apache.pinot.segment.spi.creator.StatsCollectorConfig;
 import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator;
 import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
 import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext;
 import org.apache.pinot.segment.spi.store.ColumnIndexType;
 import org.apache.pinot.segment.spi.store.SegmentDirectory;
+import org.apache.pinot.segment.spi.store.SegmentDirectoryPaths;
+import org.apache.pinot.spi.data.Schema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import static org.apache.pinot.segment.spi.V1Constants.MetadataKeys.Column.DICTIONARY_ELEMENT_SIZE;
Review Comment:
nit: just add an import for `MetadataKeys` or `MetadataKeys.Column`?
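e.g.:
```
import org.apache.pinot.segment.spi.V1Constants.MetadataKeys.Column;
// ...then refer to the constant as Column.DICTIONARY_ELEMENT_SIZE
```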
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java:
##########
@@ -271,118 +306,293 @@ private void rewriteRawSVForwardIndex(String column, ColumnMetadata existingColM
           .forForwardIndex(newCompressionType, _indexLoadingConfig.getColumnProperties());
       try (ForwardIndexCreator creator = indexCreatorProvider.newForwardIndexCreator(context)) {
-        // If creator stored type and the reader stored type do not match, throw an exception.
         if (!reader.getStoredType().equals(creator.getValueType())) {
+          // Creator stored type should match reader stored type for raw columns. We do not support changing datatypes.
           String failureMsg =
               "Unsupported operation to change datatype for column=" + column + " from " + reader.getStoredType()
                   .toString() + " to " + creator.getValueType().toString();
           throw new UnsupportedOperationException(failureMsg);
         }
         int numDocs = existingColMetadata.getTotalDocs();
-        forwardIndexWriterHelper(column, reader, creator, numDocs);
+        forwardIndexWriterHelper(column, existingColMetadata, reader, creator, numDocs, null);
       }
     }
   }
 
-  private void forwardIndexWriterHelper(String column, ForwardIndexReader reader, ForwardIndexCreator creator,
-      int numDocs) {
-    // If creator stored type should match reader stored type. We do not support changing datatypes.
-    if (!reader.getStoredType().equals(creator.getValueType())) {
-      String failureMsg =
-          "Unsupported operation to change datatype for column=" + column + " from " + reader.getStoredType().toString()
-              + " to " + creator.getValueType().toString();
-      throw new UnsupportedOperationException(failureMsg);
-    }
-
+  private void forwardIndexWriterHelper(String column, ColumnMetadata existingColumnMetadata, ForwardIndexReader reader,
+      ForwardIndexCreator creator, int numDocs, @Nullable SegmentDictionaryCreator dictionaryCreator) {
     ForwardIndexReaderContext readerContext = reader.createContext();
     boolean isSVColumn = reader.isSingleValue();
-    switch (reader.getStoredType()) {
-      // JSON fields are either stored as string or bytes. No special handling is needed because we make this
-      // decision based on the storedType of the reader.
-      case INT: {
-        for (int i = 0; i < numDocs; i++) {
-          if (isSVColumn) {
-            int val = reader.getInt(i, readerContext);
-            creator.putInt(val);
-          } else {
-            int[] ints = reader.getIntMV(i, readerContext);
-            creator.putIntMV(ints);
-          }
+    if (dictionaryCreator != null) {
+      int maxNumValuesPerEntry = existingColumnMetadata.getMaxNumberOfMultiValues();
+      PinotSegmentColumnReader columnReader = new PinotSegmentColumnReader(reader, null, null, maxNumValuesPerEntry);
+
+      for (int i = 0; i < numDocs; i++) {
+        Object obj = columnReader.getValue(i);
+
+        if (isSVColumn) {
+          int dictId = dictionaryCreator.indexOfSV(obj);
+          creator.putDictId(dictId);
+        } else {
+          int[] dictIds = dictionaryCreator.indexOfMV(obj);
+          creator.putDictIdMV(dictIds);
         }
-        break;
       }
-      case LONG: {
-        for (int i = 0; i < numDocs; i++) {
-          if (isSVColumn) {
-            long val = reader.getLong(i, readerContext);
-            creator.putLong(val);
-          } else {
-            long[] longs = reader.getLongMV(i, readerContext);
-            creator.putLongMV(longs);
+    } else {
+      switch (reader.getStoredType()) {
+        // JSON fields are either stored as string or bytes. No special handling is needed because we make this
+        // decision based on the storedType of the reader.
+        case INT: {
+          for (int i = 0; i < numDocs; i++) {
+            if (isSVColumn) {
+              int val = reader.getInt(i, readerContext);
+              creator.putInt(val);
+            } else {
+              int[] ints = reader.getIntMV(i, readerContext);
+              creator.putIntMV(ints);
+            }
           }
+          break;
         }
-        break;
-      }
-      case FLOAT: {
-        for (int i = 0; i < numDocs; i++) {
-          if (isSVColumn) {
-            float val = reader.getFloat(i, readerContext);
-            creator.putFloat(val);
-          } else {
-            float[] floats = reader.getFloatMV(i, readerContext);
-            creator.putFloatMV(floats);
+        case LONG: {
+          for (int i = 0; i < numDocs; i++) {
+            if (isSVColumn) {
+              long val = reader.getLong(i, readerContext);
+              creator.putLong(val);
+            } else {
+              long[] longs = reader.getLongMV(i, readerContext);
+              creator.putLongMV(longs);
+            }
           }
+          break;
         }
-        break;
-      }
-      case DOUBLE: {
-        for (int i = 0; i < numDocs; i++) {
-          if (isSVColumn) {
-            double val = reader.getDouble(i, readerContext);
-            creator.putDouble(val);
-          } else {
-            double[] doubles = reader.getDoubleMV(i, readerContext);
-            creator.putDoubleMV(doubles);
+        case FLOAT: {
+          for (int i = 0; i < numDocs; i++) {
+            if (isSVColumn) {
+              float val = reader.getFloat(i, readerContext);
+              creator.putFloat(val);
+            } else {
+              float[] floats = reader.getFloatMV(i, readerContext);
+              creator.putFloatMV(floats);
+            }
           }
+          break;
         }
-        break;
-      }
-      case STRING: {
-        for (int i = 0; i < numDocs; i++) {
-          if (isSVColumn) {
-            String val = reader.getString(i, readerContext);
-            creator.putString(val);
-          } else {
-            String[] strings = reader.getStringMV(i, readerContext);
-            creator.putStringMV(strings);
+        case DOUBLE: {
+          for (int i = 0; i < numDocs; i++) {
+            if (isSVColumn) {
+              double val = reader.getDouble(i, readerContext);
+              creator.putDouble(val);
+            } else {
+              double[] doubles = reader.getDoubleMV(i, readerContext);
+              creator.putDoubleMV(doubles);
+            }
           }
+          break;
         }
-        break;
-      }
-      case BYTES: {
-        for (int i = 0; i < numDocs; i++) {
-          if (isSVColumn) {
-            byte[] val = reader.getBytes(i, readerContext);
-            creator.putBytes(val);
-          } else {
-            byte[][] bytesArray = reader.getBytesMV(i, readerContext);
-            creator.putBytesMV(bytesArray);
+        case STRING: {
+          for (int i = 0; i < numDocs; i++) {
+            if (isSVColumn) {
+              String val = reader.getString(i, readerContext);
+              creator.putString(val);
+            } else {
+              String[] strings = reader.getStringMV(i, readerContext);
+              creator.putStringMV(strings);
+            }
           }
+          break;
         }
-        break;
-      }
-      case BIG_DECIMAL: {
-        for (int i = 0; i < numDocs; i++) {
+        case BYTES: {
+          for (int i = 0; i < numDocs; i++) {
+            if (isSVColumn) {
+              byte[] val = reader.getBytes(i, readerContext);
+              creator.putBytes(val);
+            } else {
+              byte[][] bytesArray = reader.getBytesMV(i, readerContext);
+              creator.putBytesMV(bytesArray);
+            }
+          }
+          break;
+        }
+        case BIG_DECIMAL: {
           Preconditions.checkState(isSVColumn, "BigDecimal is not supported for MV columns");
-          BigDecimal val = reader.getBigDecimal(i, readerContext);
-          creator.putBigDecimal(val);
+          for (int i = 0; i < numDocs; i++) {
+            BigDecimal val = reader.getBigDecimal(i, readerContext);
+            creator.putBigDecimal(val);
+          }
+          break;
         }
-        break;
+        default:
+          throw new IllegalStateException();
+      }
+    }
+  }
+
+  private void enableDictionary(String column, SegmentDirectory.Writer segmentWriter,
+      IndexCreatorProvider indexCreatorProvider)
+      throws Exception {
+    Preconditions.checkState(_segmentMetadata.getVersion() == SegmentVersion.v3);
Review Comment:
nit: This check has to be repeated in every function that takes an action. Move it outside (to the common entry point) so that once we add support for V1 we only need to remove the check in one place?
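e.g. guard once at the entry point (sketch; assumes `updateIndices` is the single entry point for these rewrites):
```
@Override
public void updateIndices(SegmentDirectory.Writer segmentWriter, IndexCreatorProvider indexCreatorProvider)
    throws Exception {
  // Single check to delete once V1 support is added.
  Preconditions.checkState(_segmentMetadata.getVersion() == SegmentVersion.v3,
      "Forward index rewrites are currently only supported for V3 segments");
  // ... compute operations and dispatch to enableDictionary() / rewriteRawSVForwardIndex() / ...
}
```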
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java:
##########
@@ -271,118 +306,293 @@ private void rewriteRawSVForwardIndex(String column, ColumnMetadata existingColM
           .forForwardIndex(newCompressionType, _indexLoadingConfig.getColumnProperties());
       try (ForwardIndexCreator creator = indexCreatorProvider.newForwardIndexCreator(context)) {
-        // If creator stored type and the reader stored type do not match, throw an exception.
         if (!reader.getStoredType().equals(creator.getValueType())) {
+          // Creator stored type should match reader stored type for raw columns. We do not support changing datatypes.
           String failureMsg =
               "Unsupported operation to change datatype for column=" + column + " from " + reader.getStoredType()
                   .toString() + " to " + creator.getValueType().toString();
           throw new UnsupportedOperationException(failureMsg);
         }
         int numDocs = existingColMetadata.getTotalDocs();
-        forwardIndexWriterHelper(column, reader, creator, numDocs);
+        forwardIndexWriterHelper(column, existingColMetadata, reader, creator, numDocs, null);
       }
     }
   }
 
-  private void forwardIndexWriterHelper(String column, ForwardIndexReader reader, ForwardIndexCreator creator,
-      int numDocs) {
-    // If creator stored type should match reader stored type. We do not support changing datatypes.
-    if (!reader.getStoredType().equals(creator.getValueType())) {
-      String failureMsg =
-          "Unsupported operation to change datatype for column=" + column + " from " + reader.getStoredType().toString()
-              + " to " + creator.getValueType().toString();
-      throw new UnsupportedOperationException(failureMsg);
-    }
-
+  private void forwardIndexWriterHelper(String column, ColumnMetadata existingColumnMetadata, ForwardIndexReader reader,
+      ForwardIndexCreator creator, int numDocs, @Nullable SegmentDictionaryCreator dictionaryCreator) {
     ForwardIndexReaderContext readerContext = reader.createContext();
     boolean isSVColumn = reader.isSingleValue();
-    switch (reader.getStoredType()) {
-      // JSON fields are either stored as string or bytes. No special handling is needed because we make this
-      // decision based on the storedType of the reader.
-      case INT: {
-        for (int i = 0; i < numDocs; i++) {
-          if (isSVColumn) {
-            int val = reader.getInt(i, readerContext);
-            creator.putInt(val);
-          } else {
-            int[] ints = reader.getIntMV(i, readerContext);
-            creator.putIntMV(ints);
-          }
+    if (dictionaryCreator != null) {
+      int maxNumValuesPerEntry = existingColumnMetadata.getMaxNumberOfMultiValues();
+      PinotSegmentColumnReader columnReader = new PinotSegmentColumnReader(reader, null, null, maxNumValuesPerEntry);
+
+      for (int i = 0; i < numDocs; i++) {
+        Object obj = columnReader.getValue(i);
+
+        if (isSVColumn) {
+          int dictId = dictionaryCreator.indexOfSV(obj);
+          creator.putDictId(dictId);
+        } else {
+          int[] dictIds = dictionaryCreator.indexOfMV(obj);
+          creator.putDictIdMV(dictIds);
         }
-        break;
       }
-      case LONG: {
-        for (int i = 0; i < numDocs; i++) {
-          if (isSVColumn) {
-            long val = reader.getLong(i, readerContext);
-            creator.putLong(val);
-          } else {
-            long[] longs = reader.getLongMV(i, readerContext);
-            creator.putLongMV(longs);
+    } else {
+      switch (reader.getStoredType()) {
+        // JSON fields are either stored as string or bytes. No special handling is needed because we make this
+        // decision based on the storedType of the reader.
+        case INT: {
+          for (int i = 0; i < numDocs; i++) {
+            if (isSVColumn) {
+              int val = reader.getInt(i, readerContext);
+              creator.putInt(val);
+            } else {
+              int[] ints = reader.getIntMV(i, readerContext);
+              creator.putIntMV(ints);
+            }
           }
+          break;
         }
-        break;
-      }
-      case FLOAT: {
-        for (int i = 0; i < numDocs; i++) {
-          if (isSVColumn) {
-            float val = reader.getFloat(i, readerContext);
-            creator.putFloat(val);
-          } else {
-            float[] floats = reader.getFloatMV(i, readerContext);
-            creator.putFloatMV(floats);
+        case LONG: {
+          for (int i = 0; i < numDocs; i++) {
+            if (isSVColumn) {
+              long val = reader.getLong(i, readerContext);
+              creator.putLong(val);
+            } else {
+              long[] longs = reader.getLongMV(i, readerContext);
+              creator.putLongMV(longs);
+            }
           }
+          break;
         }
-        break;
-      }
-      case DOUBLE: {
-        for (int i = 0; i < numDocs; i++) {
-          if (isSVColumn) {
-            double val = reader.getDouble(i, readerContext);
-            creator.putDouble(val);
-          } else {
-            double[] doubles = reader.getDoubleMV(i, readerContext);
-            creator.putDoubleMV(doubles);
+        case FLOAT: {
+          for (int i = 0; i < numDocs; i++) {
+            if (isSVColumn) {
+              float val = reader.getFloat(i, readerContext);
+              creator.putFloat(val);
+            } else {
+              float[] floats = reader.getFloatMV(i, readerContext);
+              creator.putFloatMV(floats);
+            }
           }
+          break;
         }
-        break;
-      }
-      case STRING: {
-        for (int i = 0; i < numDocs; i++) {
-          if (isSVColumn) {
-            String val = reader.getString(i, readerContext);
-            creator.putString(val);
-          } else {
-            String[] strings = reader.getStringMV(i, readerContext);
-            creator.putStringMV(strings);
+        case DOUBLE: {
+          for (int i = 0; i < numDocs; i++) {
+            if (isSVColumn) {
+              double val = reader.getDouble(i, readerContext);
+              creator.putDouble(val);
+            } else {
+              double[] doubles = reader.getDoubleMV(i, readerContext);
+              creator.putDoubleMV(doubles);
+            }
           }
+          break;
         }
-        break;
-      }
-      case BYTES: {
-        for (int i = 0; i < numDocs; i++) {
-          if (isSVColumn) {
-            byte[] val = reader.getBytes(i, readerContext);
-            creator.putBytes(val);
-          } else {
-            byte[][] bytesArray = reader.getBytesMV(i, readerContext);
-            creator.putBytesMV(bytesArray);
+        case STRING: {
+          for (int i = 0; i < numDocs; i++) {
+            if (isSVColumn) {
+              String val = reader.getString(i, readerContext);
+              creator.putString(val);
+            } else {
+              String[] strings = reader.getStringMV(i, readerContext);
+              creator.putStringMV(strings);
+            }
           }
+          break;
         }
-        break;
-      }
-      case BIG_DECIMAL: {
-        for (int i = 0; i < numDocs; i++) {
+        case BYTES: {
+          for (int i = 0; i < numDocs; i++) {
+            if (isSVColumn) {
+              byte[] val = reader.getBytes(i, readerContext);
+              creator.putBytes(val);
+            } else {
+              byte[][] bytesArray = reader.getBytesMV(i, readerContext);
+              creator.putBytesMV(bytesArray);
+            }
+          }
+          break;
+        }
+        case BIG_DECIMAL: {
          Preconditions.checkState(isSVColumn, "BigDecimal is not supported for MV columns");
-          BigDecimal val = reader.getBigDecimal(i, readerContext);
-          creator.putBigDecimal(val);
+          for (int i = 0; i < numDocs; i++) {
+            BigDecimal val = reader.getBigDecimal(i, readerContext);
+            creator.putBigDecimal(val);
+          }
+          break;
        }
-        break;
+        default:
+          throw new IllegalStateException();
+      }
+    }
+  }
+
+  private void enableDictionary(String column, SegmentDirectory.Writer segmentWriter,
+      IndexCreatorProvider indexCreatorProvider)
+      throws Exception {
+    Preconditions.checkState(_segmentMetadata.getVersion() == SegmentVersion.v3);
+    ColumnMetadata existingColMetadata = _segmentMetadata.getColumnMetadataFor(column);
+    Preconditions.checkState(!existingColMetadata.hasDictionary(), "Existing column already has dictionary.");
+    boolean isSingleValue = existingColMetadata.isSingleValue();
+
+    File indexDir = _segmentMetadata.getIndexDir();
+    String segmentName = _segmentMetadata.getName();
+    File inProgress = new File(indexDir, column + ".dict.inprogress");
+    File dictionaryFile = new File(indexDir, column + V1Constants.Dict.FILE_EXTENSION);
+    String fwdIndexFileExtension;
+    if (isSingleValue) {
+      if (existingColMetadata.isSorted()) {
+        fwdIndexFileExtension = V1Constants.Indexes.SORTED_SV_FORWARD_INDEX_FILE_EXTENSION;
+      } else {
+        fwdIndexFileExtension = V1Constants.Indexes.UNSORTED_SV_FORWARD_INDEX_FILE_EXTENSION;
+      }
Review Comment:
nit: simplify using a ternary?
```
fwdIndexFileExtension = existingColMetadata.isSorted()
    ? V1Constants.Indexes.SORTED_SV_FORWARD_INDEX_FILE_EXTENSION
    : V1Constants.Indexes.UNSORTED_SV_FORWARD_INDEX_FILE_EXTENSION;
```
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java:
##########
@@ -271,118 +306,293 @@ private void rewriteRawSVForwardIndex(String column, ColumnMetadata existingColM
           .forForwardIndex(newCompressionType, _indexLoadingConfig.getColumnProperties());
       try (ForwardIndexCreator creator = indexCreatorProvider.newForwardIndexCreator(context)) {
-        // If creator stored type and the reader stored type do not match, throw an exception.
         if (!reader.getStoredType().equals(creator.getValueType())) {
+          // Creator stored type should match reader stored type for raw columns. We do not support changing datatypes.
           String failureMsg =
               "Unsupported operation to change datatype for column=" + column + " from " + reader.getStoredType()
                   .toString() + " to " + creator.getValueType().toString();
           throw new UnsupportedOperationException(failureMsg);
         }
         int numDocs = existingColMetadata.getTotalDocs();
-        forwardIndexWriterHelper(column, reader, creator, numDocs);
+        forwardIndexWriterHelper(column, existingColMetadata, reader, creator, numDocs, null);
       }
     }
   }
 
-  private void forwardIndexWriterHelper(String column, ForwardIndexReader reader, ForwardIndexCreator creator,
-      int numDocs) {
-    // If creator stored type should match reader stored type. We do not support changing datatypes.
-    if (!reader.getStoredType().equals(creator.getValueType())) {
-      String failureMsg =
-          "Unsupported operation to change datatype for column=" + column + " from " + reader.getStoredType().toString()
-              + " to " + creator.getValueType().toString();
-      throw new UnsupportedOperationException(failureMsg);
-    }
-
+  private void forwardIndexWriterHelper(String column, ColumnMetadata existingColumnMetadata, ForwardIndexReader reader,
+      ForwardIndexCreator creator, int numDocs, @Nullable SegmentDictionaryCreator dictionaryCreator) {
     ForwardIndexReaderContext readerContext = reader.createContext();
     boolean isSVColumn = reader.isSingleValue();
-    switch (reader.getStoredType()) {
-      // JSON fields are either stored as string or bytes. No special handling is needed because we make this
-      // decision based on the storedType of the reader.
-      case INT: {
-        for (int i = 0; i < numDocs; i++) {
-          if (isSVColumn) {
-            int val = reader.getInt(i, readerContext);
-            creator.putInt(val);
-          } else {
-            int[] ints = reader.getIntMV(i, readerContext);
-            creator.putIntMV(ints);
-          }
+    if (dictionaryCreator != null) {
+      int maxNumValuesPerEntry = existingColumnMetadata.getMaxNumberOfMultiValues();
+      PinotSegmentColumnReader columnReader = new PinotSegmentColumnReader(reader, null, null, maxNumValuesPerEntry);
+
+      for (int i = 0; i < numDocs; i++) {
+        Object obj = columnReader.getValue(i);
+
+        if (isSVColumn) {
+          int dictId = dictionaryCreator.indexOfSV(obj);
+          creator.putDictId(dictId);
+        } else {
+          int[] dictIds = dictionaryCreator.indexOfMV(obj);
+          creator.putDictIdMV(dictIds);
         }
-        break;
       }
-      case LONG: {
-        for (int i = 0; i < numDocs; i++) {
-          if (isSVColumn) {
-            long val = reader.getLong(i, readerContext);
-            creator.putLong(val);
-          } else {
-            long[] longs = reader.getLongMV(i, readerContext);
-            creator.putLongMV(longs);
+    } else {
+      switch (reader.getStoredType()) {
+        // JSON fields are either stored as string or bytes. No special handling is needed because we make this
+        // decision based on the storedType of the reader.
+        case INT: {
+          for (int i = 0; i < numDocs; i++) {
+            if (isSVColumn) {
+              int val = reader.getInt(i, readerContext);
+              creator.putInt(val);
+            } else {
+              int[] ints = reader.getIntMV(i, readerContext);
+              creator.putIntMV(ints);
+            }
           }
+          break;
         }
-        break;
-      }
-      case FLOAT: {
-        for (int i = 0; i < numDocs; i++) {
-          if (isSVColumn) {
-            float val = reader.getFloat(i, readerContext);
-            creator.putFloat(val);
-          } else {
-            float[] floats = reader.getFloatMV(i, readerContext);
-            creator.putFloatMV(floats);
+        case LONG: {
+          for (int i = 0; i < numDocs; i++) {
+            if (isSVColumn) {
+              long val = reader.getLong(i, readerContext);
+              creator.putLong(val);
+            } else {
+              long[] longs = reader.getLongMV(i, readerContext);
+              creator.putLongMV(longs);
+            }
           }
+          break;
         }
-        break;
-      }
-      case DOUBLE: {
-        for (int i = 0; i < numDocs; i++) {
-          if (isSVColumn) {
-            double val = reader.getDouble(i, readerContext);
-            creator.putDouble(val);
-          } else {
-            double[] doubles = reader.getDoubleMV(i, readerContext);
-            creator.putDoubleMV(doubles);
+        case FLOAT: {
+          for (int i = 0; i < numDocs; i++) {
+            if (isSVColumn) {
+              float val = reader.getFloat(i, readerContext);
+              creator.putFloat(val);
+            } else {
+              float[] floats = reader.getFloatMV(i, readerContext);
+              creator.putFloatMV(floats);
+            }
           }
+          break;
         }
-        break;
-      }
-      case STRING: {
-        for (int i = 0; i < numDocs; i++) {
-          if (isSVColumn) {
-            String val = reader.getString(i, readerContext);
-            creator.putString(val);
-          } else {
-            String[] strings = reader.getStringMV(i, readerContext);
-            creator.putStringMV(strings);
+        case DOUBLE: {
+          for (int i = 0; i < numDocs; i++) {
+            if (isSVColumn) {
+              double val = reader.getDouble(i, readerContext);
+              creator.putDouble(val);
+            } else {
+              double[] doubles = reader.getDoubleMV(i, readerContext);
+              creator.putDoubleMV(doubles);
+            }
           }
+          break;
         }
-        break;
-      }
-      case BYTES: {
-        for (int i = 0; i < numDocs; i++) {
-          if (isSVColumn) {
-            byte[] val = reader.getBytes(i, readerContext);
-            creator.putBytes(val);
-          } else {
-            byte[][] bytesArray = reader.getBytesMV(i, readerContext);
-            creator.putBytesMV(bytesArray);
+        case STRING: {
+          for (int i = 0; i < numDocs; i++) {
+            if (isSVColumn) {
+              String val = reader.getString(i, readerContext);
+              creator.putString(val);
+            } else {
+              String[] strings = reader.getStringMV(i, readerContext);
+              creator.putStringMV(strings);
+            }
           }
+          break;
         }
-        break;
-      }
-      case BIG_DECIMAL: {
-        for (int i = 0; i < numDocs; i++) {
+        case BYTES: {
+          for (int i = 0; i < numDocs; i++) {
+            if (isSVColumn) {
+              byte[] val = reader.getBytes(i, readerContext);
+              creator.putBytes(val);
+            } else {
+              byte[][] bytesArray = reader.getBytesMV(i, readerContext);
+              creator.putBytesMV(bytesArray);
+            }
+          }
+          break;
+        }
+        case BIG_DECIMAL: {
           Preconditions.checkState(isSVColumn, "BigDecimal is not supported for MV columns");
-          BigDecimal val = reader.getBigDecimal(i, readerContext);
-          creator.putBigDecimal(val);
+          for (int i = 0; i < numDocs; i++) {
+            BigDecimal val = reader.getBigDecimal(i, readerContext);
+            creator.putBigDecimal(val);
+          }
+          break;
         }
-        break;
+        default:
+          throw new IllegalStateException();
+      }
+    }
+  }
+
+  private void enableDictionary(String column, SegmentDirectory.Writer segmentWriter,
+      IndexCreatorProvider indexCreatorProvider)
+      throws Exception {
+    Preconditions.checkState(_segmentMetadata.getVersion() == SegmentVersion.v3);
+    ColumnMetadata existingColMetadata = _segmentMetadata.getColumnMetadataFor(column);
+    Preconditions.checkState(!existingColMetadata.hasDictionary(), "Existing column already has dictionary.");
Review Comment:
nit: Modify the error message to indicate which operation we are performing? e.g. "Cannot rewrite dictionary enabled forward index, dictionary already exists for column: {}"
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java:
##########
@@ -271,118 +306,293 @@ private void rewriteRawSVForwardIndex(String column, ColumnMetadata existingColM
           .forForwardIndex(newCompressionType, _indexLoadingConfig.getColumnProperties());
       try (ForwardIndexCreator creator = indexCreatorProvider.newForwardIndexCreator(context)) {
-        // If creator stored type and the reader stored type do not match, throw an exception.
         if (!reader.getStoredType().equals(creator.getValueType())) {
+          // Creator stored type should match reader stored type for raw columns. We do not support changing datatypes.
           String failureMsg =
               "Unsupported operation to change datatype for column=" + column + " from " + reader.getStoredType()
                   .toString() + " to " + creator.getValueType().toString();
           throw new UnsupportedOperationException(failureMsg);
         }
         int numDocs = existingColMetadata.getTotalDocs();
-        forwardIndexWriterHelper(column, reader, creator, numDocs);
+        forwardIndexWriterHelper(column, existingColMetadata, reader, creator, numDocs, null);
       }
     }
   }
 
-  private void forwardIndexWriterHelper(String column, ForwardIndexReader reader, ForwardIndexCreator creator,
-      int numDocs) {
-    // If creator stored type should match reader stored type. We do not support changing datatypes.
-    if (!reader.getStoredType().equals(creator.getValueType())) {
-      String failureMsg =
-          "Unsupported operation to change datatype for column=" + column + " from " + reader.getStoredType().toString()
-              + " to " + creator.getValueType().toString();
-      throw new UnsupportedOperationException(failureMsg);
-    }
-
+  private void forwardIndexWriterHelper(String column, ColumnMetadata existingColumnMetadata, ForwardIndexReader reader,
+      ForwardIndexCreator creator, int numDocs, @Nullable SegmentDictionaryCreator dictionaryCreator) {
     ForwardIndexReaderContext readerContext = reader.createContext();
     boolean isSVColumn = reader.isSingleValue();
-    switch (reader.getStoredType()) {
-      // JSON fields are either stored as string or bytes. No special handling is needed because we make this
-      // decision based on the storedType of the reader.
-      case INT: {
-        for (int i = 0; i < numDocs; i++) {
-          if (isSVColumn) {
-            int val = reader.getInt(i, readerContext);
-            creator.putInt(val);
-          } else {
-            int[] ints = reader.getIntMV(i, readerContext);
-            creator.putIntMV(ints);
-          }
+    if (dictionaryCreator != null) {
+      int maxNumValuesPerEntry = existingColumnMetadata.getMaxNumberOfMultiValues();
+      PinotSegmentColumnReader columnReader = new PinotSegmentColumnReader(reader, null, null, maxNumValuesPerEntry);
+
+      for (int i = 0; i < numDocs; i++) {
+        Object obj = columnReader.getValue(i);
+
+        if (isSVColumn) {
+          int dictId = dictionaryCreator.indexOfSV(obj);
+          creator.putDictId(dictId);
+        } else {
+          int[] dictIds = dictionaryCreator.indexOfMV(obj);
+          creator.putDictIdMV(dictIds);
         }
-        break;
       }
-      case LONG: {
-        for (int i = 0; i < numDocs; i++) {
-          if (isSVColumn) {
-            long val = reader.getLong(i, readerContext);
-            creator.putLong(val);
-          } else {
-            long[] longs = reader.getLongMV(i, readerContext);
-            creator.putLongMV(longs);
+    } else {
+      switch (reader.getStoredType()) {
+        // JSON fields are either stored as string or bytes. No special handling is needed because we make this
+        // decision based on the storedType of the reader.
+        case INT: {
+          for (int i = 0; i < numDocs; i++) {
+            if (isSVColumn) {
+              int val = reader.getInt(i, readerContext);
+              creator.putInt(val);
+            } else {
+              int[] ints = reader.getIntMV(i, readerContext);
+              creator.putIntMV(ints);
+            }
           }
+          break;
         }
-        break;
-      }
-      case FLOAT: {
-        for (int i = 0; i < numDocs; i++) {
-          if (isSVColumn) {
-            float val = reader.getFloat(i, readerContext);
-            creator.putFloat(val);
-          } else {
-            float[] floats = reader.getFloatMV(i, readerContext);
-            creator.putFloatMV(floats);
+        case LONG: {
+          for (int i = 0; i < numDocs; i++) {
+            if (isSVColumn) {
+              long val = reader.getLong(i, readerContext);
+              creator.putLong(val);
+            } else {
+              long[] longs = reader.getLongMV(i, readerContext);
+              creator.putLongMV(longs);
+            }
          }
+          break;
        }
-        break;
-      }
-      case DOUBLE: {
-        for (int i = 0; i < numDocs; i++) {
-          if (isSVColumn) {
-            double val = reader.getDouble(i, readerContext);
-            creator.putDouble(val);
-          } else {
-            double[] doubles = reader.getDoubleMV(i, readerContext);
-            creator.putDoubleMV(doubles);
+        case FLOAT: {
+          for (int i = 0; i < numDocs; i++) {
+            if (isSVColumn) {
+              float val = reader.getFloat(i, readerContext);
+              creator.putFloat(val);
+            } else {
+              float[] floats = reader.getFloatMV(i, readerContext);
+              creator.putFloatMV(floats);
+            }
           }
+          break;
         }
-        break;
-      }
-      case STRING: {
-        for (int i = 0; i < numDocs; i++) {
-          if (isSVColumn) {
-            String val = reader.getString(i, readerContext);
-            creator.putString(val);
-          } else {
-            String[] strings = reader.getStringMV(i, readerContext);
-            creator.putStringMV(strings);
+        case DOUBLE: {
+          for (int i = 0; i < numDocs; i++) {
+            if (isSVColumn) {
+              double val = reader.getDouble(i, readerContext);
+              creator.putDouble(val);
+            } else {
+              double[] doubles = reader.getDoubleMV(i, readerContext);
+              creator.putDoubleMV(doubles);
+            }
           }
+          break;
         }
-        break;
-      }
-      case BYTES: {
-        for (int i = 0; i < numDocs; i++) {
-          if (isSVColumn) {
-            byte[] val = reader.getBytes(i, readerContext);
-            creator.putBytes(val);
-          } else {
-            byte[][] bytesArray = reader.getBytesMV(i, readerContext);
-            creator.putBytesMV(bytesArray);
+        case STRING: {
+          for (int i = 0; i < numDocs; i++) {
+            if (isSVColumn) {
+              String val = reader.getString(i, readerContext);
+              creator.putString(val);
+            } else {
+              String[] strings = reader.getStringMV(i, readerContext);
+              creator.putStringMV(strings);
+            }
          }
+          break;
        }
-        break;
-      }
-      case BIG_DECIMAL: {
-        for (int i = 0; i < numDocs; i++) {
+        case BYTES: {
+          for (int i = 0; i < numDocs; i++) {
+            if (isSVColumn) {
+              byte[] val = reader.getBytes(i, readerContext);
+              creator.putBytes(val);
+            } else {
+              byte[][] bytesArray = reader.getBytesMV(i, readerContext);
+              creator.putBytesMV(bytesArray);
+            }
+          }
+          break;
+        }
+        case BIG_DECIMAL: {
           Preconditions.checkState(isSVColumn, "BigDecimal is not supported for MV columns");
-          BigDecimal val = reader.getBigDecimal(i, readerContext);
-          creator.putBigDecimal(val);
+          for (int i = 0; i < numDocs; i++) {
+            BigDecimal val = reader.getBigDecimal(i, readerContext);
+            creator.putBigDecimal(val);
+          }
+          break;
        }
-        break;
+        default:
+          throw new IllegalStateException();
+      }
+    }
+  }
+
+  private void enableDictionary(String column, SegmentDirectory.Writer segmentWriter,
+      IndexCreatorProvider indexCreatorProvider)
+      throws Exception {
+    Preconditions.checkState(_segmentMetadata.getVersion() == SegmentVersion.v3);
+    ColumnMetadata existingColMetadata = _segmentMetadata.getColumnMetadataFor(column);
+    Preconditions.checkState(!existingColMetadata.hasDictionary(), "Existing column already has dictionary.");
+    boolean isSingleValue = existingColMetadata.isSingleValue();
+
+    File indexDir = _segmentMetadata.getIndexDir();
+    String segmentName = _segmentMetadata.getName();
+    File inProgress = new File(indexDir, column + ".dict.inprogress");
+    File dictionaryFile = new File(indexDir, column + V1Constants.Dict.FILE_EXTENSION);
+    String fwdIndexFileExtension;
+    if (isSingleValue) {
+      if (existingColMetadata.isSorted()) {
+        fwdIndexFileExtension = V1Constants.Indexes.SORTED_SV_FORWARD_INDEX_FILE_EXTENSION;
+      } else {
+        fwdIndexFileExtension = V1Constants.Indexes.UNSORTED_SV_FORWARD_INDEX_FILE_EXTENSION;
+      }
+    } else {
+      fwdIndexFileExtension = V1Constants.Indexes.UNSORTED_MV_FORWARD_INDEX_FILE_EXTENSION;
+    }
+    File fwdIndexFile = new File(indexDir, column + fwdIndexFileExtension);
+
+    if (!inProgress.exists()) {
+      // Marker file does not exist, which means last run ended normally.
+      // Create a marker file.
+      FileUtils.touch(inProgress);
+    } else {
+      // Marker file exists, which means last run was interrupted.
+      // Remove forward index and dictionary files if they exist.
+      FileUtils.deleteQuietly(fwdIndexFile);
+      FileUtils.deleteQuietly(dictionaryFile);
+    }
+
+    LOGGER.info("Creating a new dictionary for segment={} and column={}", segmentName, column);
+    SegmentDictionaryCreator dictionaryCreator = buildDictionary(column, existingColMetadata, segmentWriter);
+    LoaderUtils.writeIndexToV3Format(segmentWriter, column, dictionaryFile, ColumnIndexType.DICTIONARY);
+
+    LOGGER.info("Built dictionary. Rewriting dictionary enabled forward index for segment={} and column={}",
+        segmentName, column);
+    writeDictEnabledForwardIndex(column, existingColMetadata, segmentWriter, indexDir, indexCreatorProvider,
+        dictionaryCreator);
+    // We used the existing forward index to generate a new forward index. The existing forward index will be in V3
+    // format and the new forward index will be in V1 format. Remove the existing forward index as it is not needed
+    // anymore. Note that removeIndex() will only mark an index for removal and remove the in-memory state. The
+    // actual cleanup from columns.psf file will happen when singleFileIndexDirectory.cleanupRemovedIndices() is
+    // called during segmentWriter.close().
+    segmentWriter.removeIndex(column, ColumnIndexType.FORWARD_INDEX);
+    LoaderUtils.writeIndexToV3Format(segmentWriter, column, fwdIndexFile, ColumnIndexType.FORWARD_INDEX);
+
+    LOGGER.info("Created forwardIndex. Updating metadata properties for segment={} and column={}", segmentName, column);
+    updateMetadataProperties(indexDir, column, dictionaryCreator);
+
+    // We remove indexes that have to be rewritten when a dictEnabled is toggled. Note that the respective index
+    // handler will take care of recreating the index.
+    removeDictRelatedIndexes(column, segmentWriter);
+
+    // Delete the marker file.
+    FileUtils.deleteQuietly(inProgress);
+
+    LOGGER.info("Created dictionary based forward index for segment: {}, column: {}", segmentName, column);
+  }
+
+  private SegmentDictionaryCreator buildDictionary(String column, ColumnMetadata existingColMetadata,
+      SegmentDirectory.Writer segmentWriter)
+      throws Exception {
+    int numDocs = existingColMetadata.getTotalDocs();
+    // SegmentPartitionConfig is not relevant for rewrites.
+    Preconditions.checkState(_indexLoadingConfig.getTableConfig() != null);
+    StatsCollectorConfig statsCollectorConfig =
+        new StatsCollectorConfig(_indexLoadingConfig.getTableConfig(), _schema, null);
+    AbstractColumnStatisticsCollector statsCollector;
+
+    try (ForwardIndexReader reader = LoaderUtils.getForwardIndexReader(segmentWriter, existingColMetadata)) {
+      boolean isSVColumn = reader.isSingleValue();
+
+      switch (reader.getStoredType()) {
+        case INT:
+          statsCollector = new IntColumnPreIndexStatsCollector(column, statsCollectorConfig);
+          break;
+        case LONG:
+          statsCollector = new LongColumnPreIndexStatsCollector(column, statsCollectorConfig);
+          break;
+        case FLOAT:
+          statsCollector = new FloatColumnPreIndexStatsCollector(column, statsCollectorConfig);
+          break;
+        case DOUBLE:
+          statsCollector = new DoubleColumnPreIndexStatsCollector(column, statsCollectorConfig);
+          break;
+        case STRING:
+          statsCollector = new StringColumnPreIndexStatsCollector(column, statsCollectorConfig);
+          break;
+        case BYTES:
+          statsCollector = new BytesColumnPredIndexStatsCollector(column, statsCollectorConfig);
+          break;
+        case BIG_DECIMAL:
+          Preconditions.checkState(isSVColumn, "BigDecimal is not supported for MV columns");
+          statsCollector = new BigDecimalColumnPreIndexStatsCollector(column, statsCollectorConfig);
+          break;
+        default:
+          throw new IllegalStateException();
+      }
+
+      Preconditions.checkState(statsCollector != null);
+      // Note: Special null handling is not necessary here. This is because the existing default null value in the
+      // raw forwardIndex will be retained as such while creating the dictionary and dict-based forward index. Also,
+      // null value vectors maintain a bitmap of docIds. No handling is necessary there.
+      PinotSegmentColumnReader columnReader =
+          new PinotSegmentColumnReader(reader, null, null, existingColMetadata.getMaxNumberOfMultiValues());
+      for (int i = 0; i < numDocs; i++) {
+        Object obj = columnReader.getValue(i);
+        statsCollector.collect(obj);
+      }
+      statsCollector.seal();
+
+      boolean useVarLength = _indexLoadingConfig.getVarLengthDictionaryColumns().contains(column)
+          || SegmentIndexCreationDriverImpl.shouldUseVarLengthDictionary(reader.getStoredType(), statsCollector);
+      SegmentDictionaryCreator dictionaryCreator =
+          new SegmentDictionaryCreator(existingColMetadata.getFieldSpec(), _segmentMetadata.getIndexDir(),
+              useVarLength);
+
+      dictionaryCreator.build(statsCollector.getUniqueValuesSet());
Review Comment:
Just curious: can the data collected by the `statsCollector` be used to set up the `IndexCreationContext` in the next step? Wondering whether any of the other stats are needed to create the forward index. We did some ad-hoc calculations for the RAW forward index scenario that might have benefited from running the stats collector and reusing these values, right?
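e.g. something like this in `writeDictEnabledForwardIndex` (just a sketch; it assumes the `ColumnStatistics` getters exposed by the collector, such as `getLengthOfLargestElement()`, cover what the builder needs):
```
// Reuse the stats gathered while building the dictionary instead of re-deriving them
// from the forward index reader.
IndexCreationContext.Forward context = IndexCreationContext.builder()
    .withIndexDir(indexDir)
    .withColumnMetadata(existingColMetadata)
    .withLengthOfLongestEntry(statsCollector.getLengthOfLargestElement())
    .withDictionary(true)
    .build()
    .forForwardIndex(null, _indexLoadingConfig.getColumnProperties());
```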
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessor.java:
##########
@@ -104,8 +104,14 @@ public void process()
       // Update single-column indices, like inverted index, json index etc.
       IndexCreatorProvider indexCreatorProvider = IndexingOverrides.getIndexCreatorProvider();
       for (ColumnIndexType type : ColumnIndexType.values()) {
-        IndexHandler handler = IndexHandlerFactory.getIndexHandler(type, _segmentMetadata, _indexLoadingConfig);
+        IndexHandler handler =
+            IndexHandlerFactory.getIndexHandler(type, _segmentMetadata, _indexLoadingConfig, _schema);
         handler.updateIndices(segmentWriter, indexCreatorProvider);
+        if (type == ColumnIndexType.FORWARD_INDEX) {
+          // ForwardIndexHandler may modify the segment metadata.
Review Comment:
Suggest enhancing the comment to explain why this needs to happen here rather than outside the index-handling loop. The metadata update is needed to construct the other indexes, such as the range index, right?
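e.g. something like (wording is just a suggestion):
```
// ForwardIndexHandler may modify the segment metadata (e.g. when it enables a dictionary
// for a raw column). Reload it here, inside the loop, so that the handlers for the
// remaining index types (such as the range index) are constructed against the updated
// dictionary/forward index state of the column.
```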
##########
pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandlerTest.java:
##########
@@ -302,34 +351,54 @@ public void testComputeOperation()
     // TEST1 : Validate with zero changes. ForwardIndexHandler should be a No-Op.
     IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig(null, _tableConfig);
-    ForwardIndexHandler fwdIndexHandler = new ForwardIndexHandler(existingSegmentMetadata, indexLoadingConfig);
+    ForwardIndexHandler fwdIndexHandler = new ForwardIndexHandler(existingSegmentMetadata, indexLoadingConfig, null);
     Map<String, ForwardIndexHandler.Operation> operationMap = new HashMap<>();
     operationMap = fwdIndexHandler.computeOperation(writer);
     assertEquals(operationMap, Collections.EMPTY_MAP);
-    // TEST2: Enable dictionary for a RAW_ZSTANDARD_INDEX_COLUMN. ForwardIndexHandler should be a No-Op.
+    // TEST2: Enable dictionary for a RAW_ZSTANDARD_INDEX_COLUMN.
     indexLoadingConfig = new IndexLoadingConfig(null, _tableConfig);
     indexLoadingConfig.getNoDictionaryColumns().remove(DIM_ZSTANDARD_STRING);
-    fwdIndexHandler = new ForwardIndexHandler(existingSegmentMetadata, indexLoadingConfig);
+    fwdIndexHandler = new ForwardIndexHandler(existingSegmentMetadata, indexLoadingConfig, _schema);
+    operationMap = fwdIndexHandler.computeOperation(writer);
+    assertEquals(operationMap.get(DIM_ZSTANDARD_STRING), ForwardIndexHandler.Operation.ENABLE_DICTIONARY);
+
+    // TEST3: Enable dictionary for an MV column.
+    indexLoadingConfig = new IndexLoadingConfig(null, _tableConfig);
+    indexLoadingConfig.getNoDictionaryColumns().remove(DIM_MV_PASS_THROUGH_STRING);
+    fwdIndexHandler = new ForwardIndexHandler(existingSegmentMetadata, indexLoadingConfig, _schema);
+    operationMap = fwdIndexHandler.computeOperation(writer);
+    assertEquals(operationMap.get(DIM_MV_PASS_THROUGH_STRING), ForwardIndexHandler.Operation.ENABLE_DICTIONARY);
+
+    // TEST4: Enable dictionary for a sorted column.
+    indexLoadingConfig = new IndexLoadingConfig(null, _tableConfig);
+    indexLoadingConfig.getNoDictionaryColumns().remove(DIM_PASS_THROUGH_SORTED_LONG);
+    fwdIndexHandler = new ForwardIndexHandler(existingSegmentMetadata, indexLoadingConfig, _schema);
+    operationMap = fwdIndexHandler.computeOperation(writer);
+    assertEquals(operationMap.get(DIM_PASS_THROUGH_SORTED_LONG), ForwardIndexHandler.Operation.ENABLE_DICTIONARY);
+
+    // TEST5: Enable dictionary for a dict column. Should be a No-op.
+    indexLoadingConfig = new IndexLoadingConfig(null, _tableConfig);
+    fwdIndexHandler = new ForwardIndexHandler(existingSegmentMetadata, indexLoadingConfig, _schema);
     operationMap = fwdIndexHandler.computeOperation(writer);
     assertEquals(operationMap, Collections.EMPTY_MAP);
-    // TEST3: Disable dictionary. ForwardIndexHandler should be a No-Op.
+    // TEST6: Disable dictionary. Should be a No-op.
     indexLoadingConfig = new IndexLoadingConfig(null, _tableConfig);
     indexLoadingConfig.getNoDictionaryColumns().add(DIM_DICT_INTEGER);
-    fwdIndexHandler = new ForwardIndexHandler(existingSegmentMetadata, indexLoadingConfig);
+    fwdIndexHandler = new ForwardIndexHandler(existingSegmentMetadata, indexLoadingConfig, null);
     operationMap = fwdIndexHandler.computeOperation(writer);
     assertEquals(operationMap, Collections.EMPTY_MAP);
-    // TEST4: Add an additional text index. ForwardIndexHandler should be a No-Op.
+    // TEST7: Add an additional text index. ForwardIndexHandler should be a No-Op.
     indexLoadingConfig = new IndexLoadingConfig(null, _tableConfig);
     indexLoadingConfig.getTextIndexColumns().add(DIM_DICT_INTEGER);
     indexLoadingConfig.getTextIndexColumns().add(DIM_LZ4_INTEGER);
-    fwdIndexHandler = new ForwardIndexHandler(existingSegmentMetadata, indexLoadingConfig);
+    fwdIndexHandler = new ForwardIndexHandler(existingSegmentMetadata, indexLoadingConfig, null);
Review Comment:
did you test adding a range index and enabling forward index at the same
time?
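e.g. a combined case in the style of the tests above (just a sketch; it assumes `getRangeIndexColumns()` is exposed on `IndexLoadingConfig` like the other column sets):
```
// Hypothetical follow-up test: enable dictionary and add a range index on the same column.
indexLoadingConfig = new IndexLoadingConfig(null, _tableConfig);
indexLoadingConfig.getNoDictionaryColumns().remove(DIM_ZSTANDARD_STRING);
indexLoadingConfig.getRangeIndexColumns().add(DIM_ZSTANDARD_STRING);
fwdIndexHandler = new ForwardIndexHandler(existingSegmentMetadata, indexLoadingConfig, _schema);
operationMap = fwdIndexHandler.computeOperation(writer);
assertEquals(operationMap.get(DIM_ZSTANDARD_STRING), ForwardIndexHandler.Operation.ENABLE_DICTIONARY);
```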
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java:
##########
@@ -271,118 +306,293 @@ private void rewriteRawSVForwardIndex(String column,
ColumnMetadata existingColM
.forForwardIndex(newCompressionType,
_indexLoadingConfig.getColumnProperties());
try (ForwardIndexCreator creator =
indexCreatorProvider.newForwardIndexCreator(context)) {
- // If creator stored type and the reader stored type do not match,
throw an exception.
if (!reader.getStoredType().equals(creator.getValueType())) {
+ // Creator stored type should match reader stored type for raw
columns. We do not support changing datatypes.
String failureMsg =
"Unsupported operation to change datatype for column=" + column
+ " from " + reader.getStoredType()
.toString() + " to " + creator.getValueType().toString();
throw new UnsupportedOperationException(failureMsg);
}
int numDocs = existingColMetadata.getTotalDocs();
- forwardIndexWriterHelper(column, reader, creator, numDocs);
+ forwardIndexWriterHelper(column, existingColMetadata, reader, creator,
numDocs, null);
}
}
}
- private void forwardIndexWriterHelper(String column, ForwardIndexReader
reader, ForwardIndexCreator creator,
- int numDocs) {
- // If creator stored type should match reader stored type. We do not
support changing datatypes.
- if (!reader.getStoredType().equals(creator.getValueType())) {
- String failureMsg =
- "Unsupported operation to change datatype for column=" + column + "
from " + reader.getStoredType().toString()
- + " to " + creator.getValueType().toString();
- throw new UnsupportedOperationException(failureMsg);
- }
-
+ private void forwardIndexWriterHelper(String column, ColumnMetadata
existingColumnMetadata, ForwardIndexReader reader,
+ ForwardIndexCreator creator, int numDocs, @Nullable
SegmentDictionaryCreator dictionaryCreator) {
ForwardIndexReaderContext readerContext = reader.createContext();
boolean isSVColumn = reader.isSingleValue();
- switch (reader.getStoredType()) {
- // JSON fields are either stored as string or bytes. No special handling
is needed because we make this
- // decision based on the storedType of the reader.
- case INT: {
- for (int i = 0; i < numDocs; i++) {
- if (isSVColumn) {
- int val = reader.getInt(i, readerContext);
- creator.putInt(val);
- } else {
- int[] ints = reader.getIntMV(i, readerContext);
- creator.putIntMV(ints);
- }
+ if (dictionaryCreator != null) {
+ int maxNumValuesPerEntry =
existingColumnMetadata.getMaxNumberOfMultiValues();
+ PinotSegmentColumnReader columnReader = new
PinotSegmentColumnReader(reader, null, null, maxNumValuesPerEntry);
+
+ for (int i = 0; i < numDocs; i++) {
+ Object obj = columnReader.getValue(i);
+
+ if (isSVColumn) {
+ int dictId = dictionaryCreator.indexOfSV(obj);
+ creator.putDictId(dictId);
+ } else {
+ int[] dictIds = dictionaryCreator.indexOfMV(obj);
+ creator.putDictIdMV(dictIds);
}
- break;
}
- case LONG: {
- for (int i = 0; i < numDocs; i++) {
- if (isSVColumn) {
- long val = reader.getLong(i, readerContext);
- creator.putLong(val);
- } else {
- long[] longs = reader.getLongMV(i, readerContext);
- creator.putLongMV(longs);
+ } else {
+ switch (reader.getStoredType()) {
+ // JSON fields are either stored as string or bytes. No special
handling is needed because we make this
+ // decision based on the storedType of the reader.
+ case INT: {
+ for (int i = 0; i < numDocs; i++) {
+ if (isSVColumn) {
+ int val = reader.getInt(i, readerContext);
+ creator.putInt(val);
+ } else {
+ int[] ints = reader.getIntMV(i, readerContext);
+ creator.putIntMV(ints);
+ }
}
+ break;
}
- break;
- }
- case FLOAT: {
- for (int i = 0; i < numDocs; i++) {
- if (isSVColumn) {
- float val = reader.getFloat(i, readerContext);
- creator.putFloat(val);
- } else {
- float[] floats = reader.getFloatMV(i, readerContext);
- creator.putFloatMV(floats);
+ case LONG: {
+ for (int i = 0; i < numDocs; i++) {
+ if (isSVColumn) {
+ long val = reader.getLong(i, readerContext);
+ creator.putLong(val);
+ } else {
+ long[] longs = reader.getLongMV(i, readerContext);
+ creator.putLongMV(longs);
+ }
}
+ break;
}
- break;
- }
- case DOUBLE: {
- for (int i = 0; i < numDocs; i++) {
- if (isSVColumn) {
- double val = reader.getDouble(i, readerContext);
- creator.putDouble(val);
- } else {
- double[] doubles = reader.getDoubleMV(i, readerContext);
- creator.putDoubleMV(doubles);
+ case FLOAT: {
+ for (int i = 0; i < numDocs; i++) {
+ if (isSVColumn) {
+ float val = reader.getFloat(i, readerContext);
+ creator.putFloat(val);
+ } else {
+ float[] floats = reader.getFloatMV(i, readerContext);
+ creator.putFloatMV(floats);
+ }
}
+ break;
}
- break;
- }
- case STRING: {
- for (int i = 0; i < numDocs; i++) {
- if (isSVColumn) {
- String val = reader.getString(i, readerContext);
- creator.putString(val);
- } else {
- String[] strings = reader.getStringMV(i, readerContext);
- creator.putStringMV(strings);
+ case DOUBLE: {
+ for (int i = 0; i < numDocs; i++) {
+ if (isSVColumn) {
+ double val = reader.getDouble(i, readerContext);
+ creator.putDouble(val);
+ } else {
+ double[] doubles = reader.getDoubleMV(i, readerContext);
+ creator.putDoubleMV(doubles);
+ }
}
+ break;
}
- break;
- }
- case BYTES: {
- for (int i = 0; i < numDocs; i++) {
- if (isSVColumn) {
- byte[] val = reader.getBytes(i, readerContext);
- creator.putBytes(val);
- } else {
- byte[][] bytesArray = reader.getBytesMV(i, readerContext);
- creator.putBytesMV(bytesArray);
+ case STRING: {
+ for (int i = 0; i < numDocs; i++) {
+ if (isSVColumn) {
+ String val = reader.getString(i, readerContext);
+ creator.putString(val);
+ } else {
+ String[] strings = reader.getStringMV(i, readerContext);
+ creator.putStringMV(strings);
+ }
}
+ break;
}
- break;
- }
- case BIG_DECIMAL: {
- for (int i = 0; i < numDocs; i++) {
+ case BYTES: {
+ for (int i = 0; i < numDocs; i++) {
+ if (isSVColumn) {
+ byte[] val = reader.getBytes(i, readerContext);
+ creator.putBytes(val);
+ } else {
+ byte[][] bytesArray = reader.getBytesMV(i, readerContext);
+ creator.putBytesMV(bytesArray);
+ }
+ }
+ break;
+ }
+ case BIG_DECIMAL: {
Preconditions.checkState(isSVColumn, "BigDecimal is not supported
for MV columns");
- BigDecimal val = reader.getBigDecimal(i, readerContext);
- creator.putBigDecimal(val);
+ for (int i = 0; i < numDocs; i++) {
+ BigDecimal val = reader.getBigDecimal(i, readerContext);
+ creator.putBigDecimal(val);
+ }
+ break;
}
- break;
+ default:
+ throw new IllegalStateException();
+ }
+ }
+ }
+
+ private void enableDictionary(String column, SegmentDirectory.Writer
segmentWriter,
+ IndexCreatorProvider indexCreatorProvider)
+ throws Exception {
+ Preconditions.checkState(_segmentMetadata.getVersion() ==
SegmentVersion.v3);
+ ColumnMetadata existingColMetadata =
_segmentMetadata.getColumnMetadataFor(column);
+ Preconditions.checkState(!existingColMetadata.hasDictionary(), "Existing
column already has dictionary.");
+ boolean isSingleValue = existingColMetadata.isSingleValue();
+
+ File indexDir = _segmentMetadata.getIndexDir();
+ String segmentName = _segmentMetadata.getName();
+ File inProgress = new File(indexDir, column + ".dict.inprogress");
+ File dictionaryFile = new File(indexDir, column +
V1Constants.Dict.FILE_EXTENSION);
+ String fwdIndexFileExtension;
+ if (isSingleValue) {
+ if (existingColMetadata.isSorted()) {
+ fwdIndexFileExtension =
V1Constants.Indexes.SORTED_SV_FORWARD_INDEX_FILE_EXTENSION;
+ } else {
+ fwdIndexFileExtension =
V1Constants.Indexes.UNSORTED_SV_FORWARD_INDEX_FILE_EXTENSION;
+ }
+ } else {
+ fwdIndexFileExtension =
V1Constants.Indexes.UNSORTED_MV_FORWARD_INDEX_FILE_EXTENSION;
+ }
+ File fwdIndexFile = new File(indexDir, column + fwdIndexFileExtension);
+
+ if (!inProgress.exists()) {
+ // Marker file does not exist, which means last run ended normally.
+ // Create a marker file.
+ FileUtils.touch(inProgress);
+ } else {
+ // Marker file exists, which means last run was interrupted.
+ // Remove forward index and dictionary files if they exist.
+ FileUtils.deleteQuietly(fwdIndexFile);
+ FileUtils.deleteQuietly(dictionaryFile);
+ }
+
+ LOGGER.info("Creating a new dictionary for segment={} and column={}",
segmentName, column);
+ SegmentDictionaryCreator dictionaryCreator = buildDictionary(column,
existingColMetadata, segmentWriter);
+ LoaderUtils.writeIndexToV3Format(segmentWriter, column, dictionaryFile,
ColumnIndexType.DICTIONARY);
+
+ LOGGER.info("Built dictionary. Rewriting dictionary enabled forward index
for segment={} and column={}",
+ segmentName, column);
+ writeDictEnabledForwardIndex(column, existingColMetadata, segmentWriter,
indexDir, indexCreatorProvider,
+ dictionaryCreator);
+ // We used the existing forward index to generate a new forward index. The
existing forward index will be in V3
+ // format and the new forward index will be in V1 format. Remove the
existing forward index as it is not needed
+ // anymore. Note that removeIndex() will only mark an index for removal
and remove the in-memory state. The
+ // actual cleanup from columns.psf file will happen when
singleFileIndexDirectory.cleanupRemovedIndices() is
+ // called during segmentWriter.close().
+ segmentWriter.removeIndex(column, ColumnIndexType.FORWARD_INDEX);
+ LoaderUtils.writeIndexToV3Format(segmentWriter, column, fwdIndexFile,
ColumnIndexType.FORWARD_INDEX);
+
+ LOGGER.info("Created forwardIndex. Updating metadata properties for
segment={} and column={}", segmentName, column);
+ updateMetadataProperties(indexDir, column, dictionaryCreator);
+
+ // We remove indexes that have to be rewritten when a dictEnabled is
toggled. Note that the respective index
+ // handler will take care of recreating the index.
+ removeDictRelatedIndexes(column, segmentWriter);
+
+ // Delete the marker file.
+ FileUtils.deleteQuietly(inProgress);
+
+ LOGGER.info("Created dictionary based forward index for segment: {},
column: {}", segmentName, column);
+ }
+
+ private SegmentDictionaryCreator buildDictionary(String column,
ColumnMetadata existingColMetadata,
+ SegmentDirectory.Writer segmentWriter)
+ throws Exception {
+ int numDocs = existingColMetadata.getTotalDocs();
+ // SegmentPartitionConfig is not relevant for rewrites.
+ Preconditions.checkState(_indexLoadingConfig.getTableConfig() != null);
+ StatsCollectorConfig statsCollectorConfig =
+ new StatsCollectorConfig(_indexLoadingConfig.getTableConfig(),
_schema, null);
+ AbstractColumnStatisticsCollector statsCollector;
+
+ try (ForwardIndexReader reader =
LoaderUtils.getForwardIndexReader(segmentWriter, existingColMetadata)) {
+ boolean isSVColumn = reader.isSingleValue();
+
+ switch (reader.getStoredType()) {
+ case INT:
+ statsCollector = new IntColumnPreIndexStatsCollector(column,
statsCollectorConfig);
+ break;
+ case LONG:
+ statsCollector = new LongColumnPreIndexStatsCollector(column,
statsCollectorConfig);
+ break;
+ case FLOAT:
+ statsCollector = new FloatColumnPreIndexStatsCollector(column,
statsCollectorConfig);
+ break;
+ case DOUBLE:
+ statsCollector = new DoubleColumnPreIndexStatsCollector(column,
statsCollectorConfig);
+ break;
+ case STRING:
+ statsCollector = new StringColumnPreIndexStatsCollector(column,
statsCollectorConfig);
+ break;
+ case BYTES:
+ statsCollector = new BytesColumnPredIndexStatsCollector(column,
statsCollectorConfig);
+ break;
+ case BIG_DECIMAL:
+ Preconditions.checkState(isSVColumn, "BigDecimal is not supported
for MV columns");
+ statsCollector = new BigDecimalColumnPreIndexStatsCollector(column,
statsCollectorConfig);
+ break;
+ default:
+ throw new IllegalStateException();
+ }
+
+ Preconditions.checkState(statsCollector != null);
+ // Note: Special Null handling is not necessary here. This is because,
the existing default null value in the
+ // raw forwardIndex will be retained as such while created the
dictionary and dict-based forward index. Also,
+ // null value vectors maintain a bitmap of docIds. No handling is
necessary there.
+ PinotSegmentColumnReader columnReader =
+ new PinotSegmentColumnReader(reader, null, null,
existingColMetadata.getMaxNumberOfMultiValues());
+ for (int i = 0; i < numDocs; i++) {
+ Object obj = columnReader.getValue(i);
+ statsCollector.collect(obj);
+ }
+ statsCollector.seal();
+
+ boolean useVarLength =
_indexLoadingConfig.getVarLengthDictionaryColumns().contains(column)
+ ||
SegmentIndexCreationDriverImpl.shouldUseVarLengthDictionary(reader.getStoredType(),
statsCollector);
+ SegmentDictionaryCreator dictionaryCreator =
+ new SegmentDictionaryCreator(existingColMetadata.getFieldSpec(),
_segmentMetadata.getIndexDir(),
+ useVarLength);
+
+ dictionaryCreator.build(statsCollector.getUniqueValuesSet());
+ return dictionaryCreator;
+ }
+ }
+
+ private void writeDictEnabledForwardIndex(String column, ColumnMetadata
existingColMetadata,
+ SegmentDirectory.Writer segmentWriter, File indexDir,
IndexCreatorProvider indexCreatorProvider,
+ SegmentDictionaryCreator dictionaryCreator)
+ throws Exception {
+ try (ForwardIndexReader reader =
LoaderUtils.getForwardIndexReader(segmentWriter, existingColMetadata)) {
+ int lengthOfLongestEntry = reader.getLengthOfLongestEntry();
+ IndexCreationContext.Builder builder =
+
IndexCreationContext.builder().withIndexDir(indexDir).withColumnMetadata(existingColMetadata)
+ .withLengthOfLongestEntry(lengthOfLongestEntry);
+ // existingColMetadata has dictEnable=false. Overwrite the value.
+ builder.withDictionary(true);
+ IndexCreationContext.Forward context =
+ builder.build().forForwardIndex(null,
_indexLoadingConfig.getColumnProperties());
+
+ try (ForwardIndexCreator creator =
indexCreatorProvider.newForwardIndexCreator(context)) {
+ int numDocs = existingColMetadata.getTotalDocs();
+ forwardIndexWriterHelper(column, existingColMetadata, reader, creator,
numDocs, dictionaryCreator);
}
- default:
- throw new IllegalStateException();
}
}
+
+ private void removeDictRelatedIndexes(String column, SegmentDirectory.Writer
segmentWriter) {
+ // TODO: Move this logic into a static function in each index creator.
+ segmentWriter.removeIndex(column, ColumnIndexType.RANGE_INDEX);
+ }
+
+ private void updateMetadataProperties(File indexDir, String column,
SegmentDictionaryCreator dictionaryCreator)
Review Comment:
I recommend making this more generic. I will also need to update the metadata
when enabling the forward index on a forward-index-disabled column, and I will
want to set different properties. Perhaps have it take a set of input
properties and set them all in a loop?
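A minimal sketch of what that could look like inside `ForwardIndexHandler`, assuming the segment metadata is loaded through the `PropertiesConfiguration` import already added above; the `Map`-based signature and the `SegmentDirectoryPaths.findMetadataFile` lookup are illustrative suggestions, not code from this PR:

```java
// Sketch only: a generic metadata updater that takes arbitrary column-level
// properties instead of hard-coding the dictionary-related keys.
private void updateMetadataProperties(File indexDir, String column, Map<String, String> properties)
    throws Exception {
  File metadataFile = SegmentDirectoryPaths.findMetadataFile(indexDir);
  PropertiesConfiguration config = new PropertiesConfiguration(metadataFile);
  // Set every caller-supplied property in a loop, so dictionary enablement and
  // forward index toggling can each pass their own key/value set.
  for (Map.Entry<String, String> entry : properties.entrySet()) {
    config.setProperty(V1Constants.MetadataKeys.Column.getKeyFor(column, entry.getKey()), entry.getValue());
  }
  config.save();
}
```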
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java:
##########
@@ -244,16 +279,16 @@ private void rewriteRawMVForwardIndex(String column,
ColumnMetadata existingColM
.forForwardIndex(newCompressionType,
_indexLoadingConfig.getColumnProperties());
try (ForwardIndexCreator creator =
indexCreatorProvider.newForwardIndexCreator(context)) {
- // If creator stored type and the reader stored type do not match,
throw an exception.
if (!reader.getStoredType().equals(creator.getValueType())) {
+ // Creator stored type should match reader stored type for raw
columns. We do not support changing datatypes.
String failureMsg =
"Unsupported operation to change datatype for column=" + column
+ " from " + reader.getStoredType()
.toString() + " to " + creator.getValueType().toString();
throw new UnsupportedOperationException(failureMsg);
}
Review Comment:
Curious: since this check is done in both functions before calling
`forwardIndexWriterHelper`, can you move it into `forwardIndexWriterHelper` so
that all callers get the check?
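If it were moved, a hedged sketch (assuming the dictionary-enable path must be exempt, since a dict-based creator's value type is the dictId type rather than the reader's stored type) might look like:

```java
// Sketch only: the datatype guard hoisted to the top of the shared helper so
// every caller inherits it. Skipped when a dictionaryCreator is passed, because
// the creator then writes INT dictIds instead of the reader's stored type.
private void forwardIndexWriterHelper(String column, ColumnMetadata existingColumnMetadata,
    ForwardIndexReader reader, ForwardIndexCreator creator, int numDocs,
    @Nullable SegmentDictionaryCreator dictionaryCreator) {
  if (dictionaryCreator == null && !reader.getStoredType().equals(creator.getValueType())) {
    throw new UnsupportedOperationException(
        "Unsupported operation to change datatype for column=" + column + " from " + reader.getStoredType()
            + " to " + creator.getValueType());
  }
  // ... existing SV/MV write loops unchanged ...
}
```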
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java:
##########
@@ -271,118 +306,293 @@ private void rewriteRawSVForwardIndex(String column,
ColumnMetadata existingColM
.forForwardIndex(newCompressionType,
_indexLoadingConfig.getColumnProperties());
try (ForwardIndexCreator creator =
indexCreatorProvider.newForwardIndexCreator(context)) {
- // If creator stored type and the reader stored type do not match,
throw an exception.
if (!reader.getStoredType().equals(creator.getValueType())) {
+ // Creator stored type should match reader stored type for raw
columns. We do not support changing datatypes.
String failureMsg =
"Unsupported operation to change datatype for column=" + column
+ " from " + reader.getStoredType()
.toString() + " to " + creator.getValueType().toString();
throw new UnsupportedOperationException(failureMsg);
}
int numDocs = existingColMetadata.getTotalDocs();
- forwardIndexWriterHelper(column, reader, creator, numDocs);
+ forwardIndexWriterHelper(column, existingColMetadata, reader, creator,
numDocs, null);
}
}
}
- private void forwardIndexWriterHelper(String column, ForwardIndexReader
reader, ForwardIndexCreator creator,
- int numDocs) {
- // If creator stored type should match reader stored type. We do not
support changing datatypes.
- if (!reader.getStoredType().equals(creator.getValueType())) {
- String failureMsg =
- "Unsupported operation to change datatype for column=" + column + "
from " + reader.getStoredType().toString()
- + " to " + creator.getValueType().toString();
- throw new UnsupportedOperationException(failureMsg);
- }
-
+ private void forwardIndexWriterHelper(String column, ColumnMetadata
existingColumnMetadata, ForwardIndexReader reader,
+ ForwardIndexCreator creator, int numDocs, @Nullable
SegmentDictionaryCreator dictionaryCreator) {
ForwardIndexReaderContext readerContext = reader.createContext();
boolean isSVColumn = reader.isSingleValue();
- switch (reader.getStoredType()) {
- // JSON fields are either stored as string or bytes. No special handling
is needed because we make this
- // decision based on the storedType of the reader.
- case INT: {
- for (int i = 0; i < numDocs; i++) {
- if (isSVColumn) {
- int val = reader.getInt(i, readerContext);
- creator.putInt(val);
- } else {
- int[] ints = reader.getIntMV(i, readerContext);
- creator.putIntMV(ints);
- }
+ if (dictionaryCreator != null) {
+ int maxNumValuesPerEntry =
existingColumnMetadata.getMaxNumberOfMultiValues();
+ PinotSegmentColumnReader columnReader = new
PinotSegmentColumnReader(reader, null, null, maxNumValuesPerEntry);
+
+ for (int i = 0; i < numDocs; i++) {
+ Object obj = columnReader.getValue(i);
+
+ if (isSVColumn) {
+ int dictId = dictionaryCreator.indexOfSV(obj);
+ creator.putDictId(dictId);
+ } else {
+ int[] dictIds = dictionaryCreator.indexOfMV(obj);
+ creator.putDictIdMV(dictIds);
}
- break;
}
- case LONG: {
- for (int i = 0; i < numDocs; i++) {
- if (isSVColumn) {
- long val = reader.getLong(i, readerContext);
- creator.putLong(val);
- } else {
- long[] longs = reader.getLongMV(i, readerContext);
- creator.putLongMV(longs);
+ } else {
+ switch (reader.getStoredType()) {
+ // JSON fields are either stored as string or bytes. No special
handling is needed because we make this
+ // decision based on the storedType of the reader.
+ case INT: {
+ for (int i = 0; i < numDocs; i++) {
+ if (isSVColumn) {
+ int val = reader.getInt(i, readerContext);
+ creator.putInt(val);
+ } else {
+ int[] ints = reader.getIntMV(i, readerContext);
+ creator.putIntMV(ints);
+ }
}
+ break;
}
- break;
- }
- case FLOAT: {
- for (int i = 0; i < numDocs; i++) {
- if (isSVColumn) {
- float val = reader.getFloat(i, readerContext);
- creator.putFloat(val);
- } else {
- float[] floats = reader.getFloatMV(i, readerContext);
- creator.putFloatMV(floats);
+ case LONG: {
+ for (int i = 0; i < numDocs; i++) {
+ if (isSVColumn) {
+ long val = reader.getLong(i, readerContext);
+ creator.putLong(val);
+ } else {
+ long[] longs = reader.getLongMV(i, readerContext);
+ creator.putLongMV(longs);
+ }
}
+ break;
}
- break;
- }
- case DOUBLE: {
- for (int i = 0; i < numDocs; i++) {
- if (isSVColumn) {
- double val = reader.getDouble(i, readerContext);
- creator.putDouble(val);
- } else {
- double[] doubles = reader.getDoubleMV(i, readerContext);
- creator.putDoubleMV(doubles);
+ case FLOAT: {
+ for (int i = 0; i < numDocs; i++) {
+ if (isSVColumn) {
+ float val = reader.getFloat(i, readerContext);
+ creator.putFloat(val);
+ } else {
+ float[] floats = reader.getFloatMV(i, readerContext);
+ creator.putFloatMV(floats);
+ }
}
+ break;
}
- break;
- }
- case STRING: {
- for (int i = 0; i < numDocs; i++) {
- if (isSVColumn) {
- String val = reader.getString(i, readerContext);
- creator.putString(val);
- } else {
- String[] strings = reader.getStringMV(i, readerContext);
- creator.putStringMV(strings);
+ case DOUBLE: {
+ for (int i = 0; i < numDocs; i++) {
+ if (isSVColumn) {
+ double val = reader.getDouble(i, readerContext);
+ creator.putDouble(val);
+ } else {
+ double[] doubles = reader.getDoubleMV(i, readerContext);
+ creator.putDoubleMV(doubles);
+ }
}
+ break;
}
- break;
- }
- case BYTES: {
- for (int i = 0; i < numDocs; i++) {
- if (isSVColumn) {
- byte[] val = reader.getBytes(i, readerContext);
- creator.putBytes(val);
- } else {
- byte[][] bytesArray = reader.getBytesMV(i, readerContext);
- creator.putBytesMV(bytesArray);
+ case STRING: {
+ for (int i = 0; i < numDocs; i++) {
+ if (isSVColumn) {
+ String val = reader.getString(i, readerContext);
+ creator.putString(val);
+ } else {
+ String[] strings = reader.getStringMV(i, readerContext);
+ creator.putStringMV(strings);
+ }
}
+ break;
}
- break;
- }
- case BIG_DECIMAL: {
- for (int i = 0; i < numDocs; i++) {
+ case BYTES: {
+ for (int i = 0; i < numDocs; i++) {
+ if (isSVColumn) {
+ byte[] val = reader.getBytes(i, readerContext);
+ creator.putBytes(val);
+ } else {
+ byte[][] bytesArray = reader.getBytesMV(i, readerContext);
+ creator.putBytesMV(bytesArray);
+ }
+ }
+ break;
+ }
+ case BIG_DECIMAL: {
Preconditions.checkState(isSVColumn, "BigDecimal is not supported
for MV columns");
- BigDecimal val = reader.getBigDecimal(i, readerContext);
- creator.putBigDecimal(val);
+ for (int i = 0; i < numDocs; i++) {
+ BigDecimal val = reader.getBigDecimal(i, readerContext);
+ creator.putBigDecimal(val);
+ }
+ break;
}
- break;
+ default:
+ throw new IllegalStateException();
+ }
+ }
+ }
+
+ private void enableDictionary(String column, SegmentDirectory.Writer
segmentWriter,
+ IndexCreatorProvider indexCreatorProvider)
+ throws Exception {
+ Preconditions.checkState(_segmentMetadata.getVersion() ==
SegmentVersion.v3);
+ ColumnMetadata existingColMetadata =
_segmentMetadata.getColumnMetadataFor(column);
+ Preconditions.checkState(!existingColMetadata.hasDictionary(), "Existing
column already has dictionary.");
+ boolean isSingleValue = existingColMetadata.isSingleValue();
+
+ File indexDir = _segmentMetadata.getIndexDir();
+ String segmentName = _segmentMetadata.getName();
+ File inProgress = new File(indexDir, column + ".dict.inprogress");
+ File dictionaryFile = new File(indexDir, column +
V1Constants.Dict.FILE_EXTENSION);
+ String fwdIndexFileExtension;
+ if (isSingleValue) {
+ if (existingColMetadata.isSorted()) {
+ fwdIndexFileExtension =
V1Constants.Indexes.SORTED_SV_FORWARD_INDEX_FILE_EXTENSION;
+ } else {
+ fwdIndexFileExtension =
V1Constants.Indexes.UNSORTED_SV_FORWARD_INDEX_FILE_EXTENSION;
+ }
+ } else {
+ fwdIndexFileExtension =
V1Constants.Indexes.UNSORTED_MV_FORWARD_INDEX_FILE_EXTENSION;
+ }
+ File fwdIndexFile = new File(indexDir, column + fwdIndexFileExtension);
+
+ if (!inProgress.exists()) {
+ // Marker file does not exist, which means last run ended normally.
+ // Create a marker file.
+ FileUtils.touch(inProgress);
+ } else {
+ // Marker file exists, which means last run was interrupted.
+ // Remove forward index and dictionary files if they exist.
+ FileUtils.deleteQuietly(fwdIndexFile);
+ FileUtils.deleteQuietly(dictionaryFile);
+ }
+
+ LOGGER.info("Creating a new dictionary for segment={} and column={}",
segmentName, column);
+ SegmentDictionaryCreator dictionaryCreator = buildDictionary(column,
existingColMetadata, segmentWriter);
+ LoaderUtils.writeIndexToV3Format(segmentWriter, column, dictionaryFile,
ColumnIndexType.DICTIONARY);
+
+ LOGGER.info("Built dictionary. Rewriting dictionary enabled forward index
for segment={} and column={}",
+ segmentName, column);
+ writeDictEnabledForwardIndex(column, existingColMetadata, segmentWriter,
indexDir, indexCreatorProvider,
+ dictionaryCreator);
+ // We used the existing forward index to generate a new forward index. The
existing forward index will be in V3
+ // format and the new forward index will be in V1 format. Remove the
existing forward index as it is not needed
+ // anymore. Note that removeIndex() will only mark an index for removal
and remove the in-memory state. The
+ // actual cleanup from columns.psf file will happen when
singleFileIndexDirectory.cleanupRemovedIndices() is
+ // called during segmentWriter.close().
+ segmentWriter.removeIndex(column, ColumnIndexType.FORWARD_INDEX);
+ LoaderUtils.writeIndexToV3Format(segmentWriter, column, fwdIndexFile,
ColumnIndexType.FORWARD_INDEX);
+
+ LOGGER.info("Created forwardIndex. Updating metadata properties for
segment={} and column={}", segmentName, column);
+ updateMetadataProperties(indexDir, column, dictionaryCreator);
+
+ // We remove indexes that have to be rewritten when a dictEnabled is
toggled. Note that the respective index
+ // handler will take care of recreating the index.
+ removeDictRelatedIndexes(column, segmentWriter);
+
+ // Delete the marker file.
+ FileUtils.deleteQuietly(inProgress);
+
+ LOGGER.info("Created dictionary based forward index for segment: {},
column: {}", segmentName, column);
+ }
+
+ private SegmentDictionaryCreator buildDictionary(String column,
ColumnMetadata existingColMetadata,
+ SegmentDirectory.Writer segmentWriter)
+ throws Exception {
+ int numDocs = existingColMetadata.getTotalDocs();
+ // SegmentPartitionConfig is not relevant for rewrites.
+ Preconditions.checkState(_indexLoadingConfig.getTableConfig() != null);
Review Comment:
Should this check be done much earlier in the code flow? Is it valid to have a
null TableConfig in other scenarios?
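For illustration, a sketch of failing fast at the handler entry point; the `updateIndices` placement and signature are assumed from the `IndexHandler` pattern, not taken from this PR:

```java
// Sketch only: validate the TableConfig once, up front, instead of deep inside
// buildDictionary(), so every rewrite path fails fast with a clear message.
@Override
public void updateIndices(SegmentDirectory.Writer segmentWriter, IndexCreatorProvider indexCreatorProvider)
    throws Exception {
  Preconditions.checkState(_indexLoadingConfig.getTableConfig() != null,
      "TableConfig must be present to rewrite forward indexes");
  // ... existing per-column rewrite operations follow ...
}
```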
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java:
##########
@@ -271,118 +306,293 @@ private void rewriteRawSVForwardIndex(String column,
ColumnMetadata existingColM
.forForwardIndex(newCompressionType,
_indexLoadingConfig.getColumnProperties());
try (ForwardIndexCreator creator =
indexCreatorProvider.newForwardIndexCreator(context)) {
- // If creator stored type and the reader stored type do not match,
throw an exception.
if (!reader.getStoredType().equals(creator.getValueType())) {
+ // Creator stored type should match reader stored type for raw
columns. We do not support changing datatypes.
String failureMsg =
"Unsupported operation to change datatype for column=" + column
+ " from " + reader.getStoredType()
.toString() + " to " + creator.getValueType().toString();
throw new UnsupportedOperationException(failureMsg);
}
int numDocs = existingColMetadata.getTotalDocs();
- forwardIndexWriterHelper(column, reader, creator, numDocs);
+ forwardIndexWriterHelper(column, existingColMetadata, reader, creator,
numDocs, null);
}
}
}
- private void forwardIndexWriterHelper(String column, ForwardIndexReader
reader, ForwardIndexCreator creator,
- int numDocs) {
- // If creator stored type should match reader stored type. We do not
support changing datatypes.
- if (!reader.getStoredType().equals(creator.getValueType())) {
- String failureMsg =
- "Unsupported operation to change datatype for column=" + column + "
from " + reader.getStoredType().toString()
- + " to " + creator.getValueType().toString();
- throw new UnsupportedOperationException(failureMsg);
- }
-
+ private void forwardIndexWriterHelper(String column, ColumnMetadata
existingColumnMetadata, ForwardIndexReader reader,
+ ForwardIndexCreator creator, int numDocs, @Nullable
SegmentDictionaryCreator dictionaryCreator) {
ForwardIndexReaderContext readerContext = reader.createContext();
boolean isSVColumn = reader.isSingleValue();
- switch (reader.getStoredType()) {
- // JSON fields are either stored as string or bytes. No special handling
is needed because we make this
- // decision based on the storedType of the reader.
- case INT: {
- for (int i = 0; i < numDocs; i++) {
- if (isSVColumn) {
- int val = reader.getInt(i, readerContext);
- creator.putInt(val);
- } else {
- int[] ints = reader.getIntMV(i, readerContext);
- creator.putIntMV(ints);
- }
+ if (dictionaryCreator != null) {
+ int maxNumValuesPerEntry =
existingColumnMetadata.getMaxNumberOfMultiValues();
+ PinotSegmentColumnReader columnReader = new
PinotSegmentColumnReader(reader, null, null, maxNumValuesPerEntry);
+
+ for (int i = 0; i < numDocs; i++) {
+ Object obj = columnReader.getValue(i);
+
+ if (isSVColumn) {
+ int dictId = dictionaryCreator.indexOfSV(obj);
+ creator.putDictId(dictId);
+ } else {
+ int[] dictIds = dictionaryCreator.indexOfMV(obj);
+ creator.putDictIdMV(dictIds);
}
- break;
}
- case LONG: {
- for (int i = 0; i < numDocs; i++) {
- if (isSVColumn) {
- long val = reader.getLong(i, readerContext);
- creator.putLong(val);
- } else {
- long[] longs = reader.getLongMV(i, readerContext);
- creator.putLongMV(longs);
+ } else {
+ switch (reader.getStoredType()) {
+ // JSON fields are either stored as string or bytes. No special
handling is needed because we make this
+ // decision based on the storedType of the reader.
+ case INT: {
+ for (int i = 0; i < numDocs; i++) {
+ if (isSVColumn) {
+ int val = reader.getInt(i, readerContext);
+ creator.putInt(val);
+ } else {
+ int[] ints = reader.getIntMV(i, readerContext);
+ creator.putIntMV(ints);
+ }
}
+ break;
}
- break;
- }
- case FLOAT: {
- for (int i = 0; i < numDocs; i++) {
- if (isSVColumn) {
- float val = reader.getFloat(i, readerContext);
- creator.putFloat(val);
- } else {
- float[] floats = reader.getFloatMV(i, readerContext);
- creator.putFloatMV(floats);
+ case LONG: {
+ for (int i = 0; i < numDocs; i++) {
+ if (isSVColumn) {
+ long val = reader.getLong(i, readerContext);
+ creator.putLong(val);
+ } else {
+ long[] longs = reader.getLongMV(i, readerContext);
+ creator.putLongMV(longs);
+ }
}
+ break;
}
- break;
- }
- case DOUBLE: {
- for (int i = 0; i < numDocs; i++) {
- if (isSVColumn) {
- double val = reader.getDouble(i, readerContext);
- creator.putDouble(val);
- } else {
- double[] doubles = reader.getDoubleMV(i, readerContext);
- creator.putDoubleMV(doubles);
+ case FLOAT: {
+ for (int i = 0; i < numDocs; i++) {
+ if (isSVColumn) {
+ float val = reader.getFloat(i, readerContext);
+ creator.putFloat(val);
+ } else {
+ float[] floats = reader.getFloatMV(i, readerContext);
+ creator.putFloatMV(floats);
+ }
}
+ break;
}
- break;
- }
- case STRING: {
- for (int i = 0; i < numDocs; i++) {
- if (isSVColumn) {
- String val = reader.getString(i, readerContext);
- creator.putString(val);
- } else {
- String[] strings = reader.getStringMV(i, readerContext);
- creator.putStringMV(strings);
+ case DOUBLE: {
+ for (int i = 0; i < numDocs; i++) {
+ if (isSVColumn) {
+ double val = reader.getDouble(i, readerContext);
+ creator.putDouble(val);
+ } else {
+ double[] doubles = reader.getDoubleMV(i, readerContext);
+ creator.putDoubleMV(doubles);
+ }
}
+ break;
}
- break;
- }
- case BYTES: {
- for (int i = 0; i < numDocs; i++) {
- if (isSVColumn) {
- byte[] val = reader.getBytes(i, readerContext);
- creator.putBytes(val);
- } else {
- byte[][] bytesArray = reader.getBytesMV(i, readerContext);
- creator.putBytesMV(bytesArray);
+ case STRING: {
+ for (int i = 0; i < numDocs; i++) {
+ if (isSVColumn) {
+ String val = reader.getString(i, readerContext);
+ creator.putString(val);
+ } else {
+ String[] strings = reader.getStringMV(i, readerContext);
+ creator.putStringMV(strings);
+ }
}
+ break;
}
- break;
- }
- case BIG_DECIMAL: {
- for (int i = 0; i < numDocs; i++) {
+ case BYTES: {
+ for (int i = 0; i < numDocs; i++) {
+ if (isSVColumn) {
+ byte[] val = reader.getBytes(i, readerContext);
+ creator.putBytes(val);
+ } else {
+ byte[][] bytesArray = reader.getBytesMV(i, readerContext);
+ creator.putBytesMV(bytesArray);
+ }
+ }
+ break;
+ }
+ case BIG_DECIMAL: {
Preconditions.checkState(isSVColumn, "BigDecimal is not supported
for MV columns");
- BigDecimal val = reader.getBigDecimal(i, readerContext);
- creator.putBigDecimal(val);
+ for (int i = 0; i < numDocs; i++) {
+ BigDecimal val = reader.getBigDecimal(i, readerContext);
+ creator.putBigDecimal(val);
+ }
+ break;
}
- break;
+ default:
+ throw new IllegalStateException();
+ }
+ }
+ }
+
+ private void enableDictionary(String column, SegmentDirectory.Writer
segmentWriter,
+ IndexCreatorProvider indexCreatorProvider)
+ throws Exception {
+ Preconditions.checkState(_segmentMetadata.getVersion() ==
SegmentVersion.v3);
+ ColumnMetadata existingColMetadata =
_segmentMetadata.getColumnMetadataFor(column);
+ Preconditions.checkState(!existingColMetadata.hasDictionary(), "Existing
column already has dictionary.");
+ boolean isSingleValue = existingColMetadata.isSingleValue();
+
+ File indexDir = _segmentMetadata.getIndexDir();
+ String segmentName = _segmentMetadata.getName();
+ File inProgress = new File(indexDir, column + ".dict.inprogress");
+ File dictionaryFile = new File(indexDir, column +
V1Constants.Dict.FILE_EXTENSION);
+ String fwdIndexFileExtension;
+ if (isSingleValue) {
+ if (existingColMetadata.isSorted()) {
+ fwdIndexFileExtension =
V1Constants.Indexes.SORTED_SV_FORWARD_INDEX_FILE_EXTENSION;
+ } else {
+ fwdIndexFileExtension =
V1Constants.Indexes.UNSORTED_SV_FORWARD_INDEX_FILE_EXTENSION;
+ }
+ } else {
+ fwdIndexFileExtension =
V1Constants.Indexes.UNSORTED_MV_FORWARD_INDEX_FILE_EXTENSION;
+ }
+ File fwdIndexFile = new File(indexDir, column + fwdIndexFileExtension);
+
+ if (!inProgress.exists()) {
+ // Marker file does not exist, which means last run ended normally.
+ // Create a marker file.
+ FileUtils.touch(inProgress);
+ } else {
+ // Marker file exists, which means last run was interrupted.
+ // Remove forward index and dictionary files if they exist.
+ FileUtils.deleteQuietly(fwdIndexFile);
+ FileUtils.deleteQuietly(dictionaryFile);
+ }
+
+ LOGGER.info("Creating a new dictionary for segment={} and column={}",
segmentName, column);
+ SegmentDictionaryCreator dictionaryCreator = buildDictionary(column,
existingColMetadata, segmentWriter);
+ LoaderUtils.writeIndexToV3Format(segmentWriter, column, dictionaryFile,
ColumnIndexType.DICTIONARY);
+
+ LOGGER.info("Built dictionary. Rewriting dictionary enabled forward index
for segment={} and column={}",
+ segmentName, column);
+ writeDictEnabledForwardIndex(column, existingColMetadata, segmentWriter,
indexDir, indexCreatorProvider,
+ dictionaryCreator);
+ // We used the existing forward index to generate a new forward index. The
existing forward index will be in V3
+ // format and the new forward index will be in V1 format. Remove the
existing forward index as it is not needed
+ // anymore. Note that removeIndex() will only mark an index for removal
and remove the in-memory state. The
+ // actual cleanup from columns.psf file will happen when
singleFileIndexDirectory.cleanupRemovedIndices() is
+ // called during segmentWriter.close().
+ segmentWriter.removeIndex(column, ColumnIndexType.FORWARD_INDEX);
+ LoaderUtils.writeIndexToV3Format(segmentWriter, column, fwdIndexFile,
ColumnIndexType.FORWARD_INDEX);
+
+ LOGGER.info("Created forwardIndex. Updating metadata properties for
segment={} and column={}", segmentName, column);
+ updateMetadataProperties(indexDir, column, dictionaryCreator);
+
+ // We remove indexes that have to be rewritten when a dictEnabled is
toggled. Note that the respective index
+ // handler will take care of recreating the index.
+ removeDictRelatedIndexes(column, segmentWriter);
+
+ // Delete the marker file.
+ FileUtils.deleteQuietly(inProgress);
+
+ LOGGER.info("Created dictionary based forward index for segment: {},
column: {}", segmentName, column);
+ }
+
+ private SegmentDictionaryCreator buildDictionary(String column,
ColumnMetadata existingColMetadata,
+ SegmentDirectory.Writer segmentWriter)
+ throws Exception {
+ int numDocs = existingColMetadata.getTotalDocs();
+ // SegmentPartitionConfig is not relevant for rewrites.
+ Preconditions.checkState(_indexLoadingConfig.getTableConfig() != null);
+ StatsCollectorConfig statsCollectorConfig =
+ new StatsCollectorConfig(_indexLoadingConfig.getTableConfig(),
_schema, null);
+ AbstractColumnStatisticsCollector statsCollector;
+
+ try (ForwardIndexReader reader =
LoaderUtils.getForwardIndexReader(segmentWriter, existingColMetadata)) {
+ boolean isSVColumn = reader.isSingleValue();
+
+ switch (reader.getStoredType()) {
+ case INT:
+ statsCollector = new IntColumnPreIndexStatsCollector(column,
statsCollectorConfig);
+ break;
+ case LONG:
+ statsCollector = new LongColumnPreIndexStatsCollector(column,
statsCollectorConfig);
+ break;
+ case FLOAT:
+ statsCollector = new FloatColumnPreIndexStatsCollector(column,
statsCollectorConfig);
+ break;
+ case DOUBLE:
+ statsCollector = new DoubleColumnPreIndexStatsCollector(column,
statsCollectorConfig);
+ break;
+ case STRING:
+ statsCollector = new StringColumnPreIndexStatsCollector(column,
statsCollectorConfig);
+ break;
+ case BYTES:
+ statsCollector = new BytesColumnPredIndexStatsCollector(column,
statsCollectorConfig);
+ break;
+ case BIG_DECIMAL:
+ Preconditions.checkState(isSVColumn, "BigDecimal is not supported
for MV columns");
+ statsCollector = new BigDecimalColumnPreIndexStatsCollector(column,
statsCollectorConfig);
+ break;
+ default:
+ throw new IllegalStateException();
+ }
+
+ Preconditions.checkState(statsCollector != null);
+ // Note: Special null handling is not necessary here. This is because the existing default null value in the
+ // raw forward index will be retained as such while creating the dictionary and dict-based forward index.
+ // Also, null value vectors maintain a bitmap of docIds, so no handling is necessary there.
+ PinotSegmentColumnReader columnReader =
+ new PinotSegmentColumnReader(reader, null, null,
existingColMetadata.getMaxNumberOfMultiValues());
+ for (int i = 0; i < numDocs; i++) {
+ Object obj = columnReader.getValue(i);
+ statsCollector.collect(obj);
+ }
+ statsCollector.seal();
+
+ boolean useVarLength =
_indexLoadingConfig.getVarLengthDictionaryColumns().contains(column)
Review Comment:
As mentioned earlier, it would be better to move the first part of the OR check
into the function, and have the function take `varLengthDictionaryColumns` as
input. This will ensure that all callers get both checks and do not
accidentally skip the first.
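A sketch of the suggested signature in `SegmentIndexCreationDriverImpl`; the `Set` parameter is the proposed addition, and the type-based fallback only approximates the existing heuristic:

```java
// Sketch only: fold the explicit table-config opt-in into the shared helper so
// no caller can accidentally skip it.
public static boolean shouldUseVarLengthDictionary(String columnName, Set<String> varLengthDictionaryColumns,
    DataType storedType, ColumnStatistics columnProfile) {
  // Explicit opt-in via table config always wins.
  if (varLengthDictionaryColumns.contains(columnName)) {
    return true;
  }
  // Fallback: variable-width stored types benefit from a var-length dictionary
  // unless every value happens to have the same length.
  if (storedType == DataType.BYTES || storedType == DataType.BIG_DECIMAL) {
    return !columnProfile.isFixedLength();
  }
  return false;
}
```

The call site above would then collapse to a single call, e.g. `shouldUseVarLengthDictionary(column, _indexLoadingConfig.getVarLengthDictionaryColumns(), reader.getStoredType(), statsCollector)`.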
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]