This is an automated email from the ASF dual-hosted git repository.
siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new ae93ce74e7 Support rewriting forward index upon changing compression
type for existing raw MV column (#9510)
ae93ce74e7 is described below
commit ae93ce74e7dbff2efba0ed0c39eb458495e67e1a
Author: Vivek Iyer Vaidyanathan <[email protected]>
AuthorDate: Mon Oct 17 16:26:20 2022 -0700
Support rewriting forward index upon changing compression type for existing
raw MV column (#9510)
* Make StreamMessage generic
* Allow changing compressionType for MV columns during segmentReload
* Address review comments
* Refactor and cleanup code
* Optimize for loop in ForwardIndexHandler
---
.../impl/fwd/MultiValueVarByteRawIndexCreator.java | 57 ++++-
.../segment/index/loader/ForwardIndexHandler.java | 233 +++++++++++++++------
.../segment/readers/PinotSegmentColumnReader.java | 111 ++++++++--
.../index/loader/ForwardIndexHandlerTest.java | 138 ++++++++++--
.../index/loader/SegmentPreProcessorTest.java | 123 ++++++-----
.../segment/spi/creator/IndexCreationContext.java | 3 +-
6 files changed, 515 insertions(+), 150 deletions(-)
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueVarByteRawIndexCreator.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueVarByteRawIndexCreator.java
index 471b4bd315..84d471d50e 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueVarByteRawIndexCreator.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueVarByteRawIndexCreator.java
@@ -75,10 +75,8 @@ public class MultiValueVarByteRawIndexCreator implements
ForwardIndexCreator {
int totalDocs, DataType valueType, int writerVersion, int
maxRowLengthInBytes, int maxNumberOfElements)
throws IOException {
//we will prepend the actual content with numElements and length array
containing length of each element
- int maxLengthPrefixes = Integer.BYTES * maxNumberOfElements;
- int totalMaxLength = Integer.BYTES + maxRowLengthInBytes +
maxLengthPrefixes;
- Preconditions.checkArgument((maxLengthPrefixes | maxRowLengthInBytes |
totalMaxLength | maxNumberOfElements) > 0,
- "integer overflow detected");
+ int totalMaxLength = getTotalRowStorageBytes(maxNumberOfElements,
maxRowLengthInBytes);
+
File file = new File(baseIndexDir,
column + Indexes.RAW_MV_FORWARD_INDEX_FILE_EXTENSION);
int numDocsPerChunk = Math.max(
@@ -119,4 +117,68 @@ public class MultiValueVarByteRawIndexCreator implements ForwardIndexCreator {
       throws IOException {
     _indexWriter.close();
   }
+
+  /**
+   * The actual content of an MV row is prepended with 2 prefixes:
+   * 1. elementLengthStoragePrefixInBytes - bytes required to store the length of each element in the largest array
+   * 2. numElementsStoragePrefixInBytes - bytes required to store the number of elements in the array
+   *
+   * This function returns the bytes needed to store (1) for an array of {@code maxNumberOfElements} elements.
+   *
+   * @throws ArithmeticException if the result does not fit in an {@code int}
+   */
+  public static int getElementLengthStoragePrefixInBytes(int maxNumberOfElements) {
+    // multiplyExact fails loudly instead of silently wrapping around to a bogus (possibly positive) size.
+    return Math.multiplyExact(Integer.BYTES, maxNumberOfElements);
+  }
+
+  /**
+   * The actual content of an MV row is prepended with 2 prefixes:
+   * 1. elementLengthStoragePrefixInBytes - bytes required to store the length of each element in the largest array
+   * 2. numElementsStoragePrefixInBytes - bytes required to store the number of elements in the array
+   *
+   * This function returns the bytes needed to store (2); it does not depend on the array size.
+   */
+  public static int getNumElementsStoragePrefix() {
+    return Integer.BYTES;
+  }
+
+  /**
+   * The actual content of an MV row is prepended with 2 prefixes:
+   * 1. elementLengthStoragePrefixInBytes - bytes required to store the length of each element in the largest array
+   * 2. numElementsStoragePrefixInBytes - bytes required to store the number of elements in the array
+   *
+   * This function returns the bytes needed to store (1), (2) and the actual content of the largest row.
+   *
+   * @param maxNumberOfElements max number of elements in any row; must be non-negative
+   * @param maxRowDataLengthInBytes max bytes of actual element content in any row; must be non-negative
+   * @throws IllegalArgumentException if an argument is negative or the total does not fit in an {@code int}
+   */
+  public static int getTotalRowStorageBytes(int maxNumberOfElements, int maxRowDataLengthInBytes) {
+    Preconditions.checkArgument(maxNumberOfElements >= 0 && maxRowDataLengthInBytes >= 0,
+        "maxNumberOfElements: %s and maxRowDataLengthInBytes: %s must be non-negative", maxNumberOfElements,
+        maxRowDataLengthInBytes);
+    try {
+      // Exact arithmetic detects overflow even when the wrapped-around result would be positive, which a sign-bit
+      // check on the operands and the sum cannot.
+      int prefixBytes =
+          Math.addExact(getElementLengthStoragePrefixInBytes(maxNumberOfElements), getNumElementsStoragePrefix());
+      return Math.addExact(prefixBytes, maxRowDataLengthInBytes);
+    } catch (ArithmeticException e) {
+      throw new IllegalArgumentException("integer overflow detected", e);
+    }
+  }
+
+  /**
+   * Inverse of {@link #getTotalRowStorageBytes(int, int)}: given the total bytes reserved for the largest row
+   * (prefixes (1) and (2) plus content), returns the bytes needed to store the actual content only.
+   *
+   * NOTE: the result is negative when {@code totalMaxLength} is smaller than the prefixes required for
+   * {@code maxNumberOfElements} elements, i.e. when the two arguments are inconsistent.
+   *
+   * @throws ArithmeticException if the prefix size for {@code maxNumberOfElements} does not fit in an {@code int}
+   */
+  public static int getMaxRowDataLengthInBytes(int totalMaxLength, int maxNumberOfElements) {
+    return totalMaxLength - getNumElementsStoragePrefix() - getElementLengthStoragePrefixInBytes(maxNumberOfElements);
+  }
 }
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java
index d9c317e2e6..67ba9ff6e6 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java
@@ -27,7 +27,7 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.commons.io.FileUtils;
-import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader;
+import
org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueVarByteRawIndexCreator;
import org.apache.pinot.segment.spi.ColumnMetadata;
import org.apache.pinot.segment.spi.SegmentMetadata;
import org.apache.pinot.segment.spi.V1Constants;
@@ -37,6 +37,7 @@ import
org.apache.pinot.segment.spi.creator.IndexCreatorProvider;
import org.apache.pinot.segment.spi.creator.SegmentVersion;
import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator;
import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
+import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext;
import org.apache.pinot.segment.spi.store.ColumnIndexType;
import org.apache.pinot.segment.spi.store.SegmentDirectory;
import org.slf4j.Logger;
@@ -48,12 +49,12 @@ import org.slf4j.LoggerFactory;
* that this handler only works for segment versions >= 3.0. Support for
segment version < 3.0 is not added because
* majority of the usecases are in versions >= 3.0 and this avoids adding tech
debt. The currently supported
* operations are:
- * 1. Change compression on raw SV columns.
+ * 1. Change compression on raw SV and MV columns.
*
* TODO: Add support for the following:
- * 1. Change compression for raw MV columns
- * 2. Enable dictionary
- * 3. Disable dictionary
+ * 1. Enable dictionary
+ * 2. Disable dictionary
+ * 3. Segment versions < V3
*/
public class ForwardIndexHandler implements IndexHandler {
private static final Logger LOGGER =
LoggerFactory.getLogger(ForwardIndexHandler.class);
@@ -106,7 +107,7 @@ public class ForwardIndexHandler implements IndexHandler {
throws Exception {
Map<String, Operation> columnOperationMap = new HashMap<>();
- // Does not work for segment versions < V3
+ // Does not work for segment versions < V3.
if (_segmentMetadata.getVersion().compareTo(SegmentVersion.v3) < 0) {
return columnOperationMap;
}
@@ -140,11 +141,6 @@ public class ForwardIndexHandler implements IndexHandler {
private boolean shouldChangeCompressionType(String column,
SegmentDirectory.Reader segmentReader) throws Exception {
ColumnMetadata existingColMetadata =
_segmentMetadata.getColumnMetadataFor(column);
- // TODO: Remove this MV column limitation.
- if (!existingColMetadata.isSingleValue()) {
- return false;
- }
-
// The compression type for an existing segment can only be determined by
reading the forward index header.
try (ForwardIndexReader fwdIndexReader =
LoaderUtils.getForwardIndexReader(segmentReader, existingColMetadata)) {
ChunkCompressionType existingCompressionType =
fwdIndexReader.getCompressionType();
@@ -173,12 +169,15 @@ public class ForwardIndexHandler implements IndexHandler {
IndexCreatorProvider indexCreatorProvider)
throws Exception {
Preconditions.checkState(_segmentMetadata.getVersion() ==
SegmentVersion.v3);
-
ColumnMetadata existingColMetadata =
_segmentMetadata.getColumnMetadataFor(column);
+ boolean isSingleValue = existingColMetadata.isSingleValue();
+
File indexDir = _segmentMetadata.getIndexDir();
String segmentName = _segmentMetadata.getName();
File inProgress = new File(indexDir, column + ".fwd.inprogress");
- File fwdIndexFile = new File(indexDir, column +
V1Constants.Indexes.RAW_SV_FORWARD_INDEX_FILE_EXTENSION);
+ String fileExtension = isSingleValue ?
V1Constants.Indexes.RAW_SV_FORWARD_INDEX_FILE_EXTENSION
+ : V1Constants.Indexes.RAW_MV_FORWARD_INDEX_FILE_EXTENSION;
+ File fwdIndexFile = new File(indexDir, column + fileExtension);
if (!inProgress.exists()) {
// Marker file does not exist, which means last run ended normally.
@@ -199,16 +198,49 @@ public class ForwardIndexHandler implements IndexHandler {
// are processed only if a valid compressionType is specified in
fieldConfig.
ChunkCompressionType newCompressionType = compressionConfigs.get(column);
- int numDocs = existingColMetadata.getTotalDocs();
+ if (isSingleValue) {
+ rewriteRawSVForwardIndex(column, existingColMetadata, indexDir,
segmentWriter, indexCreatorProvider,
+ newCompressionType);
+ } else {
+ rewriteRawMVForwardIndex(column, existingColMetadata, indexDir,
segmentWriter, indexCreatorProvider,
+ newCompressionType);
+ }
+ // We used the existing forward index to generate a new forward index. The
existing forward index will be in V3
+ // format and the new forward index will be in V1 format. Remove the
existing forward index as it is not needed
+ // anymore. Note that removeIndex() will only mark an index for removal
and remove the in-memory state. The
+ // actual cleanup from columns.psf file will happen when
singleFileIndexDirectory.cleanupRemovedIndices() is
+ // called during segmentWriter.close().
+ segmentWriter.removeIndex(column, ColumnIndexType.FORWARD_INDEX);
+ LoaderUtils.writeIndexToV3Format(segmentWriter, column, fwdIndexFile,
ColumnIndexType.FORWARD_INDEX);
+
+ // Delete the marker file.
+ FileUtils.deleteQuietly(inProgress);
+
+ LOGGER.info("Created forward index for segment: {}, column: {}",
segmentName, column);
+ }
+
+ private void rewriteRawMVForwardIndex(String column, ColumnMetadata
existingColMetadata, File indexDir,
+ SegmentDirectory.Writer segmentWriter, IndexCreatorProvider
indexCreatorProvider,
+ ChunkCompressionType newCompressionType)
+ throws Exception {
try (ForwardIndexReader reader =
LoaderUtils.getForwardIndexReader(segmentWriter, existingColMetadata)) {
+ // For VarByte MV columns like String and Bytes, the storage
representation of each row contains the following
+ // components:
+ // 1. bytes required to store the actual elements of the MV row (A)
+ // 2. bytes required to store the number of elements in the MV row (B)
+ // 3. bytes required to store the length of each MV element (C)
+ //
+ // lengthOfLongestEntry = A + B + C
+ // maxRowLengthInBytes = A
int lengthOfLongestEntry = reader.getLengthOfLongestEntry();
- Preconditions.checkState(lengthOfLongestEntry >= 0,
- "lengthOfLongestEntry cannot be negative. segment=" + segmentName +
" column={}" + column);
+ int maxNumberOfMVEntries =
existingColMetadata.getMaxNumberOfMultiValues();
+ int maxRowLengthInBytes =
+
MultiValueVarByteRawIndexCreator.getMaxRowDataLengthInBytes(lengthOfLongestEntry,
maxNumberOfMVEntries);
IndexCreationContext.Forward context =
IndexCreationContext.builder().withIndexDir(indexDir).withColumnMetadata(existingColMetadata)
- .withLengthOfLongestEntry(lengthOfLongestEntry).build()
+
.withLengthOfLongestEntry(lengthOfLongestEntry).withMaxRowLengthInBytes(maxRowLengthInBytes).build()
.forForwardIndex(newCompressionType,
_indexLoadingConfig.getColumnProperties());
try (ForwardIndexCreator creator =
indexCreatorProvider.newForwardIndexCreator(context)) {
@@ -220,54 +252,138 @@ public class ForwardIndexHandler implements IndexHandler {
           throw new UnsupportedOperationException(failureMsg);
         }
 
-        PinotSegmentColumnReader columnReader =
-            new PinotSegmentColumnReader(reader, null, null, existingColMetadata.getMaxNumberOfMultiValues());
+        int numDocs = existingColMetadata.getTotalDocs();
+        forwardIndexWriterHelper(column, reader, creator, numDocs);
+      }
+    }
+  }
 
-        for (int i = 0; i < numDocs; i++) {
-          Object val = columnReader.getValue(i);
-
-          // JSON fields are either stored as string or bytes. No special handling is needed because we make this
-          // decision based on the storedType of the reader.
-          switch (reader.getStoredType()) {
-            case INT:
-              creator.putInt((int) val);
-              break;
-            case LONG:
-              creator.putLong((long) val);
-              break;
-            case FLOAT:
-              creator.putFloat((float) val);
-              break;
-            case DOUBLE:
-              creator.putDouble((double) val);
-              break;
-            case STRING:
-              creator.putString((String) val);
-              break;
-            case BYTES:
-              creator.putBytes((byte[]) val);
-              break;
-            case BIG_DECIMAL:
-              creator.putBigDecimal((BigDecimal) val);
-              break;
-            default:
-              throw new IllegalStateException();
-          }
-        }
+  /**
+   * Rewrites the raw forward index of a single-value column using {@code newCompressionType}.
+   */
+  private void rewriteRawSVForwardIndex(String column, ColumnMetadata existingColMetadata, File indexDir,
+      SegmentDirectory.Writer segmentWriter, IndexCreatorProvider indexCreatorProvider,
+      ChunkCompressionType newCompressionType)
+      throws Exception {
+    try (ForwardIndexReader reader = LoaderUtils.getForwardIndexReader(segmentWriter, existingColMetadata)) {
+      int lengthOfLongestEntry = reader.getLengthOfLongestEntry();
+
+      IndexCreationContext.Forward context =
+          IndexCreationContext.builder().withIndexDir(indexDir).withColumnMetadata(existingColMetadata)
+              .withLengthOfLongestEntry(lengthOfLongestEntry).build()
+              .forForwardIndex(newCompressionType, _indexLoadingConfig.getColumnProperties());
+
+      try (ForwardIndexCreator creator = indexCreatorProvider.newForwardIndexCreator(context)) {
+        // forwardIndexWriterHelper() rejects a reader/creator stored-type mismatch before copying any data, so no
+        // separate check is needed here.
+        int numDocs = existingColMetadata.getTotalDocs();
+        forwardIndexWriterHelper(column, reader, creator, numDocs);
       }
     }
+  }
 
-    // We used the existing forward index to generate a new forward index. The existing forward index will be in V3
-    // format and the new forward index will be in V1 format. Remove the existing forward index as it is not needed
-    // anymore. Note that removeIndex() will only mark an index for removal and remove the in-memory state. The
-    // actual cleanup from columns.psf file will happen when singleFileIndexDirectory.cleanupRemovedIndices() is
-    // called during segmentWriter.close().
-    segmentWriter.removeIndex(column, ColumnIndexType.FORWARD_INDEX);
-    LoaderUtils.writeIndexToV3Format(segmentWriter, column, fwdIndexFile, ColumnIndexType.FORWARD_INDEX);
+  /**
+   * Copies every document of {@code column} from the existing forward index into {@code creator}. Works for both
+   * single-value and multi-value columns; values are copied using the reader's stored type.
+   *
+   * @param column column name, used in error messages
+   * @param reader reader over the existing forward index
+   * @param creator creator of the new forward index
+   * @param numDocs number of documents to copy
+   * @throws UnsupportedOperationException if the reader and creator stored types differ (changing the data type is
+   *         not supported)
+   * @throws IllegalStateException for a multi-value BIG_DECIMAL column or an unexpected stored type
+   */
+  private void forwardIndexWriterHelper(String column, ForwardIndexReader reader, ForwardIndexCreator creator,
+      int numDocs)
+      throws Exception {
+    // The creator's stored type must match the reader's stored type: changing the data type is not supported.
+    if (!reader.getStoredType().equals(creator.getValueType())) {
+      String failureMsg =
+          "Unsupported operation to change datatype for column=" + column + " from " + reader.getStoredType().toString()
+              + " to " + creator.getValueType().toString();
+      throw new UnsupportedOperationException(failureMsg);
+    }
 
-    // Delete the marker file.
-    FileUtils.deleteQuietly(inProgress);
+    boolean isSVColumn = reader.isSingleValue();
 
-    LOGGER.info("Created forward index for segment: {}, column: {}", segmentName, column);
+    // JSON fields are either stored as string or bytes. No special handling is needed because the copy below is
+    // driven by the storedType of the reader.
+    // The reader context is closed once the copy completes (try-with-resources skips a null context).
+    try (ForwardIndexReaderContext readerContext = reader.createContext()) {
+      switch (reader.getStoredType()) {
+        case INT: {
+          for (int i = 0; i < numDocs; i++) {
+            if (isSVColumn) {
+              creator.putInt(reader.getInt(i, readerContext));
+            } else {
+              creator.putIntMV(reader.getIntMV(i, readerContext));
+            }
+          }
+          break;
+        }
+        case LONG: {
+          for (int i = 0; i < numDocs; i++) {
+            if (isSVColumn) {
+              creator.putLong(reader.getLong(i, readerContext));
+            } else {
+              creator.putLongMV(reader.getLongMV(i, readerContext));
+            }
+          }
+          break;
+        }
+        case FLOAT: {
+          for (int i = 0; i < numDocs; i++) {
+            if (isSVColumn) {
+              creator.putFloat(reader.getFloat(i, readerContext));
+            } else {
+              creator.putFloatMV(reader.getFloatMV(i, readerContext));
+            }
+          }
+          break;
+        }
+        case DOUBLE: {
+          for (int i = 0; i < numDocs; i++) {
+            if (isSVColumn) {
+              creator.putDouble(reader.getDouble(i, readerContext));
+            } else {
+              creator.putDoubleMV(reader.getDoubleMV(i, readerContext));
+            }
+          }
+          break;
+        }
+        case STRING: {
+          for (int i = 0; i < numDocs; i++) {
+            if (isSVColumn) {
+              creator.putString(reader.getString(i, readerContext));
+            } else {
+              creator.putStringMV(reader.getStringMV(i, readerContext));
+            }
+          }
+          break;
+        }
+        case BYTES: {
+          for (int i = 0; i < numDocs; i++) {
+            if (isSVColumn) {
+              creator.putBytes(reader.getBytes(i, readerContext));
+            } else {
+              creator.putBytesMV(reader.getBytesMV(i, readerContext));
+            }
+          }
+          break;
+        }
+        case BIG_DECIMAL: {
+          // Checked once up front rather than once per document.
+          Preconditions.checkState(isSVColumn, "BigDecimal is not supported for MV columns");
+          for (int i = 0; i < numDocs; i++) {
+            creator.putBigDecimal(reader.getBigDecimal(i, readerContext));
+          }
+          break;
+        }
+        default:
+          throw new IllegalStateException(
+              "Unsupported stored type: " + reader.getStoredType() + " for column: " + column);
+      }
+    }
   }
 }
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentColumnReader.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentColumnReader.java
index a510bfd128..8b9ba6bb6f 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentColumnReader.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentColumnReader.java
@@ -37,6 +37,7 @@ public class PinotSegmentColumnReader implements Closeable {
private final Dictionary _dictionary;
private final NullValueVectorReader _nullValueVectorReader;
private final int[] _dictIdBuffer;
+ private final int _maxNumValuesPerMVEntry;
public PinotSegmentColumnReader(IndexSegment indexSegment, String column) {
DataSource dataSource = indexSegment.getDataSource(column);
@@ -48,8 +49,11 @@ public class PinotSegmentColumnReader implements Closeable {
_nullValueVectorReader = dataSource.getNullValueVector();
if (_forwardIndexReader.isSingleValue()) {
_dictIdBuffer = null;
+ _maxNumValuesPerMVEntry = -1;
} else {
- _dictIdBuffer = new
int[dataSource.getDataSourceMetadata().getMaxNumValuesPerMVEntry()];
+ _maxNumValuesPerMVEntry =
dataSource.getDataSourceMetadata().getMaxNumValuesPerMVEntry();
+ Preconditions.checkState(_maxNumValuesPerMVEntry >= 0,
"maxNumValuesPerMVEntry is negative for an MV column.");
+ _dictIdBuffer = new int[_maxNumValuesPerMVEntry];
}
}
@@ -59,6 +63,7 @@ public class PinotSegmentColumnReader implements Closeable {
_forwardIndexReaderContext = _forwardIndexReader.createContext();
_dictionary = dictionary;
_nullValueVectorReader = nullValueVectorReader;
+ _maxNumValuesPerMVEntry = maxNumValuesPerMVEntry;
if (_forwardIndexReader.isSingleValue()) {
_dictIdBuffer = null;
} else {
@@ -95,26 +100,90 @@ public class PinotSegmentColumnReader implements Closeable
{
return values;
}
} else {
- // NOTE: Only support single-value raw index
- assert _forwardIndexReader.isSingleValue();
-
- switch (_forwardIndexReader.getStoredType()) {
- case INT:
- return _forwardIndexReader.getInt(docId, _forwardIndexReaderContext);
- case LONG:
- return _forwardIndexReader.getLong(docId,
_forwardIndexReaderContext);
- case FLOAT:
- return _forwardIndexReader.getFloat(docId,
_forwardIndexReaderContext);
- case DOUBLE:
- return _forwardIndexReader.getDouble(docId,
_forwardIndexReaderContext);
- case BIG_DECIMAL:
- return _forwardIndexReader.getBigDecimal(docId,
_forwardIndexReaderContext);
- case STRING:
- return _forwardIndexReader.getString(docId,
_forwardIndexReaderContext);
- case BYTES:
- return _forwardIndexReader.getBytes(docId,
_forwardIndexReaderContext);
- default:
- throw new IllegalStateException();
+ if (_forwardIndexReader.isSingleValue()) {
+ switch (_forwardIndexReader.getStoredType()) {
+ case INT:
+ return _forwardIndexReader.getInt(docId,
_forwardIndexReaderContext);
+ case LONG:
+ return _forwardIndexReader.getLong(docId,
_forwardIndexReaderContext);
+ case FLOAT:
+ return _forwardIndexReader.getFloat(docId,
_forwardIndexReaderContext);
+ case DOUBLE:
+ return _forwardIndexReader.getDouble(docId,
_forwardIndexReaderContext);
+ case BIG_DECIMAL:
+ return _forwardIndexReader.getBigDecimal(docId,
_forwardIndexReaderContext);
+ case STRING:
+ return _forwardIndexReader.getString(docId,
_forwardIndexReaderContext);
+ case BYTES:
+ return _forwardIndexReader.getBytes(docId,
_forwardIndexReaderContext);
+ default:
+ throw new IllegalStateException();
+ }
+ } else {
+ switch (_forwardIndexReader.getStoredType()) {
+ case INT: {
+ int[] buffer = new int[_maxNumValuesPerMVEntry];
+ int length = _forwardIndexReader.getIntMV(docId, buffer,
_forwardIndexReaderContext);
+ Object[] values = new Object[length];
+ for (int i = 0; i < length; i++) {
+ values[i] = buffer[i];
+ }
+
+ return values;
+ }
+ case LONG: {
+ long[] buffer = new long[_maxNumValuesPerMVEntry];
+ int length = _forwardIndexReader.getLongMV(docId, buffer,
_forwardIndexReaderContext);
+ Object[] values = new Object[length];
+ for (int i = 0; i < length; i++) {
+ values[i] = buffer[i];
+ }
+
+ return values;
+ }
+ case FLOAT: {
+ float[] buffer = new float[_maxNumValuesPerMVEntry];
+ int length = _forwardIndexReader.getFloatMV(docId, buffer,
_forwardIndexReaderContext);
+ Object[] values = new Object[length];
+ for (int i = 0; i < length; i++) {
+ values[i] = buffer[i];
+ }
+
+ return values;
+ }
+ case DOUBLE: {
+ double[] buffer = new double[_maxNumValuesPerMVEntry];
+ int length = _forwardIndexReader.getDoubleMV(docId, buffer,
_forwardIndexReaderContext);
+ Object[] values = new Object[length];
+ for (int i = 0; i < length; i++) {
+ values[i] = buffer[i];
+ }
+
+ return values;
+ }
+ case STRING: {
+ String[] buffer = new String[_maxNumValuesPerMVEntry];
+ int length = _forwardIndexReader.getStringMV(docId, buffer,
_forwardIndexReaderContext);
+ Object[] values = new Object[length];
+ for (int i = 0; i < length; i++) {
+ values[i] = buffer[i];
+ }
+
+ return values;
+ }
+ case BYTES: {
+ byte[][] buffer = new byte[_maxNumValuesPerMVEntry][];
+ int length = _forwardIndexReader.getBytesMV(docId, buffer,
_forwardIndexReaderContext);
+ Object[] values = new Object[length];
+ for (int i = 0; i < length; i++) {
+ values[i] = buffer[i];
+ }
+
+ return values;
+ }
+ default:
+ throw new IllegalStateException("Invalid stored type=" +
_forwardIndexReader.getStoredType());
+ }
}
}
}
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandlerTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandlerTest.java
index 12bb8b61a2..f70779feed 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandlerTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandlerTest.java
@@ -81,15 +81,23 @@ public class ForwardIndexHandlerTest {
private static final String DIM_ZSTANDARD_INTEGER = "DIM_ZSTANDARD_INTEGER";
private static final String DIM_LZ4_INTEGER = "DIM_LZ4_INTEGER";
+ // Dictionary columns
private static final String DIM_DICT_INTEGER = "DIM_DICT_INTEGER";
private static final String DIM_DICT_STRING = "DIM_DICT_STRING";
private static final String DIM_DICT_LONG = "DIM_DICT_LONG";
- private static final String METRIC_PASSTHROUGH_INTEGER =
"METRIC_PASSTHROUGH_INTEGER";
+ // Metric columns
+ private static final String METRIC_PASS_THROUGH_INTEGER =
"METRIC_PASS_THROUGH_INTEGER";
private static final String METRIC_SNAPPY_INTEGER = "METRIC_SNAPPY_INTEGER";
private static final String METRIC_ZSTANDARD_INTEGER =
"METRIC_ZSTANDARD_INTEGER";
private static final String METRIC_LZ4_INTEGER = "METRIC_LZ4_INTEGER";
+ // Multivalue columns
+ private static final String DIM_MV_PASS_THROUGH_INTEGER =
"DIM_MV_PASS_THROUGH_INTEGER";
+ private static final String DIM_MV_PASS_THROUGH_LONG =
"DIM_MV_PASS_THROUGH_LONG";
+ private static final String DIM_MV_PASS_THROUGH_STRING =
"DIM_MV_PASS_THROUGH_STRING";
+ private static final String DIM_MV_PASS_THROUGH_BYTES =
"DIM_MV_PASS_THROUGH_BYTES";
+
private static final List<String> RAW_SNAPPY_INDEX_COLUMNS =
Arrays.asList(DIM_SNAPPY_STRING, DIM_SNAPPY_LONG, DIM_SNAPPY_INTEGER,
METRIC_SNAPPY_INTEGER);
@@ -98,7 +106,8 @@ public class ForwardIndexHandlerTest {
private static final List<String> RAW_PASS_THROUGH_INDEX_COLUMNS =
Arrays.asList(DIM_PASS_THROUGH_STRING, DIM_PASS_THROUGH_LONG,
DIM_PASS_THROUGH_INTEGER,
- METRIC_PASSTHROUGH_INTEGER);
+ METRIC_PASS_THROUGH_INTEGER, DIM_MV_PASS_THROUGH_INTEGER,
DIM_MV_PASS_THROUGH_LONG,
+ DIM_MV_PASS_THROUGH_STRING, DIM_MV_PASS_THROUGH_BYTES);
private static final List<String> RAW_LZ4_INDEX_COLUMNS =
Arrays.asList(DIM_LZ4_STRING, DIM_LZ4_LONG, DIM_LZ4_INTEGER,
METRIC_LZ4_INTEGER);
@@ -171,11 +180,13 @@ public class ForwardIndexHandlerTest {
.addSingleValueDimension(DIM_DICT_INTEGER, FieldSpec.DataType.INT)
.addSingleValueDimension(DIM_DICT_LONG, FieldSpec.DataType.LONG)
.addSingleValueDimension(DIM_DICT_STRING, FieldSpec.DataType.STRING)
- .addMetric(METRIC_PASSTHROUGH_INTEGER, FieldSpec.DataType.INT)
+ .addMetric(METRIC_PASS_THROUGH_INTEGER, FieldSpec.DataType.INT)
.addMetric(METRIC_SNAPPY_INTEGER,
FieldSpec.DataType.INT).addMetric(METRIC_LZ4_INTEGER, FieldSpec.DataType.INT)
.addMetric(METRIC_ZSTANDARD_INTEGER, FieldSpec.DataType.INT)
-
- .build();
+ .addMultiValueDimension(DIM_MV_PASS_THROUGH_INTEGER,
FieldSpec.DataType.INT)
+ .addMultiValueDimension(DIM_MV_PASS_THROUGH_LONG,
FieldSpec.DataType.LONG)
+ .addMultiValueDimension(DIM_MV_PASS_THROUGH_STRING,
FieldSpec.DataType.STRING)
+ .addMultiValueDimension(DIM_MV_PASS_THROUGH_BYTES,
FieldSpec.DataType.BYTES).build();
SegmentGeneratorConfig config = new SegmentGeneratorConfig(_tableConfig,
_schema);
config.setOutDir(INDEX_DIR.getPath());
@@ -200,16 +211,42 @@ public class ForwardIndexHandlerTest {
     Integer[] tempIntRows = new Integer[rowLength];
     Long[] tempLongRows = new Long[rowLength];
 
+    int maxNumberOfMVEntries = random.nextInt(500) + 1;
+    String[][] tempMVStringRows = new String[rowLength][maxNumberOfMVEntries];
+    Integer[][] tempMVIntRows = new Integer[rowLength][maxNumberOfMVEntries];
+    Long[][] tempMVLongRows = new Long[rowLength][maxNumberOfMVEntries];
+    byte[][][] tempMVByteRows = new byte[rowLength][maxNumberOfMVEntries][];
+
     for (int i = 0; i < rowLength; i++) {
       //Adding a fixed value to check for filter queries
       if (i % 10 == 0) {
         tempStringRows[i] = "testRow";
         tempIntRows[i] = 1001;
         tempLongRows[i] = 1001L;
+
+        // Fill 1..maxNumberOfMVEntries slots so no row is empty; slots at index >= numMVElements stay null.
+        int numMVElements = random.nextInt(maxNumberOfMVEntries) + 1;
+        for (int j = 0; j < numMVElements; j++) {
+          tempMVIntRows[i][j] = 1001;
+          tempMVLongRows[i][j] = 1001L;
+          String str = "testRow";
+          tempMVStringRows[i][j] = str;
+          tempMVByteRows[i][j] = str.getBytes();
+        }
       } else {
         tempStringRows[i] = "n" + i;
         tempIntRows[i] = i;
         tempLongRows[i] = (long) i;
+
+        // Fill 1..maxNumberOfMVEntries slots so no row is empty; slots at index >= numMVElements stay null.
+        int numMVElements = random.nextInt(maxNumberOfMVEntries) + 1;
+        for (int j = 0; j < numMVElements; j++) {
+          tempMVIntRows[i][j] = j;
+          tempMVLongRows[i][j] = (long) j;
+          String str = "n" + i;
+          tempMVStringRows[i][j] = str;
+          tempMVByteRows[i][j] = str.getBytes();
+        }
       }
     }
 
@@ -228,7 +265,7 @@ public class ForwardIndexHandlerTest {
row.putValue(DIM_PASS_THROUGH_INTEGER, tempIntRows[i]);
row.putValue(DIM_LZ4_INTEGER, tempIntRows[i]);
row.putValue(METRIC_LZ4_INTEGER, tempIntRows[i]);
- row.putValue(METRIC_PASSTHROUGH_INTEGER, tempIntRows[i]);
+ row.putValue(METRIC_PASS_THROUGH_INTEGER, tempIntRows[i]);
row.putValue(METRIC_ZSTANDARD_INTEGER, tempIntRows[i]);
row.putValue(METRIC_SNAPPY_INTEGER, tempIntRows[i]);
@@ -243,6 +280,12 @@ public class ForwardIndexHandlerTest {
row.putValue(DIM_DICT_LONG, tempLongRows[i]);
row.putValue(DIM_DICT_STRING, tempStringRows[i]);
+ // MV columns
+ row.putValue(DIM_MV_PASS_THROUGH_INTEGER, tempMVIntRows[i]);
+ row.putValue(DIM_MV_PASS_THROUGH_LONG, tempMVLongRows[i]);
+ row.putValue(DIM_MV_PASS_THROUGH_STRING, tempMVStringRows[i]);
+ row.putValue(DIM_MV_PASS_THROUGH_BYTES, tempMVByteRows[i]);
+
rows.add(row);
}
return rows;
@@ -278,7 +321,7 @@ public class ForwardIndexHandlerTest {
operationMap = fwdIndexHandler.computeOperation(writer);
assertEquals(operationMap, Collections.EMPTY_MAP);
- // TEST4: Add random index. ForwardIndexHandler should be a No-Op.
+ // TEST4: Add an additional text index. ForwardIndexHandler should be a
No-Op.
indexLoadingConfig = new IndexLoadingConfig(null, _tableConfig);
indexLoadingConfig.getTextIndexColumns().add(DIM_DICT_INTEGER);
indexLoadingConfig.getTextIndexColumns().add(DIM_LZ4_INTEGER);
@@ -287,12 +330,21 @@ public class ForwardIndexHandlerTest {
assertEquals(operationMap, Collections.EMPTY_MAP);
// TEST5: Change compression
+ Random rand = new Random();
// Create new tableConfig with the modified fieldConfigs.
List<FieldConfig> fieldConfigs = new
ArrayList<>(_tableConfig.getFieldConfigList());
- FieldConfig config = fieldConfigs.remove(0);
- FieldConfig newConfig = new FieldConfig(config.getName(),
FieldConfig.EncodingType.RAW, Collections.emptyList(),
- FieldConfig.CompressionCodec.ZSTANDARD, null);
+ int randIdx = rand.nextInt(fieldConfigs.size());
+ FieldConfig config = fieldConfigs.remove(randIdx);
+ FieldConfig.CompressionCodec newCompressionType = null;
+ for (FieldConfig.CompressionCodec type : _allCompressionTypes) {
+ if (config.getCompressionCodec() != type) {
+ newCompressionType = type;
+ }
+ }
+ FieldConfig newConfig =
+ new FieldConfig(config.getName(), FieldConfig.EncodingType.RAW,
Collections.emptyList(), newCompressionType,
+ null);
fieldConfigs.add(newConfig);
TableConfig tableConfig =
new
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setNoDictionaryColumns(_noDictionaryColumns)
@@ -339,7 +391,6 @@ public class ForwardIndexHandlerTest {
public void testRewriteRawForwardIndexForSingleColumn()
throws Exception {
for (int i = 0; i < _noDictionaryColumns.size(); i++) {
-
// For every noDictionaryColumn, change the compressionType to all
available types, one by one.
for (FieldConfig.CompressionCodec compressionType :
_allCompressionTypes) {
// Setup
@@ -365,6 +416,9 @@ public class ForwardIndexHandlerTest {
IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig(null,
tableConfig);
IndexCreatorProvider indexCreatorProvider =
IndexingOverrides.getIndexCreatorProvider();
ForwardIndexHandler fwdIndexHandler = new
ForwardIndexHandler(existingSegmentMetadata, indexLoadingConfig);
+ boolean val = fwdIndexHandler.needUpdateIndices(writer);
+
+
fwdIndexHandler.updateIndices(writer, indexCreatorProvider);
@@ -434,6 +488,7 @@ public class ForwardIndexHandlerTest {
new SegmentLocalFSDirectory(_segmentDirectory,
existingSegmentMetadata, ReadMode.mmap);
SegmentDirectory.Writer writer = segmentLocalFSDirectory.createWriter();
ColumnMetadata columnMetadata =
existingSegmentMetadata.getColumnMetadataFor(columnName);
+ boolean isSingleValue = columnMetadata.isSingleValue();
// Check Compression type in header
ForwardIndexReader fwdIndexReader =
LoaderUtils.getForwardIndexReader(writer, columnMetadata);
@@ -449,12 +504,61 @@ public class ForwardIndexHandlerTest {
Object val = columnReader.getValue(rowIdx);
FieldSpec.DataType dataType = forwardIndexReader.getStoredType();
- if (dataType == FieldSpec.DataType.STRING) {
- assertEquals((String) val, "testRow");
- } else if (dataType == FieldSpec.DataType.INT) {
- assertEquals((int) val, 1001, columnName + " " + rowIdx + " " +
expectedCompressionType);
- } else if (dataType == FieldSpec.DataType.LONG) {
- assertEquals((long) val, 1001L, columnName + " " + rowIdx + " " +
expectedCompressionType);
+
+ switch (dataType) {
+ case STRING: {
+ if (isSingleValue) {
+ assertEquals((String) val, "testRow");
+ } else {
+ Object[] values = (Object[]) val;
+ int length = values.length;
+ for (int i = 0; i < length; i++) {
+ assertEquals((String) values[i], "testRow");
+ }
+ }
+ break;
+ }
+ case INT: {
+ if (isSingleValue) {
+ assertEquals((int) val, 1001, columnName + " " + rowIdx + " "
+ expectedCompressionType);
+ } else {
+ Object[] values = (Object[]) val;
+ int length = values.length;
+ for (int i = 0; i < length; i++) {
+ assertEquals((int) values[i], 1001, columnName + " " +
rowIdx + " " + expectedCompressionType);
+ }
+ }
+ break;
+ }
+ case LONG: {
+ if (isSingleValue) {
+ assertEquals((long) val, 1001L, columnName + " " + rowIdx + "
" + expectedCompressionType);
+ } else {
+ Object[] values = (Object[]) val;
+ int length = values.length;
+ for (int i = 0; i < length; i++) {
+ assertEquals((long) values[i], 1001, columnName + " " +
rowIdx + " " + expectedCompressionType);
+ }
+ }
+ break;
+ }
+ case BYTES: {
+ byte[] expectedVal = "testRow".getBytes();
+ if (isSingleValue) {
+ assertEquals((byte[]) val, expectedVal, columnName + " " +
rowIdx + " " + expectedCompressionType);
+ } else {
+ Object[] values = (Object[]) val;
+ int length = values.length;
+ for (int i = 0; i < length; i++) {
+ assertEquals((byte[]) values[i], expectedVal,
+ columnName + " " + rowIdx + " " +
expectedCompressionType);
+ }
+ }
+ break;
+ }
+ default:
+ // Unreachable code.
+ throw new IllegalStateException("Invalid datatype for column=" +
columnName);
}
}
}
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java
index babbe646bc..85ffeb555d 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java
@@ -98,6 +98,9 @@ public class SegmentPreProcessorTest {
private static final String NEWLY_ADDED_STRING_MV_COL_RAW =
"newTextMVColRaw";
private static final String NEWLY_ADDED_STRING_MV_COL_DICT =
"newTextMVColDict";
+ // For raw MV column.
+ private static final String EXISTING_INT_COL_RAW_MV = "column6";
+
// For create fst index tests
private static final String NEWLY_ADDED_FST_COL_DICT = "newFSTColDict";
@@ -209,11 +212,15 @@ public class SegmentPreProcessorTest {
throws Exception {
FileUtils.deleteQuietly(INDEX_DIR);
+ List<String> rawCols = new ArrayList<>();
+ rawCols.add(EXISTING_STRING_COL_RAW);
+ rawCols.add(EXISTING_INT_COL_RAW_MV);
+
// Create inverted index for 'column7' when constructing the segment.
SegmentGeneratorConfig segmentGeneratorConfig =
SegmentTestUtils.getSegmentGeneratorConfigWithSchema(_avroFile,
INDEX_DIR, "testTable", _tableConfig, _schema);
segmentGeneratorConfig.setInvertedIndexCreationColumns(Collections.singletonList(COLUMN7_NAME));
-
segmentGeneratorConfig.setRawIndexCreationColumns(Collections.singletonList(EXISTING_STRING_COL_RAW));
+ segmentGeneratorConfig.setRawIndexCreationColumns(rawCols);
SegmentIndexCreationDriver driver = SegmentCreationDriverFactory.get(null);
driver.init(segmentGeneratorConfig);
driver.build();
@@ -328,19 +335,19 @@ public class SegmentPreProcessorTest {
ChunkCompressionType newCompressionType = ChunkCompressionType.ZSTANDARD;
compressionConfigs.put(EXISTING_STRING_COL_RAW, newCompressionType);
_indexLoadingConfig.setCompressionConfigs(compressionConfigs);
- _indexLoadingConfig.setNoDictionaryColumns(new HashSet<String>() {{
- add(EXISTING_STRING_COL_RAW);
- }});
+ _indexLoadingConfig.getNoDictionaryColumns().add(EXISTING_STRING_COL_RAW);
// Test1: Rewriting forward index will be a no-op for v1 segments. Default
LZ4 compressionType will be retained.
constructV1Segment();
- checkForwardIndexCreation(EXISTING_STRING_COL_RAW, 5, 3, _schema, false,
false, false, 0, ChunkCompressionType.LZ4);
+ checkForwardIndexCreation(EXISTING_STRING_COL_RAW, 5, 3, _schema, false,
false, false, 0, ChunkCompressionType.LZ4,
+ true, 0, DataType.STRING, 100000);
// Convert the segment to V3.
new SegmentV1V2ToV3FormatConverter().convert(_indexDir);
// Test2: Now forward index will be rewritten with ZSTANDARD
compressionType.
- checkForwardIndexCreation(EXISTING_STRING_COL_RAW, 5, 3, _schema, false,
false, false, 0, newCompressionType);
+ checkForwardIndexCreation(EXISTING_STRING_COL_RAW, 5, 3, _schema, false,
false, false, 0, newCompressionType, true,
+ 0, DataType.STRING, 100000);
// Test3: Change compression on existing raw index column. Also add text
index on same column. Check correctness.
newCompressionType = ChunkCompressionType.SNAPPY;
@@ -355,7 +362,7 @@ public class SegmentPreProcessorTest {
assertNotNull(columnMetadata);
checkTextIndexCreation(EXISTING_STRING_COL_RAW, 5, 3, _schema, false,
false, false, 0);
validateIndex(ColumnIndexType.FORWARD_INDEX, EXISTING_STRING_COL_RAW, 5,
3, _schema, false, false, false, 0, true,
- 0, newCompressionType, false);
+ 0, newCompressionType, false, DataType.STRING, 100000);
// Test4: Change compression on RAW index column. Change another index on
another column. Check correctness.
newCompressionType = ChunkCompressionType.ZSTANDARD;
@@ -372,7 +379,18 @@ public class SegmentPreProcessorTest {
checkFSTIndexCreation(EXISTING_STRING_COL_DICT, 9, 4,
_newColumnsSchemaWithFST, false, false, 26);
// Check forward index.
validateIndex(ColumnIndexType.FORWARD_INDEX, EXISTING_STRING_COL_RAW, 5,
3, _schema, false, false, false, 0, true,
- 0, newCompressionType, false);
+ 0, newCompressionType, false, DataType.STRING, 100000);
+
+ // Test5: Change compressionType for an MV column
+ newCompressionType = ChunkCompressionType.ZSTANDARD;
+ compressionConfigs.put(EXISTING_INT_COL_RAW_MV, newCompressionType);
+ _indexLoadingConfig.setCompressionConfigs(compressionConfigs);
+ _indexLoadingConfig.getNoDictionaryColumns().add(EXISTING_INT_COL_RAW_MV);
+
+ constructV1Segment();
+ new SegmentV1V2ToV3FormatConverter().convert(_indexDir);
+ checkForwardIndexCreation(EXISTING_INT_COL_RAW_MV, 18499, 15, _schema,
false, false, false, 0,
+ ChunkCompressionType.ZSTANDARD, false, 13, DataType.INT, 106688);
}
/**
@@ -474,14 +492,14 @@ public class SegmentPreProcessorTest {
boolean isSorted, int dictionaryElementSize)
throws Exception {
createAndValidateIndex(ColumnIndexType.FST_INDEX, column, cardinality,
bits, schema, isAutoGenerated, true,
- isSorted, dictionaryElementSize, true, 0, null, false);
+ isSorted, dictionaryElementSize, true, 0, null, false,
DataType.STRING, 100000);
}
private void checkTextIndexCreation(String column, int cardinality, int
bits, Schema schema, boolean isAutoGenerated,
boolean hasDictionary, boolean isSorted, int dictionaryElementSize)
throws Exception {
createAndValidateIndex(ColumnIndexType.TEXT_INDEX, column, cardinality,
bits, schema, isAutoGenerated,
- hasDictionary, isSorted, dictionaryElementSize, true, 0, null, false);
+ hasDictionary, isSorted, dictionaryElementSize, true, 0, null, false,
DataType.STRING, 100000);
}
private void checkTextIndexCreation(String column, int cardinality, int
bits, Schema schema, boolean isAutoGenerated,
@@ -489,21 +507,24 @@ public class SegmentPreProcessorTest {
int maxNumberOfMultiValues)
throws Exception {
createAndValidateIndex(ColumnIndexType.TEXT_INDEX, column, cardinality,
bits, schema, isAutoGenerated,
- hasDictionary, isSorted, dictionaryElementSize, isSingleValue,
maxNumberOfMultiValues, null, false);
+ hasDictionary, isSorted, dictionaryElementSize, isSingleValue,
maxNumberOfMultiValues, null, false,
+ DataType.STRING, 100000);
}
private void checkForwardIndexCreation(String column, int cardinality, int
bits, Schema schema,
boolean isAutoGenerated, boolean hasDictionary, boolean isSorted, int
dictionaryElementSize,
- ChunkCompressionType expectedCompressionType)
+ ChunkCompressionType expectedCompressionType, boolean isSingleValue, int
maxNumberOfMultiValues,
+ DataType dataType, int totalNumberOfEntries)
throws Exception {
createAndValidateIndex(ColumnIndexType.FORWARD_INDEX, column, cardinality,
bits, schema, isAutoGenerated,
- hasDictionary, isSorted, dictionaryElementSize, true, 0,
expectedCompressionType, false);
+ hasDictionary, isSorted, dictionaryElementSize, isSingleValue,
maxNumberOfMultiValues, expectedCompressionType,
+ false, dataType, totalNumberOfEntries);
}
private void createAndValidateIndex(ColumnIndexType indexType, String
column, int cardinality, int bits,
Schema schema, boolean isAutoGenerated, boolean hasDictionary, boolean
isSorted, int dictionaryElementSize,
boolean isSingleValued, int maxNumberOfMultiValues, ChunkCompressionType
expectedCompressionType,
- boolean forwardIndexDisabled)
+ boolean forwardIndexDisabled, DataType dataType, int
totalNumberOfEntries)
throws Exception {
try (SegmentDirectory segmentDirectory =
SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader()
@@ -512,18 +533,19 @@ public class SegmentPreProcessorTest {
SegmentPreProcessor processor = new
SegmentPreProcessor(segmentDirectory, _indexLoadingConfig, schema)) {
processor.process();
validateIndex(indexType, column, cardinality, bits, schema,
isAutoGenerated, hasDictionary, isSorted,
- dictionaryElementSize, isSingleValued, maxNumberOfMultiValues,
expectedCompressionType, forwardIndexDisabled);
+ dictionaryElementSize, isSingleValued, maxNumberOfMultiValues,
expectedCompressionType, forwardIndexDisabled,
+ dataType, totalNumberOfEntries);
}
}
private void validateIndex(ColumnIndexType indexType, String column, int
cardinality, int bits, Schema schema,
boolean isAutoGenerated, boolean hasDictionary, boolean isSorted, int
dictionaryElementSize,
boolean isSingleValued, int maxNumberOfMultiValues, ChunkCompressionType
expectedCompressionType,
- boolean forwardIndexDisabled)
+ boolean forwardIndexDisabled, DataType dataType, int
totalNumberOfEntries)
throws Exception {
SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(_indexDir);
ColumnMetadata columnMetadata =
segmentMetadata.getColumnMetadataFor(column);
- assertEquals(columnMetadata.getFieldSpec(), new DimensionFieldSpec(column,
DataType.STRING, isSingleValued));
+ assertEquals(columnMetadata.getFieldSpec(), new DimensionFieldSpec(column,
dataType, isSingleValued));
assertEquals(columnMetadata.getCardinality(), cardinality);
assertEquals(columnMetadata.getTotalDocs(), 100000);
assertEquals(columnMetadata.getBitsPerElement(), bits);
@@ -531,7 +553,7 @@ public class SegmentPreProcessorTest {
assertEquals(columnMetadata.isSorted(), isSorted);
assertEquals(columnMetadata.hasDictionary(), hasDictionary);
assertEquals(columnMetadata.getMaxNumberOfMultiValues(),
maxNumberOfMultiValues);
- assertEquals(columnMetadata.getTotalNumberOfEntries(), 100000);
+ assertEquals(columnMetadata.getTotalNumberOfEntries(),
totalNumberOfEntries);
assertEquals(columnMetadata.isAutoGenerated(), isAutoGenerated);
try (SegmentDirectory segmentDirectory1 =
SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader()
@@ -1501,7 +1523,7 @@ public class SegmentPreProcessorTest {
// Forward index is always going to be present for default SV columns with
forward index disabled. This is because
// such default columns are going to be sorted and the
forwardIndexDisabled flag is a no-op for sorted columns
createAndValidateIndex(ColumnIndexType.FORWARD_INDEX,
NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_SV, 1, 1,
- _newColumnsSchemaWithForwardIndexDisabled, true, true, true, 4, true,
0, null, true);
+ _newColumnsSchemaWithForwardIndexDisabled, true, true, true, 4, true,
0, null, true, DataType.STRING, 100000);
// Create a segment in V1, add a column with no forward index enabled
constructV1Segment();
@@ -1513,7 +1535,7 @@ public class SegmentPreProcessorTest {
// Forward index is always going to be present for default SV columns with
forward index disabled. This is because
// such default columns are going to be sorted and the
forwardIndexDisabled flag is a no-op for sorted columns
createAndValidateIndex(ColumnIndexType.FORWARD_INDEX,
NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_SV, 1, 1,
- _newColumnsSchemaWithForwardIndexDisabled, true, true, true, 4, true,
0, null, true);
+ _newColumnsSchemaWithForwardIndexDisabled, true, true, true, 4, true,
0, null, true, DataType.STRING, 100000);
// Add the column to the no dictionary column list
Set<String> existingNoDictionaryColumns =
_indexLoadingConfig.getNoDictionaryColumns();
@@ -1527,9 +1549,9 @@ public class SegmentPreProcessorTest {
assertNull(columnMetadata);
try {
- createAndValidateIndex(ColumnIndexType.FORWARD_INDEX,
- NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_SV, 1, 1,
_newColumnsSchemaWithForwardIndexDisabled, true, true, false,
- 4, false, 1, null, false);
+ createAndValidateIndex(ColumnIndexType.FORWARD_INDEX,
NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_SV, 1, 1,
+ _newColumnsSchemaWithForwardIndexDisabled, true, true, false, 4,
false, 1, null, false, DataType.STRING,
+ 100000);
Assert.fail("Forward index cannot be disabled for dictionary disabled
columns!");
} catch (IllegalStateException e) {
assertEquals(e.getMessage(), "Dictionary disabled column:
newForwardIndexDisabledColumnSV cannot disable the "
@@ -1544,7 +1566,8 @@ public class SegmentPreProcessorTest {
try {
createAndValidateIndex(ColumnIndexType.FORWARD_INDEX,
NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_SV, 1, 1,
- _newColumnsSchemaWithForwardIndexDisabled, true, true, false, 4,
false, 1, null, false);
+ _newColumnsSchemaWithForwardIndexDisabled, true, true, false, 4,
false, 1, null, false, DataType.STRING,
+ 100000);
Assert.fail("Forward index cannot be disabled for dictionary disabled
columns!");
} catch (IllegalStateException e) {
assertEquals(e.getMessage(), "Dictionary disabled column:
newForwardIndexDisabledColumnSV cannot disable the "
@@ -1569,7 +1592,7 @@ public class SegmentPreProcessorTest {
// is a no-op
createAndValidateIndex(ColumnIndexType.FORWARD_INDEX,
NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_SV, 1, 1,
_newColumnsSchemaWithForwardIndexDisabled, true, true, true,
- 4, true, 0, null, false);
+ 4, true, 0, null, false, DataType.STRING, 100000);
constructV1Segment();
segmentMetadata = new SegmentMetadataImpl(_indexDir);
@@ -1579,9 +1602,8 @@ public class SegmentPreProcessorTest {
// Column should be sorted and should be created successfully since for SV
columns the forwardIndexDisabled flag
// is a no-op
- createAndValidateIndex(ColumnIndexType.FORWARD_INDEX,
- NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_SV, 1, 1,
_newColumnsSchemaWithForwardIndexDisabled, true, true, true,
- 4, true, 0, null, false);
+ createAndValidateIndex(ColumnIndexType.FORWARD_INDEX,
NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_SV, 1, 1,
+ _newColumnsSchemaWithForwardIndexDisabled, true, true, true, 4, true,
0, null, false, DataType.STRING, 100000);
// Reset the sorted column list
_indexLoadingConfig.setSortedColumn(existingSortedColumns.isEmpty() ? null
: existingSortedColumns.get(0));
@@ -1599,7 +1621,8 @@ public class SegmentPreProcessorTest {
try {
createAndValidateIndex(ColumnIndexType.FORWARD_INDEX,
NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_SV, 1, 1,
- _newColumnsSchemaWithForwardIndexDisabled, true, true, true, 4,
true, 0, null, false);
+ _newColumnsSchemaWithForwardIndexDisabled, true, true, true, 4,
true, 0, null, false, DataType.STRING,
+ 100000);
Assert.fail("Forward index cannot be disabled without enabling the
inverted index!");
} catch (IllegalStateException e) {
assertEquals(e.getMessage(), "Inverted index must be enabled for forward
index disabled column: "
@@ -1613,9 +1636,9 @@ public class SegmentPreProcessorTest {
assertNull(columnMetadata);
try {
- createAndValidateIndex(ColumnIndexType.FORWARD_INDEX,
- NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_SV, 1, 1,
_newColumnsSchemaWithForwardIndexDisabled, true, true, true,
- 4, true, 0, null, false);
+ createAndValidateIndex(ColumnIndexType.FORWARD_INDEX,
NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_SV, 1, 1,
+ _newColumnsSchemaWithForwardIndexDisabled, true, true, true, 4,
true, 0, null, false, DataType.STRING,
+ 100000);
} catch (IllegalStateException e) {
assertEquals(e.getMessage(), "Inverted index must be enabled for forward
index disabled column: "
+ "newForwardIndexDisabledColumnSV");
@@ -1647,9 +1670,9 @@ public class SegmentPreProcessorTest {
// Validate that the forward index doesn't exist and that inverted index
does exist
createAndValidateIndex(ColumnIndexType.FORWARD_INDEX,
NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_MV, 1, 1,
- _newColumnsSchemaWithForwardIndexDisabled, true, true, false, 4,
false, 1, null, true);
+ _newColumnsSchemaWithForwardIndexDisabled, true, true, false, 4,
false, 1, null, true, DataType.STRING, 100000);
createAndValidateIndex(ColumnIndexType.INVERTED_INDEX,
NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_MV, 1, 1,
- _newColumnsSchemaWithForwardIndexDisabled, true, true, false, 4,
false, 1, null, true);
+ _newColumnsSchemaWithForwardIndexDisabled, true, true, false, 4,
false, 1, null, true, DataType.STRING, 100000);
// Create a segment in V1, add a column with no forward index enabled
constructV1Segment();
@@ -1660,9 +1683,9 @@ public class SegmentPreProcessorTest {
// Validate that the forward index doesn't exist and that inverted index
does exist
createAndValidateIndex(ColumnIndexType.FORWARD_INDEX,
NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_MV, 1, 1,
- _newColumnsSchemaWithForwardIndexDisabled, true, true, false, 4,
false, 1, null, true);
+ _newColumnsSchemaWithForwardIndexDisabled, true, true, false, 4,
false, 1, null, true, DataType.STRING, 100000);
createAndValidateIndex(ColumnIndexType.INVERTED_INDEX,
NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_MV, 1, 1,
- _newColumnsSchemaWithForwardIndexDisabled, true, true, false, 4,
false, 1, null, true);
+ _newColumnsSchemaWithForwardIndexDisabled, true, true, false, 4,
false, 1, null, true, DataType.STRING, 100000);
// Add the column to the no dictionary column list
Set<String> existingNoDictionaryColumns =
_indexLoadingConfig.getNoDictionaryColumns();
@@ -1675,9 +1698,10 @@ public class SegmentPreProcessorTest {
// should be null since column does not exist in the schema
assertNull(columnMetadata);
- assertThrows(IllegalStateException.class, () ->
createAndValidateIndex(ColumnIndexType.FORWARD_INDEX,
- NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_MV, 1, 1,
_newColumnsSchemaWithForwardIndexDisabled, true, true, false,
- 4, false, 1, null, false));
+ assertThrows(IllegalStateException.class,
+ () -> createAndValidateIndex(ColumnIndexType.FORWARD_INDEX,
NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_MV, 1, 1,
+ _newColumnsSchemaWithForwardIndexDisabled, true, true, false, 4,
false, 1, null, false, DataType.STRING,
+ 100000));
constructV1Segment();
segmentMetadata = new SegmentMetadataImpl(_indexDir);
@@ -1685,9 +1709,10 @@ public class SegmentPreProcessorTest {
// should be null since column does not exist in the schema
assertNull(columnMetadata);
- assertThrows(IllegalStateException.class, () ->
createAndValidateIndex(ColumnIndexType.FORWARD_INDEX,
- NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_MV, 1, 1,
_newColumnsSchemaWithForwardIndexDisabled, true, true, false,
- 4, false, 1, null, false));
+ assertThrows(IllegalStateException.class,
+ () -> createAndValidateIndex(ColumnIndexType.FORWARD_INDEX,
NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_MV, 1, 1,
+ _newColumnsSchemaWithForwardIndexDisabled, true, true, false, 4,
false, 1, null, false, DataType.STRING,
+ 100000));
// Reset the no dictionary columns
_indexLoadingConfig.setNoDictionaryColumns(existingNoDictionaryColumns);
@@ -1703,9 +1728,10 @@ public class SegmentPreProcessorTest {
// should be null since column does not exist in the schema
assertNull(columnMetadata);
- assertThrows(IllegalStateException.class, () ->
createAndValidateIndex(ColumnIndexType.FORWARD_INDEX,
- NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_MV, 1, 1,
_newColumnsSchemaWithForwardIndexDisabled, true, true, false,
- 4, false, 1, null, false));
+ assertThrows(IllegalStateException.class,
+ () -> createAndValidateIndex(ColumnIndexType.FORWARD_INDEX,
NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_MV, 1, 1,
+ _newColumnsSchemaWithForwardIndexDisabled, true, true, false, 4,
false, 1, null, false, DataType.STRING,
+ 100000));
constructV1Segment();
segmentMetadata = new SegmentMetadataImpl(_indexDir);
@@ -1713,9 +1739,10 @@ public class SegmentPreProcessorTest {
// should be null since column does not exist in the schema
assertNull(columnMetadata);
- assertThrows(IllegalStateException.class, () ->
createAndValidateIndex(ColumnIndexType.FORWARD_INDEX,
- NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_MV, 1, 1,
_newColumnsSchemaWithForwardIndexDisabled, true, true, false,
- 4, false, 1, null, false));
+ assertThrows(IllegalStateException.class,
+ () -> createAndValidateIndex(ColumnIndexType.FORWARD_INDEX,
NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_MV, 1, 1,
+ _newColumnsSchemaWithForwardIndexDisabled, true, true, false, 4,
false, 1, null, false, DataType.STRING,
+ 100000));
_indexLoadingConfig.setForwardIndexDisabledColumns(new HashSet<>());
}
diff --git
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/IndexCreationContext.java
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/IndexCreationContext.java
index b2e2d75c98..9ec3215c6d 100644
---
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/IndexCreationContext.java
+++
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/IndexCreationContext.java
@@ -106,7 +106,8 @@ public interface IndexCreationContext {
.withTotalDocs(columnMetadata.getTotalDocs())
.withDictionary(columnMetadata.hasDictionary())
.withMinValue(columnMetadata.getMinValue())
- .withMaxValue(columnMetadata.getMaxValue());
+ .withMaxValue(columnMetadata.getMaxValue())
+
.withMaxNumberOfMultiValueElements(columnMetadata.getMaxNumberOfMultiValues());
}
public Builder withLengthOfLongestEntry(int lengthOfLongestEntry) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]