This is an automated email from the ASF dual-hosted git repository.

siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new ae93ce74e7 Support rewriting forward index upon changing compression 
type for existing raw MV column (#9510)
ae93ce74e7 is described below

commit ae93ce74e7dbff2efba0ed0c39eb458495e67e1a
Author: Vivek Iyer Vaidyanathan <[email protected]>
AuthorDate: Mon Oct 17 16:26:20 2022 -0700

    Support rewriting forward index upon changing compression type for existing 
raw MV column (#9510)
    
    * Make StreamMessage generic
    
    * Allow changing compressionType for MV columns during segmentReload
    
    * Address review comments
    
    * Refactor and cleanup code
    
    * Optimize for loop in ForwardIndexHandler
---
 .../impl/fwd/MultiValueVarByteRawIndexCreator.java |  57 ++++-
 .../segment/index/loader/ForwardIndexHandler.java  | 233 +++++++++++++++------
 .../segment/readers/PinotSegmentColumnReader.java  | 111 ++++++++--
 .../index/loader/ForwardIndexHandlerTest.java      | 138 ++++++++++--
 .../index/loader/SegmentPreProcessorTest.java      | 123 ++++++-----
 .../segment/spi/creator/IndexCreationContext.java  |   3 +-
 6 files changed, 515 insertions(+), 150 deletions(-)

diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueVarByteRawIndexCreator.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueVarByteRawIndexCreator.java
index 471b4bd315..84d471d50e 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueVarByteRawIndexCreator.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueVarByteRawIndexCreator.java
@@ -75,10 +75,8 @@ public class MultiValueVarByteRawIndexCreator implements 
ForwardIndexCreator {
       int totalDocs, DataType valueType, int writerVersion, int 
maxRowLengthInBytes, int maxNumberOfElements)
       throws IOException {
     //we will prepend the actual content with numElements and length array 
containing length of each element
-    int maxLengthPrefixes = Integer.BYTES * maxNumberOfElements;
-    int totalMaxLength = Integer.BYTES + maxRowLengthInBytes + 
maxLengthPrefixes;
-    Preconditions.checkArgument((maxLengthPrefixes | maxRowLengthInBytes | 
totalMaxLength | maxNumberOfElements) > 0,
-        "integer overflow detected");
+    int totalMaxLength = getTotalRowStorageBytes(maxNumberOfElements, 
maxRowLengthInBytes);
+
     File file = new File(baseIndexDir,
         column + Indexes.RAW_MV_FORWARD_INDEX_FILE_EXTENSION);
     int numDocsPerChunk = Math.max(
@@ -119,4 +117,55 @@ public class MultiValueVarByteRawIndexCreator implements 
ForwardIndexCreator {
       throws IOException {
     _indexWriter.close();
   }
+
+  /**
+   * The actual content in an MV array is prepended with 2 prefixes:
+   * 1. elementLengthStoragePrefixInBytes - bytes required to store the length 
of each element in the largest array
+   * 2. numElementsStoragePrefixInBytes - Number of elements in the array
+   *
+   * This function returns the total bytes needed to store (1) 
elementLengthStoragePrefixInBytes
+   */
+  public static int getElementLengthStoragePrefixInBytes(int 
maxNumberOfElements) {
+    return Integer.BYTES * maxNumberOfElements;
+  }
+
+  /**
+   * The actual content in an MV array is prepended with 2 prefixes:
+   * 1. elementLengthStoragePrefixInBytes - bytes required to store the length 
of each element in the largest array
+   * 2. numElementsStoragePrefixInBytes - Number of elements in the array
+   *
+   * This function returns the bytes needed to store (2) 
numElementsStoragePrefixInBytes
+   */
+  public static int getNumElementsStoragePrefix() {
+    return Integer.BYTES;
+  }
+
+  /**
+   * The actual content in an MV array is prepended with 2 prefixes:
+   * 1. elementLengthStoragePrefixInBytes - bytes required to store the length 
of each element in the largest array
+   * 2. numElementsStoragePrefixInBytes - Number of elements in the array
+   *
+   * This function returns the bytes needed to store the (1), (2) and the 
actual content.
+   */
+  public static int getTotalRowStorageBytes(int maxNumberOfElements, int 
maxRowDataLengthInBytes) {
+    int elementLengthStoragePrefixInBytes = 
getElementLengthStoragePrefixInBytes(maxNumberOfElements);
+    int numElementsStoragePrefixInBytes = getNumElementsStoragePrefix();
+    int totalMaxLength = elementLengthStoragePrefixInBytes + 
numElementsStoragePrefixInBytes + maxRowDataLengthInBytes;
+    Preconditions.checkArgument(
+        (elementLengthStoragePrefixInBytes | maxRowDataLengthInBytes | 
totalMaxLength | maxNumberOfElements) > 0,
+        "integer overflow detected");
+
+    return totalMaxLength;
+  }
+
+  /**
+   * The actual content in an MV array is prepended with 2 prefixes:
+   * 1. elementLengthStoragePrefixInBytes - bytes required to store the length 
of each element in the largest array
+   * 2. numberOfElementsStoragePrefix - Number of elements in the array
+   *
+   * This function returns the bytes needed to store the actual content.
+   */
+  public static int getMaxRowDataLengthInBytes(int totalMaxLength, int 
maxNumberOfElements) {
+    return totalMaxLength - getNumElementsStoragePrefix() - 
getElementLengthStoragePrefixInBytes(maxNumberOfElements);
+  }
 }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java
index d9c317e2e6..67ba9ff6e6 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java
@@ -27,7 +27,7 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import org.apache.commons.io.FileUtils;
-import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader;
+import 
org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueVarByteRawIndexCreator;
 import org.apache.pinot.segment.spi.ColumnMetadata;
 import org.apache.pinot.segment.spi.SegmentMetadata;
 import org.apache.pinot.segment.spi.V1Constants;
@@ -37,6 +37,7 @@ import 
org.apache.pinot.segment.spi.creator.IndexCreatorProvider;
 import org.apache.pinot.segment.spi.creator.SegmentVersion;
 import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator;
 import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
+import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext;
 import org.apache.pinot.segment.spi.store.ColumnIndexType;
 import org.apache.pinot.segment.spi.store.SegmentDirectory;
 import org.slf4j.Logger;
@@ -48,12 +49,12 @@ import org.slf4j.LoggerFactory;
  * that this handler only works for segment versions >= 3.0. Support for 
segment version < 3.0 is not added because
  * majority of the usecases are in versions >= 3.0 and this avoids adding tech 
debt. The currently supported
  * operations are:
- * 1. Change compression on raw SV columns.
+ * 1. Change compression on raw SV and MV columns.
  *
  *  TODO: Add support for the following:
- *  1. Change compression for raw MV columns
- *  2. Enable dictionary
- *  3. Disable dictionary
+ *  1. Enable dictionary
+ *  2. Disable dictionary
+ *  3. Segment versions < V3
  */
 public class ForwardIndexHandler implements IndexHandler {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(ForwardIndexHandler.class);
@@ -106,7 +107,7 @@ public class ForwardIndexHandler implements IndexHandler {
       throws Exception {
     Map<String, Operation> columnOperationMap = new HashMap<>();
 
-    // Does not work for segment versions < V3
+    // Does not work for segment versions < V3.
     if (_segmentMetadata.getVersion().compareTo(SegmentVersion.v3) < 0) {
       return columnOperationMap;
     }
@@ -140,11 +141,6 @@ public class ForwardIndexHandler implements IndexHandler {
   private boolean shouldChangeCompressionType(String column, 
SegmentDirectory.Reader segmentReader) throws Exception {
     ColumnMetadata existingColMetadata = 
_segmentMetadata.getColumnMetadataFor(column);
 
-    // TODO: Remove this MV column limitation.
-    if (!existingColMetadata.isSingleValue()) {
-      return false;
-    }
-
     // The compression type for an existing segment can only be determined by 
reading the forward index header.
     try (ForwardIndexReader fwdIndexReader = 
LoaderUtils.getForwardIndexReader(segmentReader, existingColMetadata)) {
       ChunkCompressionType existingCompressionType = 
fwdIndexReader.getCompressionType();
@@ -173,12 +169,15 @@ public class ForwardIndexHandler implements IndexHandler {
       IndexCreatorProvider indexCreatorProvider)
       throws Exception {
     Preconditions.checkState(_segmentMetadata.getVersion() == 
SegmentVersion.v3);
-
     ColumnMetadata existingColMetadata = 
_segmentMetadata.getColumnMetadataFor(column);
+    boolean isSingleValue = existingColMetadata.isSingleValue();
+
     File indexDir = _segmentMetadata.getIndexDir();
     String segmentName = _segmentMetadata.getName();
     File inProgress = new File(indexDir, column + ".fwd.inprogress");
-    File fwdIndexFile = new File(indexDir, column + 
V1Constants.Indexes.RAW_SV_FORWARD_INDEX_FILE_EXTENSION);
+    String fileExtension = isSingleValue ? 
V1Constants.Indexes.RAW_SV_FORWARD_INDEX_FILE_EXTENSION
+        : V1Constants.Indexes.RAW_MV_FORWARD_INDEX_FILE_EXTENSION;
+    File fwdIndexFile = new File(indexDir, column + fileExtension);
 
     if (!inProgress.exists()) {
       // Marker file does not exist, which means last run ended normally.
@@ -199,16 +198,49 @@ public class ForwardIndexHandler implements IndexHandler {
     // are processed only if a valid compressionType is specified in 
fieldConfig.
     ChunkCompressionType newCompressionType = compressionConfigs.get(column);
 
-    int numDocs = existingColMetadata.getTotalDocs();
+    if (isSingleValue) {
+      rewriteRawSVForwardIndex(column, existingColMetadata, indexDir, 
segmentWriter, indexCreatorProvider,
+          newCompressionType);
+    } else {
+      rewriteRawMVForwardIndex(column, existingColMetadata, indexDir, 
segmentWriter, indexCreatorProvider,
+          newCompressionType);
+    }
 
+    // We used the existing forward index to generate a new forward index. The 
existing forward index will be in V3
+    // format and the new forward index will be in V1 format. Remove the 
existing forward index as it is not needed
+    // anymore. Note that removeIndex() will only mark an index for removal 
and remove the in-memory state. The
+    // actual cleanup from columns.psf file will happen when 
singleFileIndexDirectory.cleanupRemovedIndices() is
+    // called during segmentWriter.close().
+    segmentWriter.removeIndex(column, ColumnIndexType.FORWARD_INDEX);
+    LoaderUtils.writeIndexToV3Format(segmentWriter, column, fwdIndexFile, 
ColumnIndexType.FORWARD_INDEX);
+
+    // Delete the marker file.
+    FileUtils.deleteQuietly(inProgress);
+
+    LOGGER.info("Created forward index for segment: {}, column: {}", 
segmentName, column);
+  }
+
+  private void rewriteRawMVForwardIndex(String column, ColumnMetadata 
existingColMetadata, File indexDir,
+      SegmentDirectory.Writer segmentWriter, IndexCreatorProvider 
indexCreatorProvider,
+      ChunkCompressionType newCompressionType)
+      throws Exception {
     try (ForwardIndexReader reader = 
LoaderUtils.getForwardIndexReader(segmentWriter, existingColMetadata)) {
+      // For VarByte MV columns like String and Bytes, the storage 
representation of each row contains the following
+      // components:
+      // 1. bytes required to store the actual elements of the MV row (A)
+      // 2. bytes required to store the number of elements in the MV row (B)
+      // 3. bytes required to store the length of each MV element (C)
+      //
+      // lengthOfLongestEntry = A + B + C
+      // maxRowLengthInBytes = A
       int lengthOfLongestEntry = reader.getLengthOfLongestEntry();
-      Preconditions.checkState(lengthOfLongestEntry >= 0,
-          "lengthOfLongestEntry cannot be negative. segment=" + segmentName + 
" column={}" + column);
+      int maxNumberOfMVEntries = 
existingColMetadata.getMaxNumberOfMultiValues();
+      int maxRowLengthInBytes =
+          
MultiValueVarByteRawIndexCreator.getMaxRowDataLengthInBytes(lengthOfLongestEntry,
 maxNumberOfMVEntries);
 
       IndexCreationContext.Forward context =
           
IndexCreationContext.builder().withIndexDir(indexDir).withColumnMetadata(existingColMetadata)
-              .withLengthOfLongestEntry(lengthOfLongestEntry).build()
+              
.withLengthOfLongestEntry(lengthOfLongestEntry).withMaxRowLengthInBytes(maxRowLengthInBytes).build()
               .forForwardIndex(newCompressionType, 
_indexLoadingConfig.getColumnProperties());
 
       try (ForwardIndexCreator creator = 
indexCreatorProvider.newForwardIndexCreator(context)) {
@@ -220,54 +252,137 @@ public class ForwardIndexHandler implements IndexHandler 
{
           throw new UnsupportedOperationException(failureMsg);
         }
 
-        PinotSegmentColumnReader columnReader =
-            new PinotSegmentColumnReader(reader, null, null, 
existingColMetadata.getMaxNumberOfMultiValues());
+        int numDocs = existingColMetadata.getTotalDocs();
+        forwardIndexWriterHelper(column, reader, creator, numDocs);
+      }
+    }
+  }
 
-        for (int i = 0; i < numDocs; i++) {
-          Object val = columnReader.getValue(i);
-
-          // JSON fields are either stored as string or bytes. No special 
handling is needed because we make this
-          // decision based on the storedType of the reader.
-          switch (reader.getStoredType()) {
-            case INT:
-              creator.putInt((int) val);
-              break;
-            case LONG:
-              creator.putLong((long) val);
-              break;
-            case FLOAT:
-              creator.putFloat((float) val);
-              break;
-            case DOUBLE:
-              creator.putDouble((double) val);
-              break;
-            case STRING:
-              creator.putString((String) val);
-              break;
-            case BYTES:
-              creator.putBytes((byte[]) val);
-              break;
-            case BIG_DECIMAL:
-              creator.putBigDecimal((BigDecimal) val);
-              break;
-            default:
-              throw new IllegalStateException();
-          }
+  private void rewriteRawSVForwardIndex(String column, ColumnMetadata 
existingColMetadata, File indexDir,
+      SegmentDirectory.Writer segmentWriter, IndexCreatorProvider 
indexCreatorProvider,
+      ChunkCompressionType newCompressionType)
+      throws Exception {
+    try (ForwardIndexReader reader = 
LoaderUtils.getForwardIndexReader(segmentWriter, existingColMetadata)) {
+      int lengthOfLongestEntry = reader.getLengthOfLongestEntry();
+
+      IndexCreationContext.Forward context =
+          
IndexCreationContext.builder().withIndexDir(indexDir).withColumnMetadata(existingColMetadata)
+              .withLengthOfLongestEntry(lengthOfLongestEntry).build()
+              .forForwardIndex(newCompressionType, 
_indexLoadingConfig.getColumnProperties());
+
+      try (ForwardIndexCreator creator = 
indexCreatorProvider.newForwardIndexCreator(context)) {
+        // If creator stored type and the reader stored type do not match, 
throw an exception.
+        if (!reader.getStoredType().equals(creator.getValueType())) {
+          String failureMsg =
+              "Unsupported operation to change datatype for column=" + column 
+ " from " + reader.getStoredType()
+                  .toString() + " to " + creator.getValueType().toString();
+          throw new UnsupportedOperationException(failureMsg);
         }
+
+        int numDocs = existingColMetadata.getTotalDocs();
+        forwardIndexWriterHelper(column, reader, creator, numDocs);
       }
     }
+  }
 
-    // We used the existing forward index to generate a new forward index. The 
existing forward index will be in V3
-    // format and the new forward index will be in V1 format. Remove the 
existing forward index as it is not needed
-    // anymore. Note that removeIndex() will only mark an index for removal 
and remove the in-memory state. The
-    // actual cleanup from columns.psf file will happen when 
singleFileIndexDirectory.cleanupRemovedIndices() is
-    // called during segmentWriter.close().
-    segmentWriter.removeIndex(column, ColumnIndexType.FORWARD_INDEX);
-    LoaderUtils.writeIndexToV3Format(segmentWriter, column, fwdIndexFile, 
ColumnIndexType.FORWARD_INDEX);
+  private void forwardIndexWriterHelper(String column, ForwardIndexReader 
reader, ForwardIndexCreator creator,
+      int numDocs) {
+    // The creator stored type must match the reader stored type. We do not 
support changing datatypes.
+    if (!reader.getStoredType().equals(creator.getValueType())) {
+      String failureMsg =
+          "Unsupported operation to change datatype for column=" + column + " 
from " + reader.getStoredType().toString()
+              + " to " + creator.getValueType().toString();
+      throw new UnsupportedOperationException(failureMsg);
+    }
 
-    // Delete the marker file.
-    FileUtils.deleteQuietly(inProgress);
+    ForwardIndexReaderContext readerContext = reader.createContext();
+    boolean isSVColumn = reader.isSingleValue();
 
-    LOGGER.info("Created forward index for segment: {}, column: {}", 
segmentName, column);
+    switch (reader.getStoredType()) {
+      // JSON fields are either stored as string or bytes. No special handling 
is needed because we make this
+      // decision based on the storedType of the reader.
+      case INT: {
+        for (int i = 0; i < numDocs; i++) {
+          if (isSVColumn) {
+            int val = reader.getInt(i, readerContext);
+            creator.putInt(val);
+          } else {
+            int[] ints = reader.getIntMV(i, readerContext);
+            creator.putIntMV(ints);
+          }
+        }
+        break;
+      }
+      case LONG: {
+        for (int i = 0; i < numDocs; i++) {
+          if (isSVColumn) {
+            long val = reader.getLong(i, readerContext);
+            creator.putLong(val);
+          } else {
+            long[] longs = reader.getLongMV(i, readerContext);
+            creator.putLongMV(longs);
+          }
+        }
+        break;
+      }
+      case FLOAT: {
+        for (int i = 0; i < numDocs; i++) {
+          if (isSVColumn) {
+            float val = reader.getFloat(i, readerContext);
+            creator.putFloat(val);
+          } else {
+            float[] floats = reader.getFloatMV(i, readerContext);
+            creator.putFloatMV(floats);
+          }
+        }
+        break;
+      }
+      case DOUBLE: {
+        for (int i = 0; i < numDocs; i++) {
+          if (isSVColumn) {
+            double val = reader.getDouble(i, readerContext);
+            creator.putDouble(val);
+          } else {
+            double[] doubles = reader.getDoubleMV(i, readerContext);
+            creator.putDoubleMV(doubles);
+          }
+        }
+        break;
+      }
+      case STRING: {
+        for (int i = 0; i < numDocs; i++) {
+          if (isSVColumn) {
+            String val = reader.getString(i, readerContext);
+            creator.putString(val);
+          } else {
+            String[] strings = reader.getStringMV(i, readerContext);
+            creator.putStringMV(strings);
+          }
+        }
+        break;
+      }
+      case BYTES: {
+        for (int i = 0; i < numDocs; i++) {
+          if (isSVColumn) {
+            byte[] val = reader.getBytes(i, readerContext);
+            creator.putBytes(val);
+          } else {
+            byte[][] bytesArray = reader.getBytesMV(i, readerContext);
+            creator.putBytesMV(bytesArray);
+          }
+        }
+        break;
+      }
+      case BIG_DECIMAL: {
+        for (int i = 0; i < numDocs; i++) {
+          Preconditions.checkState(isSVColumn, "BigDecimal is not supported 
for MV columns");
+          BigDecimal val = reader.getBigDecimal(i, readerContext);
+          creator.putBigDecimal(val);
+        }
+        break;
+      }
+      default:
+        throw new IllegalStateException();
+    }
   }
 }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentColumnReader.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentColumnReader.java
index a510bfd128..8b9ba6bb6f 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentColumnReader.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentColumnReader.java
@@ -37,6 +37,7 @@ public class PinotSegmentColumnReader implements Closeable {
   private final Dictionary _dictionary;
   private final NullValueVectorReader _nullValueVectorReader;
   private final int[] _dictIdBuffer;
+  private final int _maxNumValuesPerMVEntry;
 
   public PinotSegmentColumnReader(IndexSegment indexSegment, String column) {
     DataSource dataSource = indexSegment.getDataSource(column);
@@ -48,8 +49,11 @@ public class PinotSegmentColumnReader implements Closeable {
     _nullValueVectorReader = dataSource.getNullValueVector();
     if (_forwardIndexReader.isSingleValue()) {
       _dictIdBuffer = null;
+      _maxNumValuesPerMVEntry = -1;
     } else {
-      _dictIdBuffer = new 
int[dataSource.getDataSourceMetadata().getMaxNumValuesPerMVEntry()];
+      _maxNumValuesPerMVEntry = 
dataSource.getDataSourceMetadata().getMaxNumValuesPerMVEntry();
+      Preconditions.checkState(_maxNumValuesPerMVEntry >= 0, 
"maxNumValuesPerMVEntry is negative for an MV column.");
+      _dictIdBuffer = new int[_maxNumValuesPerMVEntry];
     }
   }
 
@@ -59,6 +63,7 @@ public class PinotSegmentColumnReader implements Closeable {
     _forwardIndexReaderContext = _forwardIndexReader.createContext();
     _dictionary = dictionary;
     _nullValueVectorReader = nullValueVectorReader;
+    _maxNumValuesPerMVEntry = maxNumValuesPerMVEntry;
     if (_forwardIndexReader.isSingleValue()) {
       _dictIdBuffer = null;
     } else {
@@ -95,26 +100,90 @@ public class PinotSegmentColumnReader implements Closeable 
{
         return values;
       }
     } else {
-      // NOTE: Only support single-value raw index
-      assert _forwardIndexReader.isSingleValue();
-
-      switch (_forwardIndexReader.getStoredType()) {
-        case INT:
-          return _forwardIndexReader.getInt(docId, _forwardIndexReaderContext);
-        case LONG:
-          return _forwardIndexReader.getLong(docId, 
_forwardIndexReaderContext);
-        case FLOAT:
-          return _forwardIndexReader.getFloat(docId, 
_forwardIndexReaderContext);
-        case DOUBLE:
-          return _forwardIndexReader.getDouble(docId, 
_forwardIndexReaderContext);
-        case BIG_DECIMAL:
-          return _forwardIndexReader.getBigDecimal(docId, 
_forwardIndexReaderContext);
-        case STRING:
-          return _forwardIndexReader.getString(docId, 
_forwardIndexReaderContext);
-        case BYTES:
-          return _forwardIndexReader.getBytes(docId, 
_forwardIndexReaderContext);
-        default:
-          throw new IllegalStateException();
+      if (_forwardIndexReader.isSingleValue()) {
+        switch (_forwardIndexReader.getStoredType()) {
+          case INT:
+            return _forwardIndexReader.getInt(docId, 
_forwardIndexReaderContext);
+          case LONG:
+            return _forwardIndexReader.getLong(docId, 
_forwardIndexReaderContext);
+          case FLOAT:
+            return _forwardIndexReader.getFloat(docId, 
_forwardIndexReaderContext);
+          case DOUBLE:
+            return _forwardIndexReader.getDouble(docId, 
_forwardIndexReaderContext);
+          case BIG_DECIMAL:
+            return _forwardIndexReader.getBigDecimal(docId, 
_forwardIndexReaderContext);
+          case STRING:
+            return _forwardIndexReader.getString(docId, 
_forwardIndexReaderContext);
+          case BYTES:
+            return _forwardIndexReader.getBytes(docId, 
_forwardIndexReaderContext);
+          default:
+            throw new IllegalStateException();
+        }
+      } else {
+        switch (_forwardIndexReader.getStoredType()) {
+          case INT: {
+            int[] buffer = new int[_maxNumValuesPerMVEntry];
+            int length = _forwardIndexReader.getIntMV(docId, buffer, 
_forwardIndexReaderContext);
+            Object[] values = new Object[length];
+            for (int i = 0; i < length; i++) {
+              values[i] = buffer[i];
+            }
+
+            return values;
+          }
+          case LONG: {
+            long[] buffer = new long[_maxNumValuesPerMVEntry];
+            int length = _forwardIndexReader.getLongMV(docId, buffer, 
_forwardIndexReaderContext);
+            Object[] values = new Object[length];
+            for (int i = 0; i < length; i++) {
+              values[i] = buffer[i];
+            }
+
+            return values;
+          }
+          case FLOAT: {
+            float[] buffer = new float[_maxNumValuesPerMVEntry];
+            int length = _forwardIndexReader.getFloatMV(docId, buffer, 
_forwardIndexReaderContext);
+            Object[] values = new Object[length];
+            for (int i = 0; i < length; i++) {
+              values[i] = buffer[i];
+            }
+
+            return values;
+          }
+          case DOUBLE: {
+            double[] buffer = new double[_maxNumValuesPerMVEntry];
+            int length = _forwardIndexReader.getDoubleMV(docId, buffer, 
_forwardIndexReaderContext);
+            Object[] values = new Object[length];
+            for (int i = 0; i < length; i++) {
+              values[i] = buffer[i];
+            }
+
+            return values;
+          }
+          case STRING: {
+            String[] buffer = new String[_maxNumValuesPerMVEntry];
+            int length = _forwardIndexReader.getStringMV(docId, buffer, 
_forwardIndexReaderContext);
+            Object[] values = new Object[length];
+            for (int i = 0; i < length; i++) {
+              values[i] = buffer[i];
+            }
+
+            return values;
+          }
+          case BYTES: {
+            byte[][] buffer = new byte[_maxNumValuesPerMVEntry][];
+            int length = _forwardIndexReader.getBytesMV(docId, buffer, 
_forwardIndexReaderContext);
+            Object[] values = new Object[length];
+            for (int i = 0; i < length; i++) {
+              values[i] = buffer[i];
+            }
+
+            return values;
+          }
+          default:
+            throw new IllegalStateException("Invalid stored type=" + 
_forwardIndexReader.getStoredType());
+        }
       }
     }
   }
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandlerTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandlerTest.java
index 12bb8b61a2..f70779feed 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandlerTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandlerTest.java
@@ -81,15 +81,23 @@ public class ForwardIndexHandlerTest {
   private static final String DIM_ZSTANDARD_INTEGER = "DIM_ZSTANDARD_INTEGER";
   private static final String DIM_LZ4_INTEGER = "DIM_LZ4_INTEGER";
 
+  // Dictionary columns
   private static final String DIM_DICT_INTEGER = "DIM_DICT_INTEGER";
   private static final String DIM_DICT_STRING = "DIM_DICT_STRING";
   private static final String DIM_DICT_LONG = "DIM_DICT_LONG";
 
-  private static final String METRIC_PASSTHROUGH_INTEGER = 
"METRIC_PASSTHROUGH_INTEGER";
+  // Metric columns
+  private static final String METRIC_PASS_THROUGH_INTEGER = 
"METRIC_PASS_THROUGH_INTEGER";
   private static final String METRIC_SNAPPY_INTEGER = "METRIC_SNAPPY_INTEGER";
   private static final String METRIC_ZSTANDARD_INTEGER = 
"METRIC_ZSTANDARD_INTEGER";
   private static final String METRIC_LZ4_INTEGER = "METRIC_LZ4_INTEGER";
 
+  // Multivalue columns
+  private static final String DIM_MV_PASS_THROUGH_INTEGER = 
"DIM_MV_PASS_THROUGH_INTEGER";
+  private static final String DIM_MV_PASS_THROUGH_LONG = 
"DIM_MV_PASS_THROUGH_LONG";
+  private static final String DIM_MV_PASS_THROUGH_STRING = 
"DIM_MV_PASS_THROUGH_STRING";
+  private static final String DIM_MV_PASS_THROUGH_BYTES = 
"DIM_MV_PASS_THROUGH_BYTES";
+
   private static final List<String> RAW_SNAPPY_INDEX_COLUMNS =
       Arrays.asList(DIM_SNAPPY_STRING, DIM_SNAPPY_LONG, DIM_SNAPPY_INTEGER, 
METRIC_SNAPPY_INTEGER);
 
@@ -98,7 +106,8 @@ public class ForwardIndexHandlerTest {
 
   private static final List<String> RAW_PASS_THROUGH_INDEX_COLUMNS =
       Arrays.asList(DIM_PASS_THROUGH_STRING, DIM_PASS_THROUGH_LONG, 
DIM_PASS_THROUGH_INTEGER,
-          METRIC_PASSTHROUGH_INTEGER);
+          METRIC_PASS_THROUGH_INTEGER, DIM_MV_PASS_THROUGH_INTEGER, 
DIM_MV_PASS_THROUGH_LONG,
+          DIM_MV_PASS_THROUGH_STRING, DIM_MV_PASS_THROUGH_BYTES);
 
   private static final List<String> RAW_LZ4_INDEX_COLUMNS =
       Arrays.asList(DIM_LZ4_STRING, DIM_LZ4_LONG, DIM_LZ4_INTEGER, 
METRIC_LZ4_INTEGER);
@@ -171,11 +180,13 @@ public class ForwardIndexHandlerTest {
         .addSingleValueDimension(DIM_DICT_INTEGER, FieldSpec.DataType.INT)
         .addSingleValueDimension(DIM_DICT_LONG, FieldSpec.DataType.LONG)
         .addSingleValueDimension(DIM_DICT_STRING, FieldSpec.DataType.STRING)
-        .addMetric(METRIC_PASSTHROUGH_INTEGER, FieldSpec.DataType.INT)
+        .addMetric(METRIC_PASS_THROUGH_INTEGER, FieldSpec.DataType.INT)
         .addMetric(METRIC_SNAPPY_INTEGER, 
FieldSpec.DataType.INT).addMetric(METRIC_LZ4_INTEGER, FieldSpec.DataType.INT)
         .addMetric(METRIC_ZSTANDARD_INTEGER, FieldSpec.DataType.INT)
-
-        .build();
+        .addMultiValueDimension(DIM_MV_PASS_THROUGH_INTEGER, 
FieldSpec.DataType.INT)
+        .addMultiValueDimension(DIM_MV_PASS_THROUGH_LONG, 
FieldSpec.DataType.LONG)
+        .addMultiValueDimension(DIM_MV_PASS_THROUGH_STRING, 
FieldSpec.DataType.STRING)
+        .addMultiValueDimension(DIM_MV_PASS_THROUGH_BYTES, 
FieldSpec.DataType.BYTES).build();
 
     SegmentGeneratorConfig config = new SegmentGeneratorConfig(_tableConfig, 
_schema);
     config.setOutDir(INDEX_DIR.getPath());
@@ -200,16 +211,42 @@ public class ForwardIndexHandlerTest {
     Integer[] tempIntRows = new Integer[rowLength];
     Long[] tempLongRows = new Long[rowLength];
 
+    int maxNumberOfMVEntries = random.nextInt(500);
+    String[][] tempMVStringRows = new String[rowLength][maxNumberOfMVEntries];
+    Integer[][] tempMVIntRows = new Integer[rowLength][maxNumberOfMVEntries];
+    Long[][] tempMVLongRows = new Long[rowLength][maxNumberOfMVEntries];
+    byte[][][] tempMVByteRows = new byte[rowLength][maxNumberOfMVEntries][];
+
     for (int i = 0; i < rowLength; i++) {
       //Adding a fixed value to check for filter queries
       if (i % 10 == 0) {
         tempStringRows[i] = "testRow";
         tempIntRows[i] = 1001;
         tempLongRows[i] = 1001L;
+
+        // Avoid creating empty arrays.
+        int numMVElements = random.nextInt(maxNumberOfMVEntries - 1) + 1;
+        for (int j = 0; j < numMVElements; j++) {
+          tempMVIntRows[i][j] = 1001;
+          tempMVLongRows[i][j] = 1001L;
+          String str = "testRow";
+          tempMVStringRows[i][j] = str;
+          tempMVByteRows[i][j] = (byte[]) str.getBytes();
+        }
       } else {
         tempStringRows[i] = "n" + i;
         tempIntRows[i] = i;
         tempLongRows[i] = (long) i;
+
+        // Avoid creating empty arrays.
+        int numMVElements = random.nextInt(maxNumberOfMVEntries - 1) + 1;
+        for (int j = 0; j < numMVElements; j++) {
+          tempMVIntRows[i][j] = j;
+          tempMVLongRows[i][j] = (long) j;
+          String str = "n" + i;
+          tempMVStringRows[i][j] = str;
+          tempMVByteRows[i][j] = (byte[]) str.getBytes();
+        }
       }
     }
 
@@ -228,7 +265,7 @@ public class ForwardIndexHandlerTest {
       row.putValue(DIM_PASS_THROUGH_INTEGER, tempIntRows[i]);
       row.putValue(DIM_LZ4_INTEGER, tempIntRows[i]);
       row.putValue(METRIC_LZ4_INTEGER, tempIntRows[i]);
-      row.putValue(METRIC_PASSTHROUGH_INTEGER, tempIntRows[i]);
+      row.putValue(METRIC_PASS_THROUGH_INTEGER, tempIntRows[i]);
       row.putValue(METRIC_ZSTANDARD_INTEGER, tempIntRows[i]);
       row.putValue(METRIC_SNAPPY_INTEGER, tempIntRows[i]);
 
@@ -243,6 +280,12 @@ public class ForwardIndexHandlerTest {
       row.putValue(DIM_DICT_LONG, tempLongRows[i]);
       row.putValue(DIM_DICT_STRING, tempStringRows[i]);
 
+      // MV columns
+      row.putValue(DIM_MV_PASS_THROUGH_INTEGER, tempMVIntRows[i]);
+      row.putValue(DIM_MV_PASS_THROUGH_LONG, tempMVLongRows[i]);
+      row.putValue(DIM_MV_PASS_THROUGH_STRING, tempMVStringRows[i]);
+      row.putValue(DIM_MV_PASS_THROUGH_BYTES, tempMVByteRows[i]);
+
       rows.add(row);
     }
     return rows;
@@ -278,7 +321,7 @@ public class ForwardIndexHandlerTest {
     operationMap = fwdIndexHandler.computeOperation(writer);
     assertEquals(operationMap, Collections.EMPTY_MAP);
 
-    // TEST4: Add random index. ForwardIndexHandler should be a No-Op.
+    // TEST4: Add an additional text index. ForwardIndexHandler should be a No-Op.
     indexLoadingConfig = new IndexLoadingConfig(null, _tableConfig);
     indexLoadingConfig.getTextIndexColumns().add(DIM_DICT_INTEGER);
     indexLoadingConfig.getTextIndexColumns().add(DIM_LZ4_INTEGER);
@@ -287,12 +330,21 @@ public class ForwardIndexHandlerTest {
     assertEquals(operationMap, Collections.EMPTY_MAP);
 
     // TEST5: Change compression
+    Random rand = new Random();
 
     // Create new tableConfig with the modified fieldConfigs.
     List<FieldConfig> fieldConfigs = new 
ArrayList<>(_tableConfig.getFieldConfigList());
-    FieldConfig config = fieldConfigs.remove(0);
-    FieldConfig newConfig = new FieldConfig(config.getName(), 
FieldConfig.EncodingType.RAW, Collections.emptyList(),
-        FieldConfig.CompressionCodec.ZSTANDARD, null);
+    int randIdx = rand.nextInt(fieldConfigs.size());
+    FieldConfig config = fieldConfigs.remove(randIdx);
+    FieldConfig.CompressionCodec newCompressionType = null;
+    for (FieldConfig.CompressionCodec type : _allCompressionTypes) {
+      if (config.getCompressionCodec() != type) {
+        newCompressionType = type;
+      }
+    }
+    FieldConfig newConfig =
+        new FieldConfig(config.getName(), FieldConfig.EncodingType.RAW, 
Collections.emptyList(), newCompressionType,
+            null);
     fieldConfigs.add(newConfig);
     TableConfig tableConfig =
         new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setNoDictionaryColumns(_noDictionaryColumns)
@@ -339,7 +391,6 @@ public class ForwardIndexHandlerTest {
   public void testRewriteRawForwardIndexForSingleColumn()
       throws Exception {
     for (int i = 0; i < _noDictionaryColumns.size(); i++) {
-
       // For every noDictionaryColumn, change the compressionType to all available types, one by one.
       for (FieldConfig.CompressionCodec compressionType : 
_allCompressionTypes) {
         // Setup
@@ -365,6 +416,9 @@ public class ForwardIndexHandlerTest {
         IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig(null, 
tableConfig);
         IndexCreatorProvider indexCreatorProvider = 
IndexingOverrides.getIndexCreatorProvider();
         ForwardIndexHandler fwdIndexHandler = new 
ForwardIndexHandler(existingSegmentMetadata, indexLoadingConfig);
+        boolean val = fwdIndexHandler.needUpdateIndices(writer);
+
+
 
         fwdIndexHandler.updateIndices(writer, indexCreatorProvider);
 
@@ -434,6 +488,7 @@ public class ForwardIndexHandlerTest {
         new SegmentLocalFSDirectory(_segmentDirectory, 
existingSegmentMetadata, ReadMode.mmap);
     SegmentDirectory.Writer writer = segmentLocalFSDirectory.createWriter();
     ColumnMetadata columnMetadata = 
existingSegmentMetadata.getColumnMetadataFor(columnName);
+    boolean isSingleValue = columnMetadata.isSingleValue();
 
     // Check Compression type in header
     ForwardIndexReader fwdIndexReader = 
LoaderUtils.getForwardIndexReader(writer, columnMetadata);
@@ -449,12 +504,61 @@ public class ForwardIndexHandlerTest {
           Object val = columnReader.getValue(rowIdx);
 
           FieldSpec.DataType dataType = forwardIndexReader.getStoredType();
-          if (dataType == FieldSpec.DataType.STRING) {
-            assertEquals((String) val, "testRow");
-          } else if (dataType == FieldSpec.DataType.INT) {
-            assertEquals((int) val, 1001, columnName + " " + rowIdx + " " + 
expectedCompressionType);
-          } else if (dataType == FieldSpec.DataType.LONG) {
-            assertEquals((long) val, 1001L, columnName + " " + rowIdx + " " + 
expectedCompressionType);
+
+          switch (dataType) {
+            case STRING: {
+              if (isSingleValue) {
+                assertEquals((String) val, "testRow");
+              } else {
+                Object[] values = (Object[]) val;
+                int length = values.length;
+                for (int i = 0; i < length; i++) {
+                  assertEquals((String) values[i], "testRow");
+                }
+              }
+              break;
+            }
+            case INT: {
+              if (isSingleValue) {
+                assertEquals((int) val, 1001, columnName + " " + rowIdx + " " 
+ expectedCompressionType);
+              } else {
+                Object[] values = (Object[]) val;
+                int length = values.length;
+                for (int i = 0; i < length; i++) {
+                  assertEquals((int) values[i], 1001, columnName + " " + 
rowIdx + " " + expectedCompressionType);
+                }
+              }
+              break;
+            }
+            case LONG: {
+              if (isSingleValue) {
+                assertEquals((long) val, 1001L, columnName + " " + rowIdx + " " + expectedCompressionType);
+              } else {
+                Object[] values = (Object[]) val;
+                int length = values.length;
+                for (int i = 0; i < length; i++) {
+                  assertEquals((long) values[i], 1001, columnName + " " + 
rowIdx + " " + expectedCompressionType);
+                }
+              }
+              break;
+            }
+            case BYTES: {
+              byte[] expectedVal = "testRow".getBytes();
+              if (isSingleValue) {
+                assertEquals((byte[]) val, expectedVal, columnName + " " + 
rowIdx + " " + expectedCompressionType);
+              } else {
+                Object[] values = (Object[]) val;
+                int length = values.length;
+                for (int i = 0; i < length; i++) {
+                  assertEquals((byte[]) values[i], expectedVal,
+                      columnName + " " + rowIdx + " " + 
expectedCompressionType);
+                }
+              }
+              break;
+            }
+            default:
+              // Unreachable code.
+              throw new IllegalStateException("Invalid datatype for column=" + 
columnName);
           }
         }
       }
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java
index babbe646bc..85ffeb555d 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java
@@ -98,6 +98,9 @@ public class SegmentPreProcessorTest {
   private static final String NEWLY_ADDED_STRING_MV_COL_RAW = 
"newTextMVColRaw";
   private static final String NEWLY_ADDED_STRING_MV_COL_DICT = 
"newTextMVColDict";
 
+  // For raw MV column.
+  private static final String EXISTING_INT_COL_RAW_MV = "column6";
+
   // For create fst index tests
   private static final String NEWLY_ADDED_FST_COL_DICT = "newFSTColDict";
 
@@ -209,11 +212,15 @@ public class SegmentPreProcessorTest {
       throws Exception {
     FileUtils.deleteQuietly(INDEX_DIR);
 
+    List<String> rawCols = new ArrayList<>();
+    rawCols.add(EXISTING_STRING_COL_RAW);
+    rawCols.add(EXISTING_INT_COL_RAW_MV);
+
     // Create inverted index for 'column7' when constructing the segment.
     SegmentGeneratorConfig segmentGeneratorConfig =
         SegmentTestUtils.getSegmentGeneratorConfigWithSchema(_avroFile, 
INDEX_DIR, "testTable", _tableConfig, _schema);
     
segmentGeneratorConfig.setInvertedIndexCreationColumns(Collections.singletonList(COLUMN7_NAME));
-    
segmentGeneratorConfig.setRawIndexCreationColumns(Collections.singletonList(EXISTING_STRING_COL_RAW));
+    segmentGeneratorConfig.setRawIndexCreationColumns(rawCols);
     SegmentIndexCreationDriver driver = SegmentCreationDriverFactory.get(null);
     driver.init(segmentGeneratorConfig);
     driver.build();
@@ -328,19 +335,19 @@ public class SegmentPreProcessorTest {
     ChunkCompressionType newCompressionType = ChunkCompressionType.ZSTANDARD;
     compressionConfigs.put(EXISTING_STRING_COL_RAW, newCompressionType);
     _indexLoadingConfig.setCompressionConfigs(compressionConfigs);
-    _indexLoadingConfig.setNoDictionaryColumns(new HashSet<String>() {{
-      add(EXISTING_STRING_COL_RAW);
-    }});
+    _indexLoadingConfig.getNoDictionaryColumns().add(EXISTING_STRING_COL_RAW);
 
     // Test1: Rewriting forward index will be a no-op for v1 segments. Default LZ4 compressionType will be retained.
     constructV1Segment();
-    checkForwardIndexCreation(EXISTING_STRING_COL_RAW, 5, 3, _schema, false, 
false, false, 0, ChunkCompressionType.LZ4);
+    checkForwardIndexCreation(EXISTING_STRING_COL_RAW, 5, 3, _schema, false, 
false, false, 0, ChunkCompressionType.LZ4,
+        true, 0, DataType.STRING, 100000);
 
     // Convert the segment to V3.
     new SegmentV1V2ToV3FormatConverter().convert(_indexDir);
 
     // Test2: Now forward index will be rewritten with ZSTANDARD compressionType.
-    checkForwardIndexCreation(EXISTING_STRING_COL_RAW, 5, 3, _schema, false, 
false, false, 0, newCompressionType);
+    checkForwardIndexCreation(EXISTING_STRING_COL_RAW, 5, 3, _schema, false, 
false, false, 0, newCompressionType, true,
+        0, DataType.STRING, 100000);
 
     // Test3: Change compression on existing raw index column. Also add text 
index on same column. Check correctness.
     newCompressionType = ChunkCompressionType.SNAPPY;
@@ -355,7 +362,7 @@ public class SegmentPreProcessorTest {
     assertNotNull(columnMetadata);
     checkTextIndexCreation(EXISTING_STRING_COL_RAW, 5, 3, _schema, false, 
false, false, 0);
     validateIndex(ColumnIndexType.FORWARD_INDEX, EXISTING_STRING_COL_RAW, 5, 
3, _schema, false, false, false, 0, true,
-        0, newCompressionType, false);
+        0, newCompressionType, false, DataType.STRING, 100000);
 
     // Test4: Change compression on RAW index column. Change another index on 
another column. Check correctness.
     newCompressionType = ChunkCompressionType.ZSTANDARD;
@@ -372,7 +379,18 @@ public class SegmentPreProcessorTest {
     checkFSTIndexCreation(EXISTING_STRING_COL_DICT, 9, 4, 
_newColumnsSchemaWithFST, false, false, 26);
     // Check forward index.
     validateIndex(ColumnIndexType.FORWARD_INDEX, EXISTING_STRING_COL_RAW, 5, 
3, _schema, false, false, false, 0, true,
-        0, newCompressionType, false);
+        0, newCompressionType, false, DataType.STRING, 100000);
+
+    // Test5: Change compressionType for an MV column
+    newCompressionType = ChunkCompressionType.ZSTANDARD;
+    compressionConfigs.put(EXISTING_INT_COL_RAW_MV, newCompressionType);
+    _indexLoadingConfig.setCompressionConfigs(compressionConfigs);
+    _indexLoadingConfig.getNoDictionaryColumns().add(EXISTING_INT_COL_RAW_MV);
+
+    constructV1Segment();
+    new SegmentV1V2ToV3FormatConverter().convert(_indexDir);
+    checkForwardIndexCreation(EXISTING_INT_COL_RAW_MV, 18499, 15, _schema, 
false, false, false, 0,
+        ChunkCompressionType.ZSTANDARD, false, 13, DataType.INT, 106688);
   }
 
   /**
@@ -474,14 +492,14 @@ public class SegmentPreProcessorTest {
       boolean isSorted, int dictionaryElementSize)
       throws Exception {
     createAndValidateIndex(ColumnIndexType.FST_INDEX, column, cardinality, 
bits, schema, isAutoGenerated, true,
-        isSorted, dictionaryElementSize, true, 0, null, false);
+        isSorted, dictionaryElementSize, true, 0, null, false, 
DataType.STRING, 100000);
   }
 
   private void checkTextIndexCreation(String column, int cardinality, int 
bits, Schema schema, boolean isAutoGenerated,
       boolean hasDictionary, boolean isSorted, int dictionaryElementSize)
       throws Exception {
     createAndValidateIndex(ColumnIndexType.TEXT_INDEX, column, cardinality, 
bits, schema, isAutoGenerated,
-        hasDictionary, isSorted, dictionaryElementSize, true, 0, null, false);
+        hasDictionary, isSorted, dictionaryElementSize, true, 0, null, false, 
DataType.STRING, 100000);
   }
 
   private void checkTextIndexCreation(String column, int cardinality, int 
bits, Schema schema, boolean isAutoGenerated,
@@ -489,21 +507,24 @@ public class SegmentPreProcessorTest {
       int maxNumberOfMultiValues)
       throws Exception {
     createAndValidateIndex(ColumnIndexType.TEXT_INDEX, column, cardinality, 
bits, schema, isAutoGenerated,
-        hasDictionary, isSorted, dictionaryElementSize, isSingleValue, 
maxNumberOfMultiValues, null, false);
+        hasDictionary, isSorted, dictionaryElementSize, isSingleValue, 
maxNumberOfMultiValues, null, false,
+        DataType.STRING, 100000);
   }
 
   private void checkForwardIndexCreation(String column, int cardinality, int 
bits, Schema schema,
       boolean isAutoGenerated, boolean hasDictionary, boolean isSorted, int 
dictionaryElementSize,
-      ChunkCompressionType expectedCompressionType)
+      ChunkCompressionType expectedCompressionType, boolean isSingleValue, int 
maxNumberOfMultiValues,
+      DataType dataType, int totalNumberOfEntries)
       throws Exception {
     createAndValidateIndex(ColumnIndexType.FORWARD_INDEX, column, cardinality, 
bits, schema, isAutoGenerated,
-        hasDictionary, isSorted, dictionaryElementSize, true, 0, 
expectedCompressionType, false);
+        hasDictionary, isSorted, dictionaryElementSize, isSingleValue, 
maxNumberOfMultiValues, expectedCompressionType,
+        false, dataType, totalNumberOfEntries);
   }
 
   private void createAndValidateIndex(ColumnIndexType indexType, String 
column, int cardinality, int bits,
       Schema schema, boolean isAutoGenerated, boolean hasDictionary, boolean 
isSorted, int dictionaryElementSize,
       boolean isSingleValued, int maxNumberOfMultiValues, ChunkCompressionType 
expectedCompressionType,
-      boolean forwardIndexDisabled)
+      boolean forwardIndexDisabled, DataType dataType, int 
totalNumberOfEntries)
       throws Exception {
 
     try (SegmentDirectory segmentDirectory = 
SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader()
@@ -512,18 +533,19 @@ public class SegmentPreProcessorTest {
         SegmentPreProcessor processor = new 
SegmentPreProcessor(segmentDirectory, _indexLoadingConfig, schema)) {
       processor.process();
       validateIndex(indexType, column, cardinality, bits, schema, 
isAutoGenerated, hasDictionary, isSorted,
-          dictionaryElementSize, isSingleValued, maxNumberOfMultiValues, 
expectedCompressionType, forwardIndexDisabled);
+          dictionaryElementSize, isSingleValued, maxNumberOfMultiValues, 
expectedCompressionType, forwardIndexDisabled,
+          dataType, totalNumberOfEntries);
     }
   }
 
   private void validateIndex(ColumnIndexType indexType, String column, int 
cardinality, int bits, Schema schema,
       boolean isAutoGenerated, boolean hasDictionary, boolean isSorted, int 
dictionaryElementSize,
       boolean isSingleValued, int maxNumberOfMultiValues, ChunkCompressionType 
expectedCompressionType,
-      boolean forwardIndexDisabled)
+      boolean forwardIndexDisabled, DataType dataType, int 
totalNumberOfEntries)
       throws Exception {
     SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(_indexDir);
     ColumnMetadata columnMetadata = 
segmentMetadata.getColumnMetadataFor(column);
-    assertEquals(columnMetadata.getFieldSpec(), new DimensionFieldSpec(column, 
DataType.STRING, isSingleValued));
+    assertEquals(columnMetadata.getFieldSpec(), new DimensionFieldSpec(column, 
dataType, isSingleValued));
     assertEquals(columnMetadata.getCardinality(), cardinality);
     assertEquals(columnMetadata.getTotalDocs(), 100000);
     assertEquals(columnMetadata.getBitsPerElement(), bits);
@@ -531,7 +553,7 @@ public class SegmentPreProcessorTest {
     assertEquals(columnMetadata.isSorted(), isSorted);
     assertEquals(columnMetadata.hasDictionary(), hasDictionary);
     assertEquals(columnMetadata.getMaxNumberOfMultiValues(), 
maxNumberOfMultiValues);
-    assertEquals(columnMetadata.getTotalNumberOfEntries(), 100000);
+    assertEquals(columnMetadata.getTotalNumberOfEntries(), 
totalNumberOfEntries);
     assertEquals(columnMetadata.isAutoGenerated(), isAutoGenerated);
 
     try (SegmentDirectory segmentDirectory1 = 
SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader()
@@ -1501,7 +1523,7 @@ public class SegmentPreProcessorTest {
     // Forward index is always going to be present for default SV columns with 
forward index disabled. This is because
     // such default columns are going to be sorted and the 
forwardIndexDisabled flag is a no-op for sorted columns
     createAndValidateIndex(ColumnIndexType.FORWARD_INDEX, 
NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_SV, 1, 1,
-        _newColumnsSchemaWithForwardIndexDisabled, true, true, true, 4, true, 
0, null, true);
+        _newColumnsSchemaWithForwardIndexDisabled, true, true, true, 4, true, 
0, null, true, DataType.STRING, 100000);
 
     // Create a segment in V1, add a column with no forward index enabled
     constructV1Segment();
@@ -1513,7 +1535,7 @@ public class SegmentPreProcessorTest {
     // Forward index is always going to be present for default SV columns with 
forward index disabled. This is because
     // such default columns are going to be sorted and the 
forwardIndexDisabled flag is a no-op for sorted columns
     createAndValidateIndex(ColumnIndexType.FORWARD_INDEX, 
NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_SV, 1, 1,
-        _newColumnsSchemaWithForwardIndexDisabled, true, true, true, 4, true, 
0, null, true);
+        _newColumnsSchemaWithForwardIndexDisabled, true, true, true, 4, true, 
0, null, true, DataType.STRING, 100000);
 
     // Add the column to the no dictionary column list
     Set<String> existingNoDictionaryColumns = 
_indexLoadingConfig.getNoDictionaryColumns();
@@ -1527,9 +1549,9 @@ public class SegmentPreProcessorTest {
     assertNull(columnMetadata);
 
     try {
-    createAndValidateIndex(ColumnIndexType.FORWARD_INDEX,
-        NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_SV, 1, 1, 
_newColumnsSchemaWithForwardIndexDisabled, true, true, false,
-        4, false, 1, null, false);
+      createAndValidateIndex(ColumnIndexType.FORWARD_INDEX, 
NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_SV, 1, 1,
+          _newColumnsSchemaWithForwardIndexDisabled, true, true, false, 4, 
false, 1, null, false, DataType.STRING,
+          100000);
       Assert.fail("Forward index cannot be disabled for dictionary disabled columns!");
     } catch (IllegalStateException e) {
       assertEquals(e.getMessage(), "Dictionary disabled column: newForwardIndexDisabledColumnSV cannot disable the "
@@ -1544,7 +1566,8 @@ public class SegmentPreProcessorTest {
 
     try {
       createAndValidateIndex(ColumnIndexType.FORWARD_INDEX, 
NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_SV, 1, 1,
-          _newColumnsSchemaWithForwardIndexDisabled, true, true, false, 4, 
false, 1, null, false);
+          _newColumnsSchemaWithForwardIndexDisabled, true, true, false, 4, 
false, 1, null, false, DataType.STRING,
+          100000);
       Assert.fail("Forward index cannot be disabled for dictionary disabled 
columns!");
     } catch (IllegalStateException e) {
       assertEquals(e.getMessage(), "Dictionary disabled column: 
newForwardIndexDisabledColumnSV cannot disable the "
@@ -1569,7 +1592,7 @@ public class SegmentPreProcessorTest {
     // is a no-op
     createAndValidateIndex(ColumnIndexType.FORWARD_INDEX,
         NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_SV, 1, 1, 
_newColumnsSchemaWithForwardIndexDisabled, true, true, true,
-        4, true, 0, null, false);
+        4, true, 0, null, false, DataType.STRING, 100000);
 
     constructV1Segment();
     segmentMetadata = new SegmentMetadataImpl(_indexDir);
@@ -1579,9 +1602,8 @@ public class SegmentPreProcessorTest {
 
     // Column should be sorted and should be created successfully since for SV 
columns the forwardIndexDisabled flag
     // is a no-op
-    createAndValidateIndex(ColumnIndexType.FORWARD_INDEX,
-        NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_SV, 1, 1, 
_newColumnsSchemaWithForwardIndexDisabled, true, true, true,
-        4, true, 0, null, false);
+    createAndValidateIndex(ColumnIndexType.FORWARD_INDEX, 
NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_SV, 1, 1,
+        _newColumnsSchemaWithForwardIndexDisabled, true, true, true, 4, true, 
0, null, false, DataType.STRING, 100000);
 
     // Reset the sorted column list
     _indexLoadingConfig.setSortedColumn(existingSortedColumns.isEmpty() ? null 
: existingSortedColumns.get(0));
@@ -1599,7 +1621,8 @@ public class SegmentPreProcessorTest {
 
     try {
       createAndValidateIndex(ColumnIndexType.FORWARD_INDEX, 
NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_SV, 1, 1,
-          _newColumnsSchemaWithForwardIndexDisabled, true, true, true, 4, 
true, 0, null, false);
+          _newColumnsSchemaWithForwardIndexDisabled, true, true, true, 4, 
true, 0, null, false, DataType.STRING,
+          100000);
       Assert.fail("Forward index cannot be disabled without enabling the 
inverted index!");
     } catch (IllegalStateException e) {
       assertEquals(e.getMessage(), "Inverted index must be enabled for forward 
index disabled column: "
@@ -1613,9 +1636,9 @@ public class SegmentPreProcessorTest {
     assertNull(columnMetadata);
 
     try {
-      createAndValidateIndex(ColumnIndexType.FORWARD_INDEX,
-          NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_SV, 1, 1, 
_newColumnsSchemaWithForwardIndexDisabled, true, true, true,
-          4, true, 0, null, false);
+      createAndValidateIndex(ColumnIndexType.FORWARD_INDEX, 
NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_SV, 1, 1,
+          _newColumnsSchemaWithForwardIndexDisabled, true, true, true, 4, 
true, 0, null, false, DataType.STRING,
+          100000);
     } catch (IllegalStateException e) {
       assertEquals(e.getMessage(), "Inverted index must be enabled for forward 
index disabled column: "
           + "newForwardIndexDisabledColumnSV");
@@ -1647,9 +1670,9 @@ public class SegmentPreProcessorTest {
 
     // Validate that the forward index doesn't exist and that inverted index 
does exist
     createAndValidateIndex(ColumnIndexType.FORWARD_INDEX, 
NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_MV, 1, 1,
-        _newColumnsSchemaWithForwardIndexDisabled, true, true, false, 4, 
false, 1, null, true);
+        _newColumnsSchemaWithForwardIndexDisabled, true, true, false, 4, 
false, 1, null, true, DataType.STRING, 100000);
     createAndValidateIndex(ColumnIndexType.INVERTED_INDEX, 
NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_MV, 1, 1,
-        _newColumnsSchemaWithForwardIndexDisabled, true, true, false, 4, 
false, 1, null, true);
+        _newColumnsSchemaWithForwardIndexDisabled, true, true, false, 4, 
false, 1, null, true, DataType.STRING, 100000);
 
     // Create a segment in V1, add a column with no forward index enabled
     constructV1Segment();
@@ -1660,9 +1683,9 @@ public class SegmentPreProcessorTest {
 
     // Validate that the forward index doesn't exist and that inverted index 
does exist
     createAndValidateIndex(ColumnIndexType.FORWARD_INDEX, 
NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_MV, 1, 1,
-        _newColumnsSchemaWithForwardIndexDisabled, true, true, false, 4, 
false, 1, null, true);
+        _newColumnsSchemaWithForwardIndexDisabled, true, true, false, 4, 
false, 1, null, true, DataType.STRING, 100000);
     createAndValidateIndex(ColumnIndexType.INVERTED_INDEX, 
NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_MV, 1, 1,
-        _newColumnsSchemaWithForwardIndexDisabled, true, true, false, 4, 
false, 1, null, true);
+        _newColumnsSchemaWithForwardIndexDisabled, true, true, false, 4, 
false, 1, null, true, DataType.STRING, 100000);
 
     // Add the column to the no dictionary column list
     Set<String> existingNoDictionaryColumns = 
_indexLoadingConfig.getNoDictionaryColumns();
@@ -1675,9 +1698,10 @@ public class SegmentPreProcessorTest {
     // should be null since column does not exist in the schema
     assertNull(columnMetadata);
 
-    assertThrows(IllegalStateException.class, () -> 
createAndValidateIndex(ColumnIndexType.FORWARD_INDEX,
-        NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_MV, 1, 1, 
_newColumnsSchemaWithForwardIndexDisabled, true, true, false,
-        4, false, 1, null, false));
+    assertThrows(IllegalStateException.class,
+        () -> createAndValidateIndex(ColumnIndexType.FORWARD_INDEX, 
NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_MV, 1, 1,
+            _newColumnsSchemaWithForwardIndexDisabled, true, true, false, 4, 
false, 1, null, false, DataType.STRING,
+            100000));
 
     constructV1Segment();
     segmentMetadata = new SegmentMetadataImpl(_indexDir);
@@ -1685,9 +1709,10 @@ public class SegmentPreProcessorTest {
     // should be null since column does not exist in the schema
     assertNull(columnMetadata);
 
-    assertThrows(IllegalStateException.class, () -> 
createAndValidateIndex(ColumnIndexType.FORWARD_INDEX,
-        NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_MV, 1, 1, 
_newColumnsSchemaWithForwardIndexDisabled, true, true, false,
-        4, false, 1, null, false));
+    assertThrows(IllegalStateException.class,
+        () -> createAndValidateIndex(ColumnIndexType.FORWARD_INDEX, 
NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_MV, 1, 1,
+            _newColumnsSchemaWithForwardIndexDisabled, true, true, false, 4, 
false, 1, null, false, DataType.STRING,
+            100000));
 
     // Reset the no dictionary columns
     _indexLoadingConfig.setNoDictionaryColumns(existingNoDictionaryColumns);
@@ -1703,9 +1728,10 @@ public class SegmentPreProcessorTest {
     // should be null since column does not exist in the schema
     assertNull(columnMetadata);
 
-    assertThrows(IllegalStateException.class, () -> 
createAndValidateIndex(ColumnIndexType.FORWARD_INDEX,
-        NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_MV, 1, 1, 
_newColumnsSchemaWithForwardIndexDisabled, true, true, false,
-        4, false, 1, null, false));
+    assertThrows(IllegalStateException.class,
+        () -> createAndValidateIndex(ColumnIndexType.FORWARD_INDEX, 
NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_MV, 1, 1,
+            _newColumnsSchemaWithForwardIndexDisabled, true, true, false, 4, 
false, 1, null, false, DataType.STRING,
+            100000));
 
     constructV1Segment();
     segmentMetadata = new SegmentMetadataImpl(_indexDir);
@@ -1713,9 +1739,10 @@ public class SegmentPreProcessorTest {
     // should be null since column does not exist in the schema
     assertNull(columnMetadata);
 
-    assertThrows(IllegalStateException.class, () -> 
createAndValidateIndex(ColumnIndexType.FORWARD_INDEX,
-        NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_MV, 1, 1, 
_newColumnsSchemaWithForwardIndexDisabled, true, true, false,
-        4, false, 1, null, false));
+    assertThrows(IllegalStateException.class,
+        () -> createAndValidateIndex(ColumnIndexType.FORWARD_INDEX, 
NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_MV, 1, 1,
+            _newColumnsSchemaWithForwardIndexDisabled, true, true, false, 4, 
false, 1, null, false, DataType.STRING,
+            100000));
 
     _indexLoadingConfig.setForwardIndexDisabledColumns(new HashSet<>());
   }
diff --git 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/IndexCreationContext.java
 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/IndexCreationContext.java
index b2e2d75c98..9ec3215c6d 100644
--- 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/IndexCreationContext.java
+++ 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/IndexCreationContext.java
@@ -106,7 +106,8 @@ public interface IndexCreationContext {
           .withTotalDocs(columnMetadata.getTotalDocs())
           .withDictionary(columnMetadata.hasDictionary())
           .withMinValue(columnMetadata.getMinValue())
-          .withMaxValue(columnMetadata.getMaxValue());
+          .withMaxValue(columnMetadata.getMaxValue())
+          
.withMaxNumberOfMultiValueElements(columnMetadata.getMaxNumberOfMultiValues());
     }
 
     public Builder withLengthOfLongestEntry(int lengthOfLongestEntry) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to