This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new aa5d65fccf Cleanup some reader/writer logic for raw forward index 
(#11669)
aa5d65fccf is described below

commit aa5d65fccf87623970d2c5bdae8019d5d09c6ae7
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Mon Sep 25 14:39:11 2023 -0700

    Cleanup some reader/writer logic for raw forward index (#11669)
---
 .../BenchmarkFixedByteSVForwardIndexReader.java    |  11 +-
 .../pinot/perf/BenchmarkRawForwardIndexReader.java |   8 +-
 .../pinot/perf/BenchmarkRawForwardIndexWriter.java |   8 +-
 ...riter.java => BaseChunkForwardIndexWriter.java} |  79 +++---
 ....java => FixedByteChunkForwardIndexWriter.java} |  34 +--
 ...er.java => VarByteChunkForwardIndexWriter.java} |  35 +--
 ....java => VarByteChunkForwardIndexWriterV4.java} |  45 +++-
 .../creator/impl/SegmentColumnarIndexCreator.java  |  40 +--
 .../fwd/MultiValueFixedByteRawIndexCreator.java    |  68 ++---
 .../impl/fwd/MultiValueVarByteRawIndexCreator.java |  29 +-
 .../fwd/SingleValueFixedByteRawIndexCreator.java   |  12 +-
 .../fwd/SingleValueVarByteRawIndexCreator.java     |  19 +-
 .../index/forward/ForwardIndexCreatorFactory.java  |  98 +++----
 .../index/forward/ForwardIndexReaderFactory.java   |  39 +--
 .../segment/index/loader/ForwardIndexHandler.java  |   1 +
 .../ColumnMinMaxValueGenerator.java                | 293 +++++++++------------
 .../forward/BaseChunkForwardIndexReader.java       |   5 +-
 .../FixedByteChunkMVForwardIndexReader.java        |  17 +-
 .../FixedByteChunkSVForwardIndexReader.java        |   4 +-
 .../FixedBytePower2ChunkSVForwardIndexReader.java  |   4 +-
 .../forward/VarByteChunkMVForwardIndexReader.java  |  24 +-
 .../forward/VarByteChunkSVForwardIndexReader.java  |   6 +-
 .../VarByteChunkSVForwardIndexReaderV4.java        |  54 ++--
 .../startree/v2/store/StarTreeLoaderUtils.java     |  13 +-
 .../impl/VarByteChunkSVForwardIndexWriterTest.java |  70 +++--
 .../segment/index/creator/RawIndexCreatorTest.java |  10 +-
 .../segment/index/creator/VarByteChunkV4Test.java  |  10 +-
 .../forward/FixedByteChunkSVForwardIndexTest.java  |  39 ++-
 .../index/forward/ForwardIndexTypeTest.java        |  10 +-
 .../forward/VarByteChunkSVForwardIndexTest.java    |  29 +-
 .../converter/DictionaryToRawIndexConverter.java   |   4 +-
 31 files changed, 516 insertions(+), 602 deletions(-)

diff --git 
a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkFixedByteSVForwardIndexReader.java
 
b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkFixedByteSVForwardIndexReader.java
index 2c2444345c..27d1c90f98 100644
--- 
a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkFixedByteSVForwardIndexReader.java
+++ 
b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkFixedByteSVForwardIndexReader.java
@@ -23,7 +23,7 @@ import java.io.IOException;
 import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 import org.apache.commons.io.FileUtils;
-import 
org.apache.pinot.segment.local.io.writer.impl.FixedByteChunkSVForwardIndexWriter;
+import 
org.apache.pinot.segment.local.io.writer.impl.FixedByteChunkForwardIndexWriter;
 import 
org.apache.pinot.segment.local.segment.index.readers.forward.ChunkReaderContext;
 import 
org.apache.pinot.segment.local.segment.index.readers.forward.FixedByteChunkSVForwardIndexReader;
 import 
org.apache.pinot.segment.local.segment.index.readers.forward.FixedBytePower2ChunkSVForwardIndexReader;
@@ -69,18 +69,17 @@ public class BenchmarkFixedByteSVForwardIndexReader {
     File pow2CompressedIndexFile = new File(INDEX_DIR, 
UUID.randomUUID().toString());
     _doubleBuffer = new double[_blockSize];
     _longBuffer = new long[_blockSize];
-    try (FixedByteChunkSVForwardIndexWriter writer = new 
FixedByteChunkSVForwardIndexWriter(compressedIndexFile,
+    try (FixedByteChunkForwardIndexWriter writer = new 
FixedByteChunkForwardIndexWriter(compressedIndexFile,
         ChunkCompressionType.LZ4, _numBlocks * _blockSize, 1000, Long.BYTES, 
3);
-        FixedByteChunkSVForwardIndexWriter passthroughWriter = new 
FixedByteChunkSVForwardIndexWriter(
-            uncompressedIndexFile,
+        FixedByteChunkForwardIndexWriter passThroughWriter = new 
FixedByteChunkForwardIndexWriter(uncompressedIndexFile,
             ChunkCompressionType.PASS_THROUGH, _numBlocks * _blockSize, 1000, 
Long.BYTES, 3);
-        FixedByteChunkSVForwardIndexWriter pow2Writer = new 
FixedByteChunkSVForwardIndexWriter(pow2CompressedIndexFile,
+        FixedByteChunkForwardIndexWriter pow2Writer = new 
FixedByteChunkForwardIndexWriter(pow2CompressedIndexFile,
             ChunkCompressionType.LZ4, _numBlocks * _blockSize, 1000, 
Long.BYTES, 4)) {
       for (int i = 0; i < _numBlocks * _blockSize; i++) {
         long next = ThreadLocalRandom.current().nextLong();
         writer.putLong(next);
         pow2Writer.putLong(next);
-        passthroughWriter.putLong(next);
+        passThroughWriter.putLong(next);
       }
     }
     _compressedReader = new 
FixedByteChunkSVForwardIndexReader(PinotDataBuffer.loadBigEndianFile(compressedIndexFile),
diff --git 
a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRawForwardIndexReader.java
 
b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRawForwardIndexReader.java
index ae223e7bb9..6da32f2906 100644
--- 
a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRawForwardIndexReader.java
+++ 
b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRawForwardIndexReader.java
@@ -25,8 +25,8 @@ import java.util.SplittableRandom;
 import java.util.UUID;
 import java.util.function.LongSupplier;
 import org.apache.commons.io.FileUtils;
-import 
org.apache.pinot.segment.local.io.writer.impl.VarByteChunkSVForwardIndexWriter;
-import 
org.apache.pinot.segment.local.io.writer.impl.VarByteChunkSVForwardIndexWriterV4;
+import 
org.apache.pinot.segment.local.io.writer.impl.VarByteChunkForwardIndexWriter;
+import 
org.apache.pinot.segment.local.io.writer.impl.VarByteChunkForwardIndexWriterV4;
 import 
org.apache.pinot.segment.local.segment.index.readers.forward.ChunkReaderContext;
 import 
org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkSVForwardIndexReader;
 import 
org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkSVForwardIndexReaderV4;
@@ -137,7 +137,7 @@ public class BenchmarkRawForwardIndexReader {
         throws IOException {
       super.setup();
       _file = new File(TARGET_DIR, UUID.randomUUID().toString());
-      try (VarByteChunkSVForwardIndexWriterV4 writer = new 
VarByteChunkSVForwardIndexWriterV4(_file,
+      try (VarByteChunkForwardIndexWriterV4 writer = new 
VarByteChunkForwardIndexWriterV4(_file,
           _chunkCompressionType, _maxChunkSize)) {
         for (int i = 0; i < _records; i++) {
           writer.putBytes(_bytes[i]);
@@ -163,7 +163,7 @@ public class BenchmarkRawForwardIndexReader {
         throws IOException {
       super.setup();
       _file = new File(TARGET_DIR, UUID.randomUUID().toString());
-      try (VarByteChunkSVForwardIndexWriter writer = new 
VarByteChunkSVForwardIndexWriter(_file, _chunkCompressionType,
+      try (VarByteChunkForwardIndexWriter writer = new 
VarByteChunkForwardIndexWriter(_file, _chunkCompressionType,
           _records, _maxChunkSize / _maxLength, _maxLength, 3)) {
         for (int i = 0; i < _records; i++) {
           writer.putBytes(_bytes[i]);
diff --git 
a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRawForwardIndexWriter.java
 
b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRawForwardIndexWriter.java
index 86d92e22d0..f2938e949d 100644
--- 
a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRawForwardIndexWriter.java
+++ 
b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRawForwardIndexWriter.java
@@ -25,8 +25,8 @@ import java.util.SplittableRandom;
 import java.util.UUID;
 import java.util.function.LongSupplier;
 import org.apache.commons.io.FileUtils;
-import 
org.apache.pinot.segment.local.io.writer.impl.VarByteChunkSVForwardIndexWriter;
-import 
org.apache.pinot.segment.local.io.writer.impl.VarByteChunkSVForwardIndexWriterV4;
+import 
org.apache.pinot.segment.local.io.writer.impl.VarByteChunkForwardIndexWriter;
+import 
org.apache.pinot.segment.local.io.writer.impl.VarByteChunkForwardIndexWriterV4;
 import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.BenchmarkMode;
@@ -138,7 +138,7 @@ public class BenchmarkRawForwardIndexWriter {
   public void writeV4(BytesCounter counter)
       throws IOException {
     try (
-        VarByteChunkSVForwardIndexWriterV4 writer = new 
VarByteChunkSVForwardIndexWriterV4(_file, _chunkCompressionType,
+        VarByteChunkForwardIndexWriterV4 writer = new 
VarByteChunkForwardIndexWriterV4(_file, _chunkCompressionType,
             _maxChunkSize)) {
       for (int i = 0; i < _records; i++) {
         writer.putBytes(_bytes[i]);
@@ -151,7 +151,7 @@ public class BenchmarkRawForwardIndexWriter {
   @BenchmarkMode(Mode.SingleShotTime)
   public void writeV3(BytesCounter counter)
       throws IOException {
-    try (VarByteChunkSVForwardIndexWriter writer = new 
VarByteChunkSVForwardIndexWriter(_file, _chunkCompressionType,
+    try (VarByteChunkForwardIndexWriter writer = new 
VarByteChunkForwardIndexWriter(_file, _chunkCompressionType,
         _records, _maxChunkSize / _maxLength, _maxLength, 3)) {
       for (int i = 0; i < _records; i++) {
         writer.putBytes(_bytes[i]);
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/BaseChunkSVForwardIndexWriter.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/BaseChunkForwardIndexWriter.java
similarity index 76%
rename from 
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/BaseChunkSVForwardIndexWriter.java
rename to 
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/BaseChunkForwardIndexWriter.java
index d0c41b0763..70d8f38706 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/BaseChunkSVForwardIndexWriter.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/BaseChunkForwardIndexWriter.java
@@ -28,22 +28,38 @@ import java.nio.channels.FileChannel;
 import org.apache.pinot.segment.local.io.compression.ChunkCompressorFactory;
 import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
 import org.apache.pinot.segment.spi.compression.ChunkCompressor;
-import org.apache.pinot.segment.spi.index.ForwardIndexConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
 /**
- * Base implementation for chunk-based single-value raw 
(non-dictionary-encoded) forward index writer.
+ * Base implementation for chunk-based raw (non-dictionary-encoded) forward 
index writer where each chunk contains fixed
+ * number of docs.
+ *
+ * <p>The layout of the file is as follows:
+ * <ul>
+ *   <li>Header Section
+ *   <ul>
+ *     <li>File format version (int)</li>
+ *     <li>Total number of chunks (int)</li>
+ *     <li>Number of docs per chunk (int)</li>
+ *     <li>Size of entry in bytes (int)</li>
+ *     <li>Total number of docs (int)</li>
+ *     <li>Compression type enum value (int)</li>
+ *     <li>Start offset of data header (int)</li>
+ *     <li>Data header (start offsets for all chunks)
+ *     <ul>
+ *       <li>For version 2, offset is stored as int</li>
+ *       <li>For version 3 onwards, offset is stored as long</li>
+ *     </ul>
+ *     </li>
+ *   </ul>
+ *   </li>
+ *   <li>Individual Chunks</li>
+ * </ul>
  */
-public abstract class BaseChunkSVForwardIndexWriter implements Closeable {
-  // TODO: Remove this before release 0.5.0
-  public static final int DEFAULT_VERSION = 
ForwardIndexConfig.DEFAULT_RAW_WRITER_VERSION;
-  public static final int CURRENT_VERSION = 3;
-
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(BaseChunkSVForwardIndexWriter.class);
-  private static final int FILE_HEADER_ENTRY_CHUNK_OFFSET_SIZE_V1V2 = 
Integer.BYTES;
-  private static final int FILE_HEADER_ENTRY_CHUNK_OFFSET_SIZE_V3 = Long.BYTES;
+public abstract class BaseChunkForwardIndexWriter implements Closeable {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(BaseChunkForwardIndexWriter.class);
 
   protected final FileChannel _dataFile;
   protected ByteBuffer _header;
@@ -69,15 +85,15 @@ public abstract class BaseChunkSVForwardIndexWriter 
implements Closeable {
    * @param fixed if the data type is fixed width (required for version 
validation)
    * @throws IOException if the file isn't found or can't be mapped
    */
-  protected BaseChunkSVForwardIndexWriter(File file, ChunkCompressionType 
compressionType, int totalDocs,
+  protected BaseChunkForwardIndexWriter(File file, ChunkCompressionType 
compressionType, int totalDocs,
       int numDocsPerChunk, long chunkSize, int sizeOfEntry, int version, 
boolean fixed)
       throws IOException {
-    Preconditions.checkArgument(version == DEFAULT_VERSION || version == 
CURRENT_VERSION
-        || (fixed && version == 4));
-    Preconditions.checkArgument(chunkSize <= Integer.MAX_VALUE, "chunk size 
limited to 2GB");
+    Preconditions.checkArgument(version == 2 || version == 3 || (fixed && 
version == 4),
+        "Illegal version: %s for %s bytes values", version, fixed ? "fixed" : 
"variable");
+    Preconditions.checkArgument(chunkSize <= Integer.MAX_VALUE, "Chunk size 
limited to 2GB");
     _chunkSize = (int) chunkSize;
     _chunkCompressor = ChunkCompressorFactory.getCompressor(compressionType);
-    _headerEntryChunkOffsetSize = getHeaderEntryChunkOffsetSize(version);
+    _headerEntryChunkOffsetSize = version == 2 ? Integer.BYTES : Long.BYTES;
     _dataOffset = writeHeader(compressionType, totalDocs, numDocsPerChunk, 
sizeOfEntry, version);
     _chunkBuffer = ByteBuffer.allocateDirect(_chunkSize);
     int maxCompressedChunkSize = 
_chunkCompressor.maxCompressedSize(_chunkSize); // may exceed original chunk 
size
@@ -85,19 +101,6 @@ public abstract class BaseChunkSVForwardIndexWriter 
implements Closeable {
     _dataFile = new RandomAccessFile(file, "rw").getChannel();
   }
 
-  public static int getHeaderEntryChunkOffsetSize(int version) {
-    switch (version) {
-      case 1:
-      case 2:
-        return FILE_HEADER_ENTRY_CHUNK_OFFSET_SIZE_V1V2;
-      case 3:
-      case 4:
-        return FILE_HEADER_ENTRY_CHUNK_OFFSET_SIZE_V3;
-      default:
-        throw new IllegalStateException("Invalid version: " + version);
-    }
-  }
-
   @Override
   public void close()
       throws IOException {
@@ -143,19 +146,17 @@ public abstract class BaseChunkSVForwardIndexWriter 
implements Closeable {
     _header.putInt(sizeOfEntry);
     offset += Integer.BYTES;
 
-    if (version > 1) {
-      // Write total number of docs.
-      _header.putInt(totalDocs);
-      offset += Integer.BYTES;
+    // Write total number of docs.
+    _header.putInt(totalDocs);
+    offset += Integer.BYTES;
 
-      // Write the compressor type
-      _header.putInt(compressionType.getValue());
-      offset += Integer.BYTES;
+    // Write the compressor type
+    _header.putInt(compressionType.getValue());
+    offset += Integer.BYTES;
 
-      // Start of chunk offsets.
-      int dataHeaderStart = offset + Integer.BYTES;
-      _header.putInt(dataHeaderStart);
-    }
+    // Start of chunk offsets.
+    int dataHeaderStart = offset + Integer.BYTES;
+    _header.putInt(dataHeaderStart);
 
     return headerSize;
   }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/FixedByteChunkSVForwardIndexWriter.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/FixedByteChunkForwardIndexWriter.java
similarity index 70%
rename from 
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/FixedByteChunkSVForwardIndexWriter.java
rename to 
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/FixedByteChunkForwardIndexWriter.java
index 7b942b111b..11a250791b 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/FixedByteChunkSVForwardIndexWriter.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/FixedByteChunkForwardIndexWriter.java
@@ -26,34 +26,11 @@ import 
org.apache.pinot.segment.spi.compression.ChunkCompressionType;
 
 
 /**
- * Class to write out fixed length bytes into a single column.
- * Client responsible to ensure that they call the correct set method that
- * matches the sizeOfEntry. Avoiding checks here, as they can be expensive
- * when called for each row.
- *
- * The layout of the file is as follows:
- * <p> Header Section: </p>
- * <ul>
- *   <li> Integer: File format version. </li>
- *   <li> Integer: Total number of chunks. </li>
- *   <li> Integer: Number of docs per chunk. </li>
- *   <li> Integer: Length of entry (in bytes). </li>
- *   <li> Integer: Total number of docs (version 2 onwards). </li>
- *   <li> Integer: Compression type enum value (version 2 onwards). </li>
- *   <li> Integer: Start offset of data header (version 2 onwards). </li>
- *   <li> Integer array: Integer offsets for all chunks in the data (upto 
version 2),
- *   Long array: Long offsets for all chunks in the data (version 3 onwards) 
</li>
- * </ul>
- *
- * <p> Individual Chunks: </p>
- * <ul>
- *   <li> Data bytes. </li>
- * </ul>
- *
- * Only sequential writes are supported.
+ * Chunk-based raw (non-dictionary-encoded) forward index writer where each 
chunk contains fixed number of docs, and
+ * each entry has fixed number of bytes.
  */
 @NotThreadSafe
-public class FixedByteChunkSVForwardIndexWriter extends 
BaseChunkSVForwardIndexWriter {
+public class FixedByteChunkForwardIndexWriter extends 
BaseChunkForwardIndexWriter {
   private int _chunkDataOffset;
 
   /**
@@ -68,12 +45,11 @@ public class FixedByteChunkSVForwardIndexWriter extends 
BaseChunkSVForwardIndexW
    * @throws FileNotFoundException Throws {@link FileNotFoundException} if the 
specified file is not found.
    * @throws IOException Throws {@link IOException} if there are any errors 
mapping the underlying ByteBuffer.
    */
-  public FixedByteChunkSVForwardIndexWriter(File file, ChunkCompressionType 
compressionType, int totalDocs,
+  public FixedByteChunkForwardIndexWriter(File file, ChunkCompressionType 
compressionType, int totalDocs,
       int numDocsPerChunk, int sizeOfEntry, int writerVersion)
       throws IOException {
     super(file, compressionType, totalDocs, 
normalizeDocsPerChunk(writerVersion, numDocsPerChunk),
-        (sizeOfEntry * normalizeDocsPerChunk(writerVersion, numDocsPerChunk)), 
sizeOfEntry,
-        writerVersion, true);
+        (sizeOfEntry * normalizeDocsPerChunk(writerVersion, numDocsPerChunk)), 
sizeOfEntry, writerVersion, true);
     _chunkDataOffset = 0;
   }
 
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkSVForwardIndexWriter.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkForwardIndexWriter.java
similarity index 82%
rename from 
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkSVForwardIndexWriter.java
rename to 
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkForwardIndexWriter.java
index bc55c3482d..b6daaf7fe7 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkSVForwardIndexWriter.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkForwardIndexWriter.java
@@ -30,33 +30,20 @@ import static java.nio.charset.StandardCharsets.UTF_8;
 
 
 /**
- * Class to write out variable length bytes into a single column.
+ * Chunk-based raw (non-dictionary-encoded) forward index writer where each 
chunk contains fixed number of docs, and
+ * the entries are variable length.
  *
- * The layout of the file is as follows:
- * <p> Header Section: </p>
+ * <p>The layout of each chunk is as follows:
  * <ul>
- *   <li> Integer: File format version. </li>
- *   <li> Integer: Total number of chunks. </li>
- *   <li> Integer: Number of docs per chunk. </li>
- *   <li> Integer: Length of longest entry (in bytes). </li>
- *   <li> Integer: Total number of docs (version 2 onwards). </li>
- *   <li> Integer: Compression type enum value (version 2 onwards). </li>
- *   <li> Integer: Start offset of data header (version 2 onwards). </li>
- *   <li> Integer array: Integer offsets for all chunks in the data (upto 
version 2),
- *   Long array: Long offsets for all chunks in the data (version 3 onwards) 
</li>
+ *   <li>
+ *     Header Section: start offsets (stored as int) of the entry within the 
data section. For partial chunks, offset
+ *     values are 0 for missing entries.
+ *   </li>
+ *   <li>Data Section</li>
  * </ul>
- *
- * <p> Individual Chunks: </p>
- * <ul>
- *   <li> Integer offsets to start position of rows: For partial chunks, 
offset values are 0 for missing rows. </li>
- *   <li> Data bytes. </li>
- * </ul>
- *
- * Only sequential writes are supported.
  */
 @NotThreadSafe
-public class VarByteChunkSVForwardIndexWriter extends 
BaseChunkSVForwardIndexWriter implements VarByteChunkWriter {
-
+public class VarByteChunkForwardIndexWriter extends 
BaseChunkForwardIndexWriter implements VarByteChunkWriter {
   public static final int CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE = Integer.BYTES;
 
   private final int _chunkHeaderSize;
@@ -75,8 +62,8 @@ public class VarByteChunkSVForwardIndexWriter extends 
BaseChunkSVForwardIndexWri
    * @throws FileNotFoundException Throws {@link FileNotFoundException} if the 
specified file is
    *     not found.
    */
-  public VarByteChunkSVForwardIndexWriter(File file, ChunkCompressionType 
compressionType,
-      int totalDocs, int numDocsPerChunk, int lengthOfLongestEntry, int 
writerVersion)
+  public VarByteChunkForwardIndexWriter(File file, ChunkCompressionType 
compressionType, int totalDocs,
+      int numDocsPerChunk, int lengthOfLongestEntry, int writerVersion)
       throws IOException {
     super(file, compressionType, totalDocs, numDocsPerChunk,
         numDocsPerChunk * (CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE + (long) 
lengthOfLongestEntry),
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkSVForwardIndexWriterV4.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkForwardIndexWriterV4.java
similarity index 87%
rename from 
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkSVForwardIndexWriterV4.java
rename to 
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkForwardIndexWriterV4.java
index 060266ef44..d70ed2dcbc 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkSVForwardIndexWriterV4.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkForwardIndexWriterV4.java
@@ -39,16 +39,43 @@ import org.slf4j.LoggerFactory;
 
 
 /**
- * Class to write out variable length bytes into a single column.
+ * Chunk-based raw (non-dictionary-encoded) forward index writer where each 
chunk contains variable number of docs, and
+ * the entries are variable length.
  *
- *
- * Only sequential writes are supported.
+ * <p>The layout of the file is as follows:
+ * <ul>
+ *   <li>Header Section
+ *   <ul>
+ *     <li>File format version (int)</li>
+ *     <li>Target decompressed chunk size (int)</li>
+ *     <li>Compression type enum value (int)</li>
+ *     <li>Start offset of chunk data (int)</li>
+ *     <li>Data header (for each chunk)
+ *     <ul>
+ *       <li>First docId in the chunk (int), where MSB is used to mark huge 
chunk</li>
+ *       <li>Start offset of the chunk (unsigned int)</li>
+ *     </ul>
+ *     </li>
+ *   </ul>
+ *   </li>
+ *   <li>Individual Chunks
+ *   <ul>
+ *     <li>Regular chunk
+ *     <ul>
+ *       <li>Header Section: start offsets (stored as int) of the entry within 
the data section</li>
+ *       <li>Data Section</li>
+ *     </ul>
+ *     </li>
+ *     <li>Huge chunk: contains one single value</li>
+ *   </ul>
+ *   </li>
+ * </ul>
  */
 @NotThreadSafe
-public class VarByteChunkSVForwardIndexWriterV4 implements VarByteChunkWriter {
-
+public class VarByteChunkForwardIndexWriterV4 implements VarByteChunkWriter {
   public static final int VERSION = 4;
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(VarByteChunkSVForwardIndexWriterV4.class);
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(VarByteChunkForwardIndexWriterV4.class);
   private static final String DATA_BUFFER_SUFFIX = ".buf";
 
   private final File _dataBuffer;
@@ -63,15 +90,15 @@ public class VarByteChunkSVForwardIndexWriterV4 implements 
VarByteChunkWriter {
   private int _metadataSize = 0;
   private long _chunkOffset = 0;
 
-  public VarByteChunkSVForwardIndexWriterV4(File file, ChunkCompressionType 
compressionType, int chunkSize)
+  public VarByteChunkForwardIndexWriterV4(File file, ChunkCompressionType 
compressionType, int chunkSize)
       throws IOException {
     _dataBuffer = new File(file.getName() + DATA_BUFFER_SUFFIX);
     _output = new RandomAccessFile(file, "rw");
     _dataChannel = new RandomAccessFile(_dataBuffer, "rw").getChannel();
     _chunkCompressor = ChunkCompressorFactory.getCompressor(compressionType, 
true);
     _chunkBuffer = 
ByteBuffer.allocateDirect(chunkSize).order(ByteOrder.LITTLE_ENDIAN);
-    _compressionBuffer = 
ByteBuffer.allocateDirect(_chunkCompressor.maxCompressedSize(chunkSize))
-        .order(ByteOrder.LITTLE_ENDIAN);
+    _compressionBuffer =
+        
ByteBuffer.allocateDirect(_chunkCompressor.maxCompressedSize(chunkSize)).order(ByteOrder.LITTLE_ENDIAN);
     // reserve space for numDocs
     _chunkBuffer.position(Integer.BYTES);
     writeHeader(_chunkCompressor.compressionType(), chunkSize);
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java
index c275aec4a5..5ec40796d6 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java
@@ -78,6 +78,7 @@ import org.slf4j.LoggerFactory;
 import static org.apache.pinot.segment.spi.V1Constants.MetadataKeys.Column.*;
 import static org.apache.pinot.segment.spi.V1Constants.MetadataKeys.Segment.*;
 
+
 /**
  * Segment creator which writes data in a columnar form.
  */
@@ -149,6 +150,7 @@ public class SegmentColumnarIndexCreator implements 
SegmentCreator {
       IndexType<ForwardIndexConfig, ?, ForwardIndexCreator> forwardIdx = 
StandardIndexes.forward();
       boolean forwardIndexDisabled = 
!originalConfig.getConfig(forwardIdx).isEnabled();
 
+      //@formatter:off
       IndexCreationContext.Common context = IndexCreationContext.builder()
           .withIndexDir(_indexDir)
           .withDictionary(dictEnabledColumn)
@@ -161,6 +163,7 @@ public class SegmentColumnarIndexCreator implements 
SegmentCreator {
           .withForwardIndexDisabled(forwardIndexDisabled)
           .withTextCommitOnClose(true)
           .build();
+      //@formatter:on
 
       FieldIndexConfigs config = adaptConfig(columnName, originalConfig, 
columnIndexCreationInfo, segmentCreationSpec);
 
@@ -175,8 +178,8 @@ public class SegmentColumnarIndexCreator implements 
SegmentCreator {
           LOGGER.info("Creating dictionary index in column {}.{} even when it 
is disabled in config",
               segmentCreationSpec.getTableName(), columnName);
         }
-        SegmentDictionaryCreator creator = new 
DictionaryIndexPlugin().getIndexType()
-            .createIndexCreator(context, dictConfig);
+        SegmentDictionaryCreator creator =
+            new 
DictionaryIndexPlugin().getIndexType().createIndexCreator(context, dictConfig);
 
         try {
           creator.build(context.getSortedUniqueElementsArray());
@@ -229,9 +232,9 @@ public class SegmentColumnarIndexCreator implements 
SegmentCreator {
     // Sorted columns treat the 'forwardIndexDisabled' flag as a no-op
     ForwardIndexConfig fwdConfig = config.getConfig(StandardIndexes.forward());
     if (!fwdConfig.isEnabled() && columnIndexCreationInfo.isSorted()) {
-      builder.add(StandardIndexes.forward(), new 
ForwardIndexConfig.Builder(fwdConfig)
-          .withLegacyProperties(segmentCreationSpec.getColumnProperties(), 
columnName)
-          .build());
+      builder.add(StandardIndexes.forward(),
+          new 
ForwardIndexConfig.Builder(fwdConfig).withLegacyProperties(segmentCreationSpec.getColumnProperties(),
+              columnName).build());
     }
     // Initialize inverted index creator; skip creating inverted index if 
sorted
     if (columnIndexCreationInfo.isSorted()) {
@@ -289,8 +292,8 @@ public class SegmentColumnarIndexCreator implements 
SegmentCreator {
 
     FieldIndexConfigs fieldIndexConfigs = 
config.getIndexConfigsByColName().get(column);
     if 
(DictionaryIndexType.ignoreDictionaryOverride(config.isOptimizeDictionary(),
-        config.isOptimizeDictionaryForMetrics(), 
config.getNoDictionarySizeRatioThreshold(), spec,
-        fieldIndexConfigs, info.getDistinctValueCount(), 
info.getTotalNumberOfEntries())) {
+        config.isOptimizeDictionaryForMetrics(), 
config.getNoDictionarySizeRatioThreshold(), spec, fieldIndexConfigs,
+        info.getDistinctValueCount(), info.getTotalNumberOfEntries())) {
       // Ignore overrides and pick from config
       createDictionary = info.isCreateDictionary();
     }
@@ -560,7 +563,7 @@ public class SegmentColumnarIndexCreator implements 
SegmentCreator {
       Object min = columnIndexCreationInfo.getMin();
       Object max = columnIndexCreationInfo.getMax();
       if (min != null && max != null) {
-        addColumnMinMaxValueInfo(properties, column, min.toString(), 
max.toString(), fieldSpec.getDataType());
+        addColumnMinMaxValueInfo(properties, column, min.toString(), 
max.toString(), dataType.getStoredType());
       }
     }
 
@@ -574,19 +577,18 @@ public class SegmentColumnarIndexCreator implements 
SegmentCreator {
   }
 
   public static void addColumnMinMaxValueInfo(PropertiesConfiguration 
properties, String column, String minValue,
-      String maxValue, DataType dataType) {
-    properties.setProperty(getKeyFor(column, MIN_VALUE), 
getValidPropertyValue(minValue, false, dataType));
-    properties.setProperty(getKeyFor(column, MAX_VALUE), 
getValidPropertyValue(maxValue, true, dataType));
+      String maxValue, DataType storedType) {
+    properties.setProperty(getKeyFor(column, MIN_VALUE), 
getValidPropertyValue(minValue, false, storedType));
+    properties.setProperty(getKeyFor(column, MAX_VALUE), 
getValidPropertyValue(maxValue, true, storedType));
   }
 
   /**
    * Helper method to get the valid value for setting min/max.
    */
-  private static String getValidPropertyValue(String value, boolean isMax, 
DataType dataType) {
-    String valueWithinLengthLimit = getValueWithinLengthLimit(value, isMax, 
dataType);
-    return dataType.getStoredType() == DataType.STRING
-        ? 
CommonsConfigurationUtils.replaceSpecialCharacterInPropertyValue(valueWithinLengthLimit)
-        : valueWithinLengthLimit;
+  private static String getValidPropertyValue(String value, boolean isMax, 
DataType storedType) {
+    String valueWithinLengthLimit = getValueWithinLengthLimit(value, isMax, 
storedType);
+    return storedType == DataType.STRING ? 
CommonsConfigurationUtils.replaceSpecialCharacterInPropertyValue(
+        valueWithinLengthLimit) : valueWithinLengthLimit;
   }
 
   /**
@@ -594,12 +596,12 @@ public class SegmentColumnarIndexCreator implements 
SegmentCreator {
    * returns a truncated version of the string with maintaining min or max 
value.
    */
   @VisibleForTesting
-  static String getValueWithinLengthLimit(String value, boolean isMax, 
DataType dataType) {
+  static String getValueWithinLengthLimit(String value, boolean isMax, 
DataType storedType) {
     int length = value.length();
     if (length <= METADATA_PROPERTY_LENGTH_LIMIT) {
       return value;
     }
-    switch (dataType.getStoredType()) {
+    switch (storedType) {
       case STRING:
         if (isMax) {
           int trimIndexValue = METADATA_PROPERTY_LENGTH_LIMIT - 1;
@@ -635,7 +637,7 @@ public class SegmentColumnarIndexCreator implements 
SegmentCreator {
           return 
BytesUtils.toHexString(Arrays.copyOf(BytesUtils.toBytes(value), 
(METADATA_PROPERTY_LENGTH_LIMIT / 2)));
         }
       default:
-        throw new IllegalStateException("Unsupported data type for property 
value length reduction: " + dataType);
+        throw new IllegalStateException("Unsupported stored type for property 
value length reduction: " + storedType);
     }
   }
 
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueFixedByteRawIndexCreator.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueFixedByteRawIndexCreator.java
index 82f5bff1ca..a1ba59f3b2 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueFixedByteRawIndexCreator.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueFixedByteRawIndexCreator.java
@@ -21,23 +21,21 @@ package 
org.apache.pinot.segment.local.segment.creator.impl.fwd;
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import 
org.apache.pinot.segment.local.io.writer.impl.BaseChunkSVForwardIndexWriter;
-import 
org.apache.pinot.segment.local.io.writer.impl.VarByteChunkSVForwardIndexWriter;
-import 
org.apache.pinot.segment.local.io.writer.impl.VarByteChunkSVForwardIndexWriterV4;
+import 
org.apache.pinot.segment.local.io.writer.impl.VarByteChunkForwardIndexWriter;
+import 
org.apache.pinot.segment.local.io.writer.impl.VarByteChunkForwardIndexWriterV4;
 import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkWriter;
 import org.apache.pinot.segment.spi.V1Constants.Indexes;
 import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
+import org.apache.pinot.segment.spi.index.ForwardIndexConfig;
 import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 
 
 /**
- * Forward index creator for raw (non-dictionary-encoded) single-value column 
of variable length
- * data type (STRING,
- * BYTES).
+ * Raw (non-dictionary-encoded) forward index creator for multi-value column 
of fixed length data type (INT, LONG,
+ * FLOAT, DOUBLE).
  */
 public class MultiValueFixedByteRawIndexCreator implements ForwardIndexCreator 
{
-
   private static final int DEFAULT_NUM_DOCS_PER_CHUNK = 1000;
   private static final int TARGET_MAX_CHUNK_SIZE = 1024 * 1024;
 
@@ -56,9 +54,8 @@ public class MultiValueFixedByteRawIndexCreator implements 
ForwardIndexCreator {
   public MultiValueFixedByteRawIndexCreator(File baseIndexDir, 
ChunkCompressionType compressionType, String column,
       int totalDocs, DataType valueType, int maxNumberOfMultiValueElements)
       throws IOException {
-    this(baseIndexDir, compressionType, column, totalDocs, valueType,
-        maxNumberOfMultiValueElements, false,
-        BaseChunkSVForwardIndexWriter.DEFAULT_VERSION);
+    this(baseIndexDir, compressionType, column, totalDocs, valueType, 
maxNumberOfMultiValueElements, false,
+        ForwardIndexConfig.DEFAULT_RAW_WRITER_VERSION);
   }
 
   /**
@@ -72,20 +69,19 @@ public class MultiValueFixedByteRawIndexCreator implements 
ForwardIndexCreator {
    * @param deriveNumDocsPerChunk true if writer should auto-derive the number 
of rows per chunk
    * @param writerVersion writer format version
    */
-  public MultiValueFixedByteRawIndexCreator(File baseIndexDir, 
ChunkCompressionType compressionType,
-      String column, int totalDocs, DataType valueType, int 
maxNumberOfMultiValueElements,
-      boolean deriveNumDocsPerChunk, int writerVersion)
+  public MultiValueFixedByteRawIndexCreator(File baseIndexDir, 
ChunkCompressionType compressionType, String column,
+      int totalDocs, DataType valueType, int maxNumberOfMultiValueElements, 
boolean deriveNumDocsPerChunk,
+      int writerVersion)
       throws IOException {
     File file = new File(baseIndexDir, column + 
Indexes.RAW_MV_FORWARD_INDEX_FILE_EXTENSION);
     // Store the length followed by the values
     int totalMaxLength = Integer.BYTES + (maxNumberOfMultiValueElements * 
valueType.getStoredType().size());
-    int numDocsPerChunk =
-        deriveNumDocsPerChunk ? Math.max(TARGET_MAX_CHUNK_SIZE / 
(totalMaxLength
-            + 
VarByteChunkSVForwardIndexWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE), 1) : 
DEFAULT_NUM_DOCS_PER_CHUNK;
-    _indexWriter = writerVersion < VarByteChunkSVForwardIndexWriterV4.VERSION
-        ? new VarByteChunkSVForwardIndexWriter(file, compressionType, 
totalDocs, numDocsPerChunk, totalMaxLength,
-        writerVersion)
-        : new VarByteChunkSVForwardIndexWriterV4(file, compressionType, 
TARGET_MAX_CHUNK_SIZE);
+    int numDocsPerChunk = deriveNumDocsPerChunk ? Math.max(
+        TARGET_MAX_CHUNK_SIZE / (totalMaxLength + 
VarByteChunkForwardIndexWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE), 1)
+        : DEFAULT_NUM_DOCS_PER_CHUNK;
+    _indexWriter = writerVersion < VarByteChunkForwardIndexWriterV4.VERSION ? 
new VarByteChunkForwardIndexWriter(file,
+        compressionType, totalDocs, numDocsPerChunk, totalMaxLength, 
writerVersion)
+        : new VarByteChunkForwardIndexWriterV4(file, compressionType, 
TARGET_MAX_CHUNK_SIZE);
     _valueType = valueType;
   }
 
@@ -105,60 +101,52 @@ public class MultiValueFixedByteRawIndexCreator 
implements ForwardIndexCreator {
   }
 
   @Override
-  public void putIntMV(final int[] values) {
-
-    byte[] bytes = new byte[Integer.BYTES
-        + values.length * Integer.BYTES]; //numValues, bytes required to store 
the content
+  public void putIntMV(int[] values) {
+    byte[] bytes = new byte[Integer.BYTES + values.length * Integer.BYTES];
     ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
     //write the length
     byteBuffer.putInt(values.length);
     //write the content of each element
-    for (final int value : values) {
+    for (int value : values) {
       byteBuffer.putInt(value);
     }
     _indexWriter.putBytes(bytes);
   }
 
   @Override
-  public void putLongMV(final long[] values) {
-
-    byte[] bytes = new byte[Integer.BYTES
-        + values.length * Long.BYTES]; //numValues, bytes required to store 
the content
+  public void putLongMV(long[] values) {
+    byte[] bytes = new byte[Integer.BYTES + values.length * Long.BYTES];
     ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
     //write the length
     byteBuffer.putInt(values.length);
     //write the content of each element
-    for (final long value : values) {
+    for (long value : values) {
       byteBuffer.putLong(value);
     }
     _indexWriter.putBytes(bytes);
   }
 
   @Override
-  public void putFloatMV(final float[] values) {
-
-    byte[] bytes = new byte[Integer.BYTES
-        + values.length * Float.BYTES]; //numValues, bytes required to store 
the content
+  public void putFloatMV(float[] values) {
+    byte[] bytes = new byte[Integer.BYTES + values.length * Float.BYTES];
     ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
     //write the length
     byteBuffer.putInt(values.length);
     //write the content of each element
-    for (final float value : values) {
+    for (float value : values) {
       byteBuffer.putFloat(value);
     }
     _indexWriter.putBytes(bytes);
   }
 
   @Override
-  public void putDoubleMV(final double[] values) {
-
-    byte[] bytes = new byte[Integer.BYTES
-        + values.length * Double.BYTES]; //numValues, bytes required to store 
the content
+  public void putDoubleMV(double[] values) {
+    byte[] bytes = new byte[Integer.BYTES + values.length * Double.BYTES];
     ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
     //write the length
     byteBuffer.putInt(values.length);
     //write the content of each element
-    for (final double value : values) {
+    for (double value : values) {
       byteBuffer.putDouble(value);
     }
     _indexWriter.putBytes(bytes);
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueVarByteRawIndexCreator.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueVarByteRawIndexCreator.java
index 84d471d50e..85dba85ab9 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueVarByteRawIndexCreator.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueVarByteRawIndexCreator.java
@@ -21,24 +21,23 @@ package 
org.apache.pinot.segment.local.segment.creator.impl.fwd;
 import com.google.common.base.Preconditions;
 import java.io.File;
 import java.io.IOException;
-import 
org.apache.pinot.segment.local.io.writer.impl.BaseChunkSVForwardIndexWriter;
-import 
org.apache.pinot.segment.local.io.writer.impl.VarByteChunkSVForwardIndexWriter;
+import 
org.apache.pinot.segment.local.io.writer.impl.VarByteChunkForwardIndexWriter;
+import 
org.apache.pinot.segment.local.io.writer.impl.VarByteChunkForwardIndexWriterV4;
 import org.apache.pinot.segment.spi.V1Constants.Indexes;
 import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
+import org.apache.pinot.segment.spi.index.ForwardIndexConfig;
 import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 
 
 /**
- * Forward index creator for raw (non-dictionary-encoded) single-value column 
of variable length
- * data type (STRING,
+ * Raw (non-dictionary-encoded) forward index creator for multi-value column 
of variable length data type (STRING,
  * BYTES).
  */
 public class MultiValueVarByteRawIndexCreator implements ForwardIndexCreator {
-
   private static final int TARGET_MAX_CHUNK_SIZE = 1024 * 1024;
 
-  private final VarByteChunkSVForwardIndexWriter _indexWriter;
+  private final VarByteChunkForwardIndexWriter _indexWriter;
   private final DataType _valueType;
 
   /**
@@ -55,8 +54,8 @@ public class MultiValueVarByteRawIndexCreator implements 
ForwardIndexCreator {
   public MultiValueVarByteRawIndexCreator(File baseIndexDir, 
ChunkCompressionType compressionType, String column,
       int totalDocs, DataType valueType, int maxRowLengthInBytes, int 
maxNumberOfElements)
       throws IOException {
-    this(baseIndexDir, compressionType, column, totalDocs, valueType,
-        BaseChunkSVForwardIndexWriter.DEFAULT_VERSION, maxRowLengthInBytes, 
maxNumberOfElements);
+    this(baseIndexDir, compressionType, column, totalDocs, valueType, 
ForwardIndexConfig.DEFAULT_RAW_WRITER_VERSION,
+        maxRowLengthInBytes, maxNumberOfElements);
   }
 
   /**
@@ -77,13 +76,17 @@ public class MultiValueVarByteRawIndexCreator implements 
ForwardIndexCreator {
     //we will prepend the actual content with numElements and length array 
containing length of each element
     int totalMaxLength = getTotalRowStorageBytes(maxNumberOfElements, 
maxRowLengthInBytes);
 
-    File file = new File(baseIndexDir,
-        column + Indexes.RAW_MV_FORWARD_INDEX_FILE_EXTENSION);
+    File file = new File(baseIndexDir, column + 
Indexes.RAW_MV_FORWARD_INDEX_FILE_EXTENSION);
     int numDocsPerChunk = Math.max(
-        TARGET_MAX_CHUNK_SIZE / (totalMaxLength + 
VarByteChunkSVForwardIndexWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE),
+        TARGET_MAX_CHUNK_SIZE / (totalMaxLength + 
VarByteChunkForwardIndexWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE),
         1);
-    _indexWriter = new VarByteChunkSVForwardIndexWriter(file, compressionType, 
totalDocs, numDocsPerChunk,
-        totalMaxLength, writerVersion);
+    // TODO: Support V4 MV reader
+    // Currently fall back to V2 for backward compatibility
+    if (writerVersion == VarByteChunkForwardIndexWriterV4.VERSION) {
+      writerVersion = 2;
+    }
+    _indexWriter = new VarByteChunkForwardIndexWriter(file, compressionType, 
totalDocs, numDocsPerChunk, totalMaxLength,
+        writerVersion);
     _valueType = valueType;
   }
 
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/SingleValueFixedByteRawIndexCreator.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/SingleValueFixedByteRawIndexCreator.java
index 81391d274c..c16aa57a5c 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/SingleValueFixedByteRawIndexCreator.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/SingleValueFixedByteRawIndexCreator.java
@@ -20,22 +20,22 @@ package 
org.apache.pinot.segment.local.segment.creator.impl.fwd;
 
 import java.io.File;
 import java.io.IOException;
-import 
org.apache.pinot.segment.local.io.writer.impl.BaseChunkSVForwardIndexWriter;
-import 
org.apache.pinot.segment.local.io.writer.impl.FixedByteChunkSVForwardIndexWriter;
+import 
org.apache.pinot.segment.local.io.writer.impl.FixedByteChunkForwardIndexWriter;
 import org.apache.pinot.segment.spi.V1Constants;
 import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
+import org.apache.pinot.segment.spi.index.ForwardIndexConfig;
 import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 
 
 /**
- * Forward index creator for raw (non-dictionary-encoded) single-value column 
of fixed length data type (INT, LONG,
+ * Raw (non-dictionary-encoded) forward index creator for single-value column 
of fixed length data type (INT, LONG,
  * FLOAT, DOUBLE).
  */
 public class SingleValueFixedByteRawIndexCreator implements 
ForwardIndexCreator {
   private static final int NUM_DOCS_PER_CHUNK = 1000; // TODO: Auto-derive 
this based on metadata.
 
-  private final FixedByteChunkSVForwardIndexWriter _indexWriter;
+  private final FixedByteChunkForwardIndexWriter _indexWriter;
   private final DataType _valueType;
 
   /**
@@ -51,7 +51,7 @@ public class SingleValueFixedByteRawIndexCreator implements 
ForwardIndexCreator
   public SingleValueFixedByteRawIndexCreator(File baseIndexDir, 
ChunkCompressionType compressionType, String column,
       int totalDocs, DataType valueType)
       throws IOException {
-    this(baseIndexDir, compressionType, column, totalDocs, valueType, 
BaseChunkSVForwardIndexWriter.DEFAULT_VERSION);
+    this(baseIndexDir, compressionType, column, totalDocs, valueType, 
ForwardIndexConfig.DEFAULT_RAW_WRITER_VERSION);
   }
 
   /**
@@ -70,7 +70,7 @@ public class SingleValueFixedByteRawIndexCreator implements 
ForwardIndexCreator
       throws IOException {
     File file = new File(baseIndexDir, column + 
V1Constants.Indexes.RAW_SV_FORWARD_INDEX_FILE_EXTENSION);
     _indexWriter =
-        new FixedByteChunkSVForwardIndexWriter(file, compressionType, 
totalDocs, NUM_DOCS_PER_CHUNK, valueType.size(),
+        new FixedByteChunkForwardIndexWriter(file, compressionType, totalDocs, 
NUM_DOCS_PER_CHUNK, valueType.size(),
             writerVersion);
     _valueType = valueType;
   }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/SingleValueVarByteRawIndexCreator.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/SingleValueVarByteRawIndexCreator.java
index f01550ac40..9e3658802f 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/SingleValueVarByteRawIndexCreator.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/SingleValueVarByteRawIndexCreator.java
@@ -22,18 +22,18 @@ import com.google.common.annotations.VisibleForTesting;
 import java.io.File;
 import java.io.IOException;
 import java.math.BigDecimal;
-import 
org.apache.pinot.segment.local.io.writer.impl.BaseChunkSVForwardIndexWriter;
-import 
org.apache.pinot.segment.local.io.writer.impl.VarByteChunkSVForwardIndexWriter;
-import 
org.apache.pinot.segment.local.io.writer.impl.VarByteChunkSVForwardIndexWriterV4;
+import 
org.apache.pinot.segment.local.io.writer.impl.VarByteChunkForwardIndexWriter;
+import 
org.apache.pinot.segment.local.io.writer.impl.VarByteChunkForwardIndexWriterV4;
 import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkWriter;
 import org.apache.pinot.segment.spi.V1Constants;
 import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
+import org.apache.pinot.segment.spi.index.ForwardIndexConfig;
 import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 
 
 /**
- * Forward index creator for raw (non-dictionary-encoded) single-value column 
of variable length data type (BIG_DECIMAL,
+ * Raw (non-dictionary-encoded) forward index creator for single-value column 
of variable length data type (BIG_DECIMAL,
  * STRING, BYTES).
  */
 public class SingleValueVarByteRawIndexCreator implements ForwardIndexCreator {
@@ -57,7 +57,7 @@ public class SingleValueVarByteRawIndexCreator implements 
ForwardIndexCreator {
       int totalDocs, DataType valueType, int maxLength)
       throws IOException {
     this(baseIndexDir, compressionType, column, totalDocs, valueType, 
maxLength, false,
-        BaseChunkSVForwardIndexWriter.DEFAULT_VERSION);
+        ForwardIndexConfig.DEFAULT_RAW_WRITER_VERSION);
   }
 
   /**
@@ -77,16 +77,15 @@ public class SingleValueVarByteRawIndexCreator implements 
ForwardIndexCreator {
       throws IOException {
     File file = new File(baseIndexDir, column + 
V1Constants.Indexes.RAW_SV_FORWARD_INDEX_FILE_EXTENSION);
     int numDocsPerChunk = deriveNumDocsPerChunk ? 
getNumDocsPerChunk(maxLength) : DEFAULT_NUM_DOCS_PER_CHUNK;
-    _indexWriter = writerVersion < VarByteChunkSVForwardIndexWriterV4.VERSION
-        ? new VarByteChunkSVForwardIndexWriter(file, compressionType, 
totalDocs, numDocsPerChunk, maxLength,
-        writerVersion)
-        : new VarByteChunkSVForwardIndexWriterV4(file, compressionType, 
TARGET_MAX_CHUNK_SIZE);
+    _indexWriter = writerVersion < VarByteChunkForwardIndexWriterV4.VERSION ? 
new VarByteChunkForwardIndexWriter(file,
+        compressionType, totalDocs, numDocsPerChunk, maxLength, writerVersion)
+        : new VarByteChunkForwardIndexWriterV4(file, compressionType, 
TARGET_MAX_CHUNK_SIZE);
     _valueType = valueType;
   }
 
   @VisibleForTesting
   public static int getNumDocsPerChunk(int lengthOfLongestEntry) {
-    int overheadPerEntry = lengthOfLongestEntry + 
VarByteChunkSVForwardIndexWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE;
+    int overheadPerEntry = lengthOfLongestEntry + 
VarByteChunkForwardIndexWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE;
     return Math.max(TARGET_MAX_CHUNK_SIZE / overheadPerEntry, 1);
   }
 
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexCreatorFactory.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexCreatorFactory.java
index 9cb35aa36f..a57158b44f 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexCreatorFactory.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexCreatorFactory.java
@@ -33,6 +33,7 @@ import 
org.apache.pinot.segment.spi.creator.IndexCreationContext;
 import org.apache.pinot.segment.spi.index.ForwardIndexConfig;
 import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator;
 import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
 
 
 public class ForwardIndexCreatorFactory {
@@ -41,107 +42,90 @@ public class ForwardIndexCreatorFactory {
 
   public static ForwardIndexCreator createIndexCreator(IndexCreationContext 
context, ForwardIndexConfig indexConfig)
       throws Exception {
-    String colName = context.getFieldSpec().getName();
+    File indexDir = context.getIndexDir();
+    FieldSpec fieldSpec = context.getFieldSpec();
+    String columnName = fieldSpec.getName();
+    int numTotalDocs = context.getTotalDocs();
 
-    if (!context.hasDictionary()) {
+    if (context.hasDictionary()) {
+      // Dictionary enabled columns
+      int cardinality = context.getCardinality();
+      if (fieldSpec.isSingleValueField()) {
+        if (context.isSorted()) {
+          return new SingleValueSortedForwardIndexCreator(indexDir, 
columnName, cardinality);
+        } else {
+          return new SingleValueUnsortedForwardIndexCreator(indexDir, 
columnName, cardinality, numTotalDocs);
+        }
+      } else {
+        return new MultiValueUnsortedForwardIndexCreator(indexDir, columnName, 
cardinality, numTotalDocs,
+            context.getTotalNumberOfEntries());
+      }
+    } else {
+      // Dictionary disabled columns
+      DataType storedType = fieldSpec.getDataType().getStoredType();
       ChunkCompressionType chunkCompressionType = 
indexConfig.getChunkCompressionType();
       if (chunkCompressionType == null) {
-        chunkCompressionType = 
ForwardIndexType.getDefaultCompressionType(context.getFieldSpec().getFieldType());
+        chunkCompressionType = 
ForwardIndexType.getDefaultCompressionType(fieldSpec.getFieldType());
       }
-
-      // Dictionary disabled columns
       boolean deriveNumDocsPerChunk = indexConfig.isDeriveNumDocsPerChunk();
       int writerVersion = indexConfig.getRawIndexWriterVersion();
-      if (context.getFieldSpec().isSingleValueField()) {
-        return getRawIndexCreatorForSVColumn(context.getIndexDir(), 
chunkCompressionType, colName,
-            context.getFieldSpec().getDataType().getStoredType(),
-            context.getTotalDocs(), context.getLengthOfLongestEntry(), 
deriveNumDocsPerChunk, writerVersion);
+      if (fieldSpec.isSingleValueField()) {
+        return getRawIndexCreatorForSVColumn(indexDir, chunkCompressionType, 
columnName, storedType, numTotalDocs,
+            context.getLengthOfLongestEntry(), deriveNumDocsPerChunk, 
writerVersion);
       } else {
-        return getRawIndexCreatorForMVColumn(context.getIndexDir(), 
chunkCompressionType, colName,
-            context.getFieldSpec().getDataType().getStoredType(),
-            context.getTotalDocs(), 
context.getMaxNumberOfMultiValueElements(), deriveNumDocsPerChunk, 
writerVersion,
+        return getRawIndexCreatorForMVColumn(indexDir, chunkCompressionType, 
columnName, storedType, numTotalDocs,
+            context.getMaxNumberOfMultiValueElements(), deriveNumDocsPerChunk, 
writerVersion,
             context.getMaxRowLengthInBytes());
       }
-    } else {
-      // Dictionary enabled columns
-      if (context.getFieldSpec().isSingleValueField()) {
-        if (context.isSorted()) {
-          return new 
SingleValueSortedForwardIndexCreator(context.getIndexDir(), colName,
-              context.getCardinality());
-        } else {
-          return new 
SingleValueUnsortedForwardIndexCreator(context.getIndexDir(), colName,
-              context.getCardinality(), context.getTotalDocs());
-        }
-      } else {
-        return new 
MultiValueUnsortedForwardIndexCreator(context.getIndexDir(), colName,
-            context.getCardinality(), context.getTotalDocs(), 
context.getTotalNumberOfEntries());
-      }
     }
   }
 
   /**
    * Helper method to build the raw index creator for the column.
    * Assumes that column to be indexed is single valued.
-   *
-   * @param file Output index file
-   * @param column Column name
-   * @param totalDocs Total number of documents to index
-   * @param lengthOfLongestEntry Length of longest entry
-   * @param deriveNumDocsPerChunk true if varbyte writer should auto-derive 
the number of rows per chunk
-   * @param writerVersion version to use for the raw index writer
-   * @return raw index creator
    */
-  public static ForwardIndexCreator getRawIndexCreatorForSVColumn(File file, 
ChunkCompressionType compressionType,
-      String column, FieldSpec.DataType dataType, int totalDocs, int 
lengthOfLongestEntry,
-      boolean deriveNumDocsPerChunk, int writerVersion)
+  public static ForwardIndexCreator getRawIndexCreatorForSVColumn(File 
indexDir, ChunkCompressionType compressionType,
+      String column, DataType storedType, int numTotalDocs, int 
lengthOfLongestEntry, boolean deriveNumDocsPerChunk,
+      int writerVersion)
       throws IOException {
-    switch (dataType.getStoredType()) {
+    switch (storedType) {
       case INT:
       case LONG:
       case FLOAT:
       case DOUBLE:
-        return new SingleValueFixedByteRawIndexCreator(file, compressionType, 
column, totalDocs, dataType,
+        return new SingleValueFixedByteRawIndexCreator(indexDir, 
compressionType, column, numTotalDocs, storedType,
             writerVersion);
       case BIG_DECIMAL:
       case STRING:
       case BYTES:
-        return new SingleValueVarByteRawIndexCreator(file, compressionType, 
column, totalDocs, dataType,
+        return new SingleValueVarByteRawIndexCreator(indexDir, 
compressionType, column, numTotalDocs, storedType,
             lengthOfLongestEntry, deriveNumDocsPerChunk, writerVersion);
       default:
-        throw new UnsupportedOperationException("Data type not supported for 
raw indexing: " + dataType);
+        throw new IllegalStateException("Unsupported stored type: " + 
storedType);
     }
   }
 
   /**
    * Helper method to build the raw index creator for the column.
    * Assumes that column to be indexed is multi-valued.
-   *
-   * @param file Output index file
-   * @param column Column name
-   * @param totalDocs Total number of documents to index
-   * @param deriveNumDocsPerChunk true if varbyte writer should auto-derive 
the number of rows
-   *     per chunk
-   * @param writerVersion version to use for the raw index writer
-   * @param maxRowLengthInBytes the length of the longest row in bytes
-   * @return raw index creator
    */
-  public static ForwardIndexCreator getRawIndexCreatorForMVColumn(File file, 
ChunkCompressionType compressionType,
-      String column, FieldSpec.DataType dataType, final int totalDocs, int 
maxNumberOfMultiValueElements,
+  public static ForwardIndexCreator getRawIndexCreatorForMVColumn(File 
indexDir, ChunkCompressionType compressionType,
+      String column, DataType storedType, int numTotalDocs, int 
maxNumberOfMultiValueElements,
       boolean deriveNumDocsPerChunk, int writerVersion, int 
maxRowLengthInBytes)
       throws IOException {
-    switch (dataType.getStoredType()) {
+    switch (storedType) {
       case INT:
       case LONG:
       case FLOAT:
       case DOUBLE:
-        return new MultiValueFixedByteRawIndexCreator(file, compressionType, 
column, totalDocs, dataType,
+        return new MultiValueFixedByteRawIndexCreator(indexDir, 
compressionType, column, numTotalDocs, storedType,
             maxNumberOfMultiValueElements, deriveNumDocsPerChunk, 
writerVersion);
       case STRING:
       case BYTES:
-        return new MultiValueVarByteRawIndexCreator(file, compressionType, 
column, totalDocs, dataType, writerVersion,
-            maxRowLengthInBytes, maxNumberOfMultiValueElements);
+        return new MultiValueVarByteRawIndexCreator(indexDir, compressionType, 
column, numTotalDocs, storedType,
+            writerVersion, maxRowLengthInBytes, maxNumberOfMultiValueElements);
       default:
-        throw new UnsupportedOperationException("Data type not supported for 
raw indexing: " + dataType);
+        throw new IllegalStateException("Unsupported stored type: " + 
storedType);
     }
   }
 }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexReaderFactory.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexReaderFactory.java
index 283f569f40..ecaaf875a5 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexReaderFactory.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexReaderFactory.java
@@ -19,7 +19,7 @@
 
 package org.apache.pinot.segment.local.segment.index.forward;
 
-import 
org.apache.pinot.segment.local.io.writer.impl.VarByteChunkSVForwardIndexWriterV4;
+import 
org.apache.pinot.segment.local.io.writer.impl.VarByteChunkForwardIndexWriterV4;
 import 
org.apache.pinot.segment.local.segment.index.readers.forward.FixedBitMVForwardIndexReader;
 import 
org.apache.pinot.segment.local.segment.index.readers.forward.FixedBitSVForwardIndexReaderV2;
 import 
org.apache.pinot.segment.local.segment.index.readers.forward.FixedByteChunkMVForwardIndexReader;
@@ -37,11 +37,10 @@ import org.apache.pinot.segment.spi.index.IndexType;
 import org.apache.pinot.segment.spi.index.StandardIndexes;
 import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
 import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
-import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
 
 
-class ForwardIndexReaderFactory extends 
IndexReaderFactory.Default<ForwardIndexConfig, ForwardIndexReader> {
-
+public class ForwardIndexReaderFactory extends 
IndexReaderFactory.Default<ForwardIndexConfig, ForwardIndexReader> {
   public static final ForwardIndexReaderFactory INSTANCE = new 
ForwardIndexReaderFactory();
 
   @Override
@@ -69,22 +68,26 @@ class ForwardIndexReaderFactory extends 
IndexReaderFactory.Default<ForwardIndexC
             metadata.getBitsPerElement());
       }
     } else {
-      FieldSpec.DataType storedType = metadata.getDataType().getStoredType();
-      if (metadata.isSingleValue()) {
-        int version = dataBuffer.getInt(0);
-        if (storedType.isFixedWidth()) {
-          return version >= FixedBytePower2ChunkSVForwardIndexReader.VERSION
-              ? new FixedBytePower2ChunkSVForwardIndexReader(dataBuffer, 
storedType)
-              : new FixedByteChunkSVForwardIndexReader(dataBuffer, storedType);
-        }
-        if (version >= VarByteChunkSVForwardIndexWriterV4.VERSION) {
-          return new VarByteChunkSVForwardIndexReaderV4(dataBuffer, 
storedType);
-        }
-        return new VarByteChunkSVForwardIndexReader(dataBuffer, storedType);
+      return createRawIndexReader(dataBuffer, 
metadata.getDataType().getStoredType(), metadata.isSingleValue());
+    }
+  }
+
+  public static ForwardIndexReader createRawIndexReader(PinotDataBuffer 
dataBuffer, DataType storedType,
+      boolean isSingleValue) {
+    int version = dataBuffer.getInt(0);
+    if (isSingleValue) {
+      if (storedType.isFixedWidth()) {
+        return version == FixedBytePower2ChunkSVForwardIndexReader.VERSION
+            ? new FixedBytePower2ChunkSVForwardIndexReader(dataBuffer, 
storedType)
+            : new FixedByteChunkSVForwardIndexReader(dataBuffer, storedType);
       } else {
-        return storedType.isFixedWidth() ? new 
FixedByteChunkMVForwardIndexReader(dataBuffer, storedType)
-            : new VarByteChunkMVForwardIndexReader(dataBuffer, storedType);
+        return version == VarByteChunkForwardIndexWriterV4.VERSION ? new 
VarByteChunkSVForwardIndexReaderV4(dataBuffer,
+            storedType) : new VarByteChunkSVForwardIndexReader(dataBuffer, 
storedType);
       }
+    } else {
+      // TODO: Support V4 MV reader
+      return storedType.isFixedWidth() ? new 
FixedByteChunkMVForwardIndexReader(dataBuffer, storedType)
+          : new VarByteChunkMVForwardIndexReader(dataBuffer, storedType);
     }
   }
 }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java
index 91b0d46b0a..5b5ad75de6 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java
@@ -331,6 +331,7 @@ public class ForwardIndexHandler extends BaseIndexHandler {
         }
       } else if (existingNoDictColumns.contains(column) && !newIsDict) {
         // Both existing and new column is RAW forward index encoded. Check if compression needs to be changed.
+        // TODO: Also check if raw index version needs to be changed
         if (shouldChangeCompressionType(column, segmentReader)) {
           columnOperationsMap.put(column, Collections.singletonList(Operation.CHANGE_RAW_INDEX_COMPRESSION_TYPE));
         }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/columnminmaxvalue/ColumnMinMaxValueGenerator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/columnminmaxvalue/ColumnMinMaxValueGenerator.java
index 406fc5c42b..6d3cc83c10 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/columnminmaxvalue/ColumnMinMaxValueGenerator.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/columnminmaxvalue/ColumnMinMaxValueGenerator.java
@@ -24,20 +24,18 @@ import java.util.List;
 import org.apache.commons.configuration.PropertiesConfiguration;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pinot.segment.local.segment.creator.impl.SegmentColumnarIndexCreator;
+import org.apache.pinot.segment.local.segment.index.forward.ForwardIndexReaderFactory;
 import org.apache.pinot.segment.local.segment.index.readers.BytesDictionary;
 import org.apache.pinot.segment.local.segment.index.readers.DoubleDictionary;
 import org.apache.pinot.segment.local.segment.index.readers.FloatDictionary;
 import org.apache.pinot.segment.local.segment.index.readers.IntDictionary;
 import org.apache.pinot.segment.local.segment.index.readers.LongDictionary;
 import org.apache.pinot.segment.local.segment.index.readers.StringDictionary;
-import org.apache.pinot.segment.local.segment.index.readers.forward.ChunkReaderContext;
-import org.apache.pinot.segment.local.segment.index.readers.forward.FixedByteChunkMVForwardIndexReader;
-import org.apache.pinot.segment.local.segment.index.readers.forward.FixedByteChunkSVForwardIndexReader;
-import org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkMVForwardIndexReader;
-import org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkSVForwardIndexReader;
 import org.apache.pinot.segment.spi.ColumnMetadata;
 import org.apache.pinot.segment.spi.SegmentMetadata;
 import org.apache.pinot.segment.spi.index.StandardIndexes;
+import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
+import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext;
 import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
 import org.apache.pinot.segment.spi.store.SegmentDirectory;
 import org.apache.pinot.segment.spi.utils.SegmentMetadataUtils;
@@ -139,47 +137,48 @@ public class ColumnMinMaxValueGenerator {
       return;
     }
 
-    DataType dataType = columnMetadata.getDataType().getStoredType();
+    DataType dataType = columnMetadata.getDataType();
+    DataType storedType = dataType.getStoredType();
     if (columnMetadata.hasDictionary()) {
       PinotDataBuffer dictionaryBuffer = 
_segmentWriter.getIndexFor(columnName, StandardIndexes.dictionary());
       int length = columnMetadata.getCardinality();
-      switch (dataType) {
+      switch (storedType) {
         case INT:
           try (IntDictionary intDictionary = new 
IntDictionary(dictionaryBuffer, length)) {
             
SegmentColumnarIndexCreator.addColumnMinMaxValueInfo(_segmentProperties, 
columnName,
-                intDictionary.getStringValue(0), 
intDictionary.getStringValue(length - 1), dataType);
+                intDictionary.getStringValue(0), 
intDictionary.getStringValue(length - 1), storedType);
           }
           break;
         case LONG:
           try (LongDictionary longDictionary = new 
LongDictionary(dictionaryBuffer, length)) {
             
SegmentColumnarIndexCreator.addColumnMinMaxValueInfo(_segmentProperties, 
columnName,
-                longDictionary.getStringValue(0), 
longDictionary.getStringValue(length - 1), dataType);
+                longDictionary.getStringValue(0), 
longDictionary.getStringValue(length - 1), storedType);
           }
           break;
         case FLOAT:
           try (FloatDictionary floatDictionary = new 
FloatDictionary(dictionaryBuffer, length)) {
             
SegmentColumnarIndexCreator.addColumnMinMaxValueInfo(_segmentProperties, 
columnName,
-                floatDictionary.getStringValue(0), 
floatDictionary.getStringValue(length - 1), dataType);
+                floatDictionary.getStringValue(0), 
floatDictionary.getStringValue(length - 1), storedType);
           }
           break;
         case DOUBLE:
           try (DoubleDictionary doubleDictionary = new 
DoubleDictionary(dictionaryBuffer, length)) {
             
SegmentColumnarIndexCreator.addColumnMinMaxValueInfo(_segmentProperties, 
columnName,
-                doubleDictionary.getStringValue(0), 
doubleDictionary.getStringValue(length - 1), dataType);
+                doubleDictionary.getStringValue(0), 
doubleDictionary.getStringValue(length - 1), storedType);
           }
           break;
         case STRING:
           try (StringDictionary stringDictionary = new 
StringDictionary(dictionaryBuffer, length,
               columnMetadata.getColumnMaxLength())) {
             
SegmentColumnarIndexCreator.addColumnMinMaxValueInfo(_segmentProperties, 
columnName,
-                stringDictionary.getStringValue(0), 
stringDictionary.getStringValue(length - 1), dataType);
+                stringDictionary.getStringValue(0), 
stringDictionary.getStringValue(length - 1), storedType);
           }
           break;
         case BYTES:
           try (BytesDictionary bytesDictionary = new 
BytesDictionary(dictionaryBuffer, length,
               columnMetadata.getColumnMaxLength())) {
             
SegmentColumnarIndexCreator.addColumnMinMaxValueInfo(_segmentProperties, 
columnName,
-                bytesDictionary.getStringValue(0), 
bytesDictionary.getStringValue(length - 1), dataType);
+                bytesDictionary.getStringValue(0), 
bytesDictionary.getStringValue(length - 1), storedType);
           }
           break;
         default:
@@ -188,129 +187,117 @@ public class ColumnMinMaxValueGenerator {
     } else {
       // setting min/max for non-dictionary columns.
       int numDocs = columnMetadata.getTotalDocs();
-      boolean isSingleValueField = 
_segmentMetadata.getSchema().getFieldSpecFor(columnName).isSingleValueField();
-      PinotDataBuffer forwardBuffer = _segmentWriter.getIndexFor(columnName, 
StandardIndexes.forward());
-      switch (dataType) {
-        case INT: {
-          int min = Integer.MAX_VALUE;
-          int max = Integer.MIN_VALUE;
-          if (isSingleValueField) {
-            try (FixedByteChunkSVForwardIndexReader rawIndexReader = new 
FixedByteChunkSVForwardIndexReader(
-                forwardBuffer, DataType.INT); ChunkReaderContext readerContext 
= rawIndexReader.createContext()) {
-                for (int docId = 0; docId < numDocs; docId++) {
-                  int value = rawIndexReader.getInt(docId, readerContext);
+      PinotDataBuffer rawIndexBuffer = _segmentWriter.getIndexFor(columnName, 
StandardIndexes.forward());
+      boolean isSingleValue = 
_segmentMetadata.getSchema().getFieldSpecFor(columnName).isSingleValueField();
+      try (
+          ForwardIndexReader rawIndexReader = 
ForwardIndexReaderFactory.createRawIndexReader(rawIndexBuffer, storedType,
+              isSingleValue); ForwardIndexReaderContext readerContext = 
rawIndexReader.createContext()) {
+        switch (storedType) {
+          case INT: {
+            int min = Integer.MAX_VALUE;
+            int max = Integer.MIN_VALUE;
+            if (isSingleValue) {
+              for (int docId = 0; docId < numDocs; docId++) {
+                int value = rawIndexReader.getInt(docId, readerContext);
+                min = Math.min(min, value);
+                max = Math.max(max, value);
+              }
+            } else {
+              for (int docId = 0; docId < numDocs; docId++) {
+                int[] values = rawIndexReader.getIntMV(docId, readerContext);
+                for (int value : values) {
                   min = Math.min(min, value);
                   max = Math.max(max, value);
                 }
+              }
             }
-          } else {
-            try (FixedByteChunkMVForwardIndexReader rawIndexReader = new 
FixedByteChunkMVForwardIndexReader(
-                forwardBuffer, DataType.INT); ChunkReaderContext readerContext 
= rawIndexReader.createContext()) {
-                for (int docId = 0; docId < numDocs; docId++) {
-                  int[] value = rawIndexReader.getIntMV(docId, readerContext);
-                  for (int i = 0; i < value.length; i++) {
-                    min = Math.min(min, value[i]);
-                    max = Math.max(max, value[i]);
-                  }
-                }
-            }
+            
SegmentColumnarIndexCreator.addColumnMinMaxValueInfo(_segmentProperties, 
columnName, String.valueOf(min),
+                String.valueOf(max), storedType);
+            break;
           }
-          
SegmentColumnarIndexCreator.addColumnMinMaxValueInfo(_segmentProperties, 
columnName,
-              String.valueOf(min), String.valueOf(max), dataType);
-         }
-         break;
-        case LONG: {
-          long min = Long.MAX_VALUE;
-          long max = Long.MIN_VALUE;
-          if (isSingleValueField) {
-            try (FixedByteChunkSVForwardIndexReader rawIndexReader = new 
FixedByteChunkSVForwardIndexReader(
-                forwardBuffer, DataType.LONG); ChunkReaderContext 
readerContext = rawIndexReader.createContext()) {
-                for (int docId = 0; docId < numDocs; docId++) {
-                  long value = rawIndexReader.getLong(docId, readerContext);
+          case LONG: {
+            long min = Long.MAX_VALUE;
+            long max = Long.MIN_VALUE;
+            if (isSingleValue) {
+              for (int docId = 0; docId < numDocs; docId++) {
+                long value = rawIndexReader.getLong(docId, readerContext);
+                min = Math.min(min, value);
+                max = Math.max(max, value);
+              }
+            } else {
+              for (int docId = 0; docId < numDocs; docId++) {
+                long[] values = rawIndexReader.getLongMV(docId, readerContext);
+                for (long value : values) {
                   min = Math.min(min, value);
                   max = Math.max(max, value);
                 }
+              }
             }
-          } else {
-            try (FixedByteChunkMVForwardIndexReader rawIndexReader = new 
FixedByteChunkMVForwardIndexReader(
-                forwardBuffer, DataType.LONG); ChunkReaderContext 
readerContext = rawIndexReader.createContext()) {
-                for (int docId = 0; docId < numDocs; docId++) {
-                  long[] value = rawIndexReader.getLongMV(docId, 
readerContext);
-                  for (int i = 0; i < value.length; i++) {
-                    min = Math.min(min, value[i]);
-                    max = Math.max(max, value[i]);
-                  }
-                }
-            }
+            
SegmentColumnarIndexCreator.addColumnMinMaxValueInfo(_segmentProperties, 
columnName, String.valueOf(min),
+                String.valueOf(max), storedType);
+            break;
           }
-          
SegmentColumnarIndexCreator.addColumnMinMaxValueInfo(_segmentProperties, 
columnName,
-                String.valueOf(min), String.valueOf(max), dataType);
-         }
-         break;
-        case FLOAT: {
-          float min = Float.MAX_VALUE;
-          float max = Float.MIN_VALUE;
-          if (isSingleValueField) {
-            try (FixedByteChunkSVForwardIndexReader rawIndexReader = new 
FixedByteChunkSVForwardIndexReader(
-                forwardBuffer, DataType.FLOAT); ChunkReaderContext 
readerContext = rawIndexReader.createContext()) {
-                for (int docId = 0; docId < numDocs; docId++) {
-                  float value = rawIndexReader.getFloat(docId, readerContext);
+          case FLOAT: {
+            float min = Float.POSITIVE_INFINITY;
+            float max = Float.NEGATIVE_INFINITY;
+            if (isSingleValue) {
+              for (int docId = 0; docId < numDocs; docId++) {
+                float value = rawIndexReader.getFloat(docId, readerContext);
+                min = Math.min(min, value);
+                max = Math.max(max, value);
+              }
+            } else {
+              for (int docId = 0; docId < numDocs; docId++) {
+                float[] values = rawIndexReader.getFloatMV(docId, 
readerContext);
+                for (float value : values) {
                   min = Math.min(min, value);
                   max = Math.max(max, value);
                 }
+              }
             }
-          } else {
-            try (FixedByteChunkMVForwardIndexReader rawIndexReader = new 
FixedByteChunkMVForwardIndexReader(
-                forwardBuffer, DataType.FLOAT); ChunkReaderContext 
readerContext = rawIndexReader.createContext()) {
-                for (int docId = 0; docId < numDocs; docId++) {
-                  float[] value = rawIndexReader.getFloatMV(docId, 
readerContext);
-                  for (int i = 0; i < value.length; i++) {
-                    min = Math.min(min, value[i]);
-                    max = Math.max(max, value[i]);
-                  }
-                }
-            }
+            
SegmentColumnarIndexCreator.addColumnMinMaxValueInfo(_segmentProperties, 
columnName, String.valueOf(min),
+                String.valueOf(max), storedType);
+            break;
           }
-          
SegmentColumnarIndexCreator.addColumnMinMaxValueInfo(_segmentProperties, 
columnName,
-                String.valueOf(min), String.valueOf(max), dataType);
-         }
-         break;
-        case DOUBLE: {
-          double min = Double.MAX_VALUE;
-          double max = Double.MIN_VALUE;
-          if (isSingleValueField) {
-            try (FixedByteChunkSVForwardIndexReader rawIndexReader = new 
FixedByteChunkSVForwardIndexReader(
-                forwardBuffer, DataType.DOUBLE); ChunkReaderContext 
readerContext = rawIndexReader.createContext()) {
-                for (int docId = 0; docId < numDocs; docId++) {
-                  double value = rawIndexReader.getDouble(docId, 
readerContext);
+          case DOUBLE: {
+            double min = Double.POSITIVE_INFINITY;
+            double max = Double.NEGATIVE_INFINITY;
+            if (isSingleValue) {
+              for (int docId = 0; docId < numDocs; docId++) {
+                double value = rawIndexReader.getDouble(docId, readerContext);
+                min = Math.min(min, value);
+                max = Math.max(max, value);
+              }
+            } else {
+              for (int docId = 0; docId < numDocs; docId++) {
+                double[] values = rawIndexReader.getDoubleMV(docId, 
readerContext);
+                for (double value : values) {
                   min = Math.min(min, value);
                   max = Math.max(max, value);
                 }
+              }
             }
-          } else {
-            try (FixedByteChunkMVForwardIndexReader rawIndexReader = new 
FixedByteChunkMVForwardIndexReader(
-                forwardBuffer, DataType.DOUBLE); ChunkReaderContext 
readerContext = rawIndexReader.createContext()) {
-                for (int docId = 0; docId < numDocs; docId++) {
-                  double[] value = rawIndexReader.getDoubleMV(docId, 
readerContext);
-                  for (int i = 0; i < value.length; i++) {
-                    min = Math.min(min, value[i]);
-                    max = Math.max(max, value[i]);
-                  }
-                }
-            }
+            
SegmentColumnarIndexCreator.addColumnMinMaxValueInfo(_segmentProperties, 
columnName, String.valueOf(min),
+                String.valueOf(max), storedType);
+            break;
           }
-          
SegmentColumnarIndexCreator.addColumnMinMaxValueInfo(_segmentProperties, 
columnName,
-                String.valueOf(min), String.valueOf(max), dataType);
-          }
-          break;
-        case STRING: {
-          String min = null;
-          String max = null;
-          if (isSingleValueField) {
-            try (VarByteChunkSVForwardIndexReader rawIndexReader = new 
VarByteChunkSVForwardIndexReader(forwardBuffer,
-                DataType.STRING); ChunkReaderContext readerContext = 
rawIndexReader.createContext()) {
-                for (int docId = 0; docId < numDocs; docId++) {
-                  String value = rawIndexReader.getString(docId, 
readerContext);
+          case STRING: {
+            String min = null;
+            String max = null;
+            if (isSingleValue) {
+              for (int docId = 0; docId < numDocs; docId++) {
+                String value = rawIndexReader.getString(docId, readerContext);
+                if (min == null || StringUtils.compare(min, value) > 0) {
+                  min = value;
+                }
+                if (max == null || StringUtils.compare(max, value) < 0) {
+                  max = value;
+                }
+              }
+            } else {
+              for (int docId = 0; docId < numDocs; docId++) {
+                String[] values = rawIndexReader.getStringMV(docId, 
readerContext);
+                for (String value : values) {
                   if (min == null || StringUtils.compare(min, value) > 0) {
                     min = value;
                   }
@@ -318,34 +305,28 @@ public class ColumnMinMaxValueGenerator {
                     max = value;
                   }
                 }
+              }
             }
-          } else {
-            try (VarByteChunkMVForwardIndexReader rawIndexReader = new 
VarByteChunkMVForwardIndexReader(forwardBuffer,
-                DataType.STRING); ChunkReaderContext readerContext = 
rawIndexReader.createContext()) {
-                for (int docId = 0; docId < numDocs; docId++) {
-                  String[] value = rawIndexReader.getStringMV(docId, 
readerContext);
-                  for (int i = 0; i < value.length; i++) {
-                    if (min == null || StringUtils.compare(min, value[i]) > 0) 
{
-                      min = value[i];
-                    }
-                    if (max == null || StringUtils.compare(max, value[i]) < 0) 
{
-                      max = value[i];
-                    }
-                  }
-                }
-            }
-          }
-          
SegmentColumnarIndexCreator.addColumnMinMaxValueInfo(_segmentProperties, 
columnName, min, max, dataType);
+            
SegmentColumnarIndexCreator.addColumnMinMaxValueInfo(_segmentProperties, 
columnName, min, max, storedType);
+            break;
           }
-          break;
-        case BYTES: {
-          byte[] min = null;
-          byte[] max = null;
-          if (isSingleValueField) {
-            try (VarByteChunkSVForwardIndexReader rawIndexReader = new 
VarByteChunkSVForwardIndexReader(forwardBuffer,
-                DataType.BYTES); ChunkReaderContext readerContext = 
rawIndexReader.createContext()) {
-                for (int docId = 0; docId < numDocs; docId++) {
-                  byte[] value = rawIndexReader.getBytes(docId, readerContext);
+          case BYTES: {
+            byte[] min = null;
+            byte[] max = null;
+            if (isSingleValue) {
+              for (int docId = 0; docId < numDocs; docId++) {
+                byte[] value = rawIndexReader.getBytes(docId, readerContext);
+                if (min == null || ByteArray.compare(value, min) > 0) {
+                  min = value;
+                }
+                if (max == null || ByteArray.compare(value, max) < 0) {
+                  max = value;
+                }
+              }
+            } else {
+              for (int docId = 0; docId < numDocs; docId++) {
+                byte[][] values = rawIndexReader.getBytesMV(docId, 
readerContext);
+                for (byte[] value : values) {
                   if (min == null || ByteArray.compare(value, min) > 0) {
                     min = value;
                   }
@@ -353,29 +334,15 @@ public class ColumnMinMaxValueGenerator {
                     max = value;
                   }
                 }
+              }
             }
-          } else {
-            try (VarByteChunkMVForwardIndexReader rawIndexReader = new 
VarByteChunkMVForwardIndexReader(forwardBuffer,
-                DataType.BYTES); ChunkReaderContext readerContext = 
rawIndexReader.createContext()) {
-                for (int docId = 0; docId < numDocs; docId++) {
-                  byte[][] value = rawIndexReader.getBytesMV(docId, 
readerContext);
-                  for (int i = 0; i < value.length; i++) {
-                    if (min == null || ByteArray.compare(value[i], min) > 0) {
-                      min = value[i];
-                    }
-                    if (max == null || ByteArray.compare(value[i], max) < 0) {
-                      max = value[i];
-                    }
-                  }
-                }
-            }
-          }
-          
SegmentColumnarIndexCreator.addColumnMinMaxValueInfo(_segmentProperties, 
columnName,
-              BytesUtils.toHexString(min), BytesUtils.toHexString(max), 
dataType);
+            
SegmentColumnarIndexCreator.addColumnMinMaxValueInfo(_segmentProperties, 
columnName,
+                BytesUtils.toHexString(min), BytesUtils.toHexString(max), 
storedType);
+            break;
           }
-          break;
-        default:
-          throw new IllegalStateException("Unsupported data type: " + dataType 
+ " for column: " + columnName);
+          default:
+            throw new IllegalStateException("Unsupported data type: " + 
dataType + " for column: " + columnName);
+        }
       }
     }
     _minMaxValueAdded = true;
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/BaseChunkForwardIndexReader.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/BaseChunkForwardIndexReader.java
index e37fb4a987..5e05346bbc 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/BaseChunkForwardIndexReader.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/BaseChunkForwardIndexReader.java
@@ -26,7 +26,6 @@ import java.nio.FloatBuffer;
 import java.nio.IntBuffer;
 import java.nio.LongBuffer;
 import org.apache.pinot.segment.local.io.compression.ChunkCompressorFactory;
-import org.apache.pinot.segment.local.io.writer.impl.BaseChunkSVForwardIndexWriter;
 import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
 import org.apache.pinot.segment.spi.compression.ChunkDecompressor;
 import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
@@ -55,7 +54,7 @@ public abstract class BaseChunkForwardIndexReader implements ForwardIndexReader<
   protected final PinotDataBuffer _rawData;
   protected final boolean _isSingleValue;
 
-  public BaseChunkForwardIndexReader(PinotDataBuffer dataBuffer, DataType storedType, boolean isSingleValue) {
+  protected BaseChunkForwardIndexReader(PinotDataBuffer dataBuffer, DataType storedType, boolean isSingleValue) {
     _dataBuffer = dataBuffer;
     _storedType = storedType;
 
@@ -92,7 +91,7 @@ public abstract class BaseChunkForwardIndexReader implements ForwardIndexReader<
       _chunkDecompressor = ChunkCompressorFactory.getDecompressor(_compressionType);
     }
 
-    _headerEntryChunkOffsetSize = BaseChunkSVForwardIndexWriter.getHeaderEntryChunkOffsetSize(version);
+    _headerEntryChunkOffsetSize = version <= 2 ? Integer.BYTES : Long.BYTES;
 
     // Slice out the header from the data buffer.
     int dataHeaderLength = _numChunks * _headerEntryChunkOffsetSize;
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedByteChunkMVForwardIndexReader.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedByteChunkMVForwardIndexReader.java
index acbec1a98d..cc44bb341b 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedByteChunkMVForwardIndexReader.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedByteChunkMVForwardIndexReader.java
@@ -20,19 +20,18 @@ package org.apache.pinot.segment.local.segment.index.readers.forward;
 
 import java.nio.ByteBuffer;
 import javax.annotation.Nullable;
-import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkSVForwardIndexWriter;
+import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkForwardIndexWriter;
 import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 
 
 /**
- * Chunk-based multi-value raw (non-dictionary-encoded) forward index reader for values of
- * fixed length data type (INT, LONG, FLOAT, DOUBLE).
- * <p>For data layout, please refer to the documentation for {@link VarByteChunkSVForwardIndexWriter}
+ * Chunk-based multi-value raw (non-dictionary-encoded) forward index reader for values of fixed length data type (INT,
+ * LONG, FLOAT, DOUBLE).
+ * <p>For data layout, please refer to the documentation for {@link VarByteChunkForwardIndexWriter}
  */
 public final class FixedByteChunkMVForwardIndexReader extends BaseChunkForwardIndexReader {
-
-  private static final int ROW_OFFSET_SIZE = VarByteChunkSVForwardIndexWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE;
+  private static final int ROW_OFFSET_SIZE = VarByteChunkForwardIndexWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE;
 
   private final int _maxChunkSize;
 
@@ -206,8 +205,7 @@ public final class FixedByteChunkMVForwardIndexReader extends BaseChunkForwardIn
         // Last row in the last chunk
         return _dataBuffer.size();
       } else {
-        int valueEndOffsetInChunk = _dataBuffer
-            .getInt(chunkStartOffset + (long) (chunkRowId + 1) * ROW_OFFSET_SIZE);
+        int valueEndOffsetInChunk = _dataBuffer.getInt(chunkStartOffset + (long) (chunkRowId + 1) * ROW_OFFSET_SIZE);
         if (valueEndOffsetInChunk == 0) {
           // Last row in the last chunk (chunk is incomplete, which stores 0 as the offset for the absent rows)
           return _dataBuffer.size();
@@ -220,8 +218,7 @@ public final class FixedByteChunkMVForwardIndexReader extends BaseChunkForwardIn
         // Last row in the chunk
         return getChunkPosition(chunkId + 1);
       } else {
-        return chunkStartOffset + _dataBuffer
-            .getInt(chunkStartOffset + (long) (chunkRowId + 1) * ROW_OFFSET_SIZE);
+        return chunkStartOffset + _dataBuffer.getInt(chunkStartOffset + (long) (chunkRowId + 1) * ROW_OFFSET_SIZE);
       }
     }
   }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedByteChunkSVForwardIndexReader.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedByteChunkSVForwardIndexReader.java
index c394d0021a..4bda3f8090 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedByteChunkSVForwardIndexReader.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedByteChunkSVForwardIndexReader.java
@@ -20,7 +20,7 @@ package org.apache.pinot.segment.local.segment.index.readers.forward;
 
 import java.nio.ByteBuffer;
 import javax.annotation.Nullable;
-import org.apache.pinot.segment.local.io.writer.impl.FixedByteChunkSVForwardIndexWriter;
+import org.apache.pinot.segment.local.io.writer.impl.FixedByteChunkForwardIndexWriter;
 import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 
@@ -28,7 +28,7 @@ import org.apache.pinot.spi.data.FieldSpec.DataType;
 /**
  * Chunk-based single-value raw (non-dictionary-encoded) forward index reader for values of fixed length data type (INT,
  * LONG, FLOAT, DOUBLE).
- * <p>For data layout, please refer to the documentation for {@link FixedByteChunkSVForwardIndexWriter}
+ * <p>For data layout, please refer to the documentation for {@link FixedByteChunkForwardIndexWriter}
  */
 public final class FixedByteChunkSVForwardIndexReader extends BaseChunkForwardIndexReader {
   private final int _chunkSize;
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedBytePower2ChunkSVForwardIndexReader.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedBytePower2ChunkSVForwardIndexReader.java
index 35c857219c..1fc567cfd4 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedBytePower2ChunkSVForwardIndexReader.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedBytePower2ChunkSVForwardIndexReader.java
@@ -20,7 +20,7 @@ package org.apache.pinot.segment.local.segment.index.readers.forward;
 
 import java.nio.ByteBuffer;
 import javax.annotation.Nullable;
-import org.apache.pinot.segment.local.io.writer.impl.FixedByteChunkSVForwardIndexWriter;
+import org.apache.pinot.segment.local.io.writer.impl.FixedByteChunkForwardIndexWriter;
 import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 
@@ -28,7 +28,7 @@ import org.apache.pinot.spi.data.FieldSpec.DataType;
 /**
  * Chunk-based single-value raw (non-dictionary-encoded) forward index reader for values of fixed length data type (INT,
  * LONG, FLOAT, DOUBLE).
- * <p>For data layout, please refer to the documentation for {@link FixedByteChunkSVForwardIndexWriter}
+ * <p>For data layout, please refer to the documentation for {@link FixedByteChunkForwardIndexWriter}
  */
 public final class FixedBytePower2ChunkSVForwardIndexReader extends BaseChunkForwardIndexReader {
   public static final int VERSION = 4;
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkMVForwardIndexReader.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkMVForwardIndexReader.java
index c445312f6f..235132d44c 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkMVForwardIndexReader.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkMVForwardIndexReader.java
@@ -21,20 +21,18 @@ package org.apache.pinot.segment.local.segment.index.readers.forward;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import javax.annotation.Nullable;
-import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkSVForwardIndexWriter;
+import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkForwardIndexWriter;
 import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 
+
 /**
- * Chunk-based single-value raw (non-dictionary-encoded) forward index reader for values of
- * variable
- * length data type
+ * Chunk-based multi-value raw (non-dictionary-encoded) forward index reader for values of variable length data type
  * (STRING, BYTES).
- * <p>For data layout, please refer to the documentation for {@link VarByteChunkSVForwardIndexWriter}
+ * <p>For data layout, please refer to the documentation for {@link VarByteChunkForwardIndexWriter}
  */
 public final class VarByteChunkMVForwardIndexReader extends BaseChunkForwardIndexReader {
-
-  private static final int ROW_OFFSET_SIZE = VarByteChunkSVForwardIndexWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE;
+  private static final int ROW_OFFSET_SIZE = VarByteChunkForwardIndexWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE;
 
   private final int _maxChunkSize;
 
@@ -54,8 +52,7 @@ public final class VarByteChunkMVForwardIndexReader extends 
BaseChunkForwardInde
   }
 
   @Override
-  public int getStringMV(final int docId, final String[] valueBuffer,
-      final ChunkReaderContext context) {
+  public int getStringMV(final int docId, final String[] valueBuffer, final 
ChunkReaderContext context) {
     byte[] compressedBytes;
     if (_isCompressed) {
       compressedBytes = getBytesCompressed(docId, context);
@@ -100,8 +97,7 @@ public final class VarByteChunkMVForwardIndexReader extends 
BaseChunkForwardInde
   }
 
   @Override
-  public int getBytesMV(final int docId, final byte[][] valueBuffer,
-      final ChunkReaderContext context) {
+  public int getBytesMV(final int docId, final byte[][] valueBuffer, final 
ChunkReaderContext context) {
     byte[] compressedBytes;
     if (_isCompressed) {
       compressedBytes = getBytesCompressed(docId, context);
@@ -229,8 +225,7 @@ public final class VarByteChunkMVForwardIndexReader extends 
BaseChunkForwardInde
         // Last row in the last chunk
         return _dataBuffer.size();
       } else {
-        int valueEndOffsetInChunk = _dataBuffer
-            .getInt(chunkStartOffset + (long) (chunkRowId + 1) * 
ROW_OFFSET_SIZE);
+        int valueEndOffsetInChunk = _dataBuffer.getInt(chunkStartOffset + 
(long) (chunkRowId + 1) * ROW_OFFSET_SIZE);
         if (valueEndOffsetInChunk == 0) {
           // Last row in the last chunk (chunk is incomplete, which stores 0 
as the offset for the absent rows)
           return _dataBuffer.size();
@@ -243,8 +238,7 @@ public final class VarByteChunkMVForwardIndexReader extends 
BaseChunkForwardInde
         // Last row in the chunk
         return getChunkPosition(chunkId + 1);
       } else {
-        return chunkStartOffset + _dataBuffer
-            .getInt(chunkStartOffset + (long) (chunkRowId + 1) * 
ROW_OFFSET_SIZE);
+        return chunkStartOffset + _dataBuffer.getInt(chunkStartOffset + (long) 
(chunkRowId + 1) * ROW_OFFSET_SIZE);
       }
     }
   }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkSVForwardIndexReader.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkSVForwardIndexReader.java
index 4a69f73463..8728dd53a8 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkSVForwardIndexReader.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkSVForwardIndexReader.java
@@ -21,7 +21,7 @@ package 
org.apache.pinot.segment.local.segment.index.readers.forward;
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import javax.annotation.Nullable;
-import 
org.apache.pinot.segment.local.io.writer.impl.VarByteChunkSVForwardIndexWriter;
+import 
org.apache.pinot.segment.local.io.writer.impl.VarByteChunkForwardIndexWriter;
 import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.apache.pinot.spi.utils.BigDecimalUtils;
@@ -32,10 +32,10 @@ import static java.nio.charset.StandardCharsets.UTF_8;
 /**
  * Chunk-based single-value raw (non-dictionary-encoded) forward index reader 
for values of variable length data type
  * (BIG_DECIMAL, STRING, BYTES).
- * <p>For data layout, please refer to the documentation for {@link 
VarByteChunkSVForwardIndexWriter}
+ * <p>For data layout, please refer to the documentation for {@link 
VarByteChunkForwardIndexWriter}
  */
 public final class VarByteChunkSVForwardIndexReader extends 
BaseChunkForwardIndexReader {
-  private static final int ROW_OFFSET_SIZE = 
VarByteChunkSVForwardIndexWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE;
+  private static final int ROW_OFFSET_SIZE = 
VarByteChunkForwardIndexWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE;
 
   private final int _maxChunkSize;
 
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkSVForwardIndexReaderV4.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkSVForwardIndexReaderV4.java
index 919e737f36..c1d842b23c 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkSVForwardIndexReaderV4.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkSVForwardIndexReaderV4.java
@@ -18,14 +18,14 @@
  */
 package org.apache.pinot.segment.local.segment.index.readers.forward;
 
+import com.google.common.base.Preconditions;
 import java.io.IOException;
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 import java.nio.charset.StandardCharsets;
-import javax.annotation.Nullable;
 import org.apache.pinot.segment.local.io.compression.ChunkCompressorFactory;
-import 
org.apache.pinot.segment.local.io.writer.impl.VarByteChunkSVForwardIndexWriterV4;
+import 
org.apache.pinot.segment.local.io.writer.impl.VarByteChunkForwardIndexWriterV4;
 import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
 import org.apache.pinot.segment.spi.compression.ChunkDecompressor;
 import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
@@ -38,11 +38,14 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
+/**
+ * Chunk-based single-value raw (non-dictionary-encoded) forward index reader 
for values of variable length data type
+ * (BIG_DECIMAL, STRING, BYTES).
+ * <p>For data layout, please refer to the documentation for {@link 
VarByteChunkForwardIndexWriterV4}
+ */
 public class VarByteChunkSVForwardIndexReaderV4
     implements 
ForwardIndexReader<VarByteChunkSVForwardIndexReaderV4.ReaderContext> {
-
   private static final Logger LOGGER = 
LoggerFactory.getLogger(VarByteChunkSVForwardIndexReaderV4.class);
-
   private static final int METADATA_ENTRY_SIZE = 8;
 
   private final FieldSpec.DataType _storedType;
@@ -54,10 +57,8 @@ public class VarByteChunkSVForwardIndexReaderV4
   private final PinotDataBuffer _chunks;
 
   public VarByteChunkSVForwardIndexReaderV4(PinotDataBuffer dataBuffer, 
FieldSpec.DataType storedType) {
-    if (dataBuffer.getInt(0) < VarByteChunkSVForwardIndexWriterV4.VERSION) {
-      throw new IllegalStateException("version " + dataBuffer.getInt(0) + " < "
-          + VarByteChunkSVForwardIndexWriterV4.VERSION);
-    }
+    int version = dataBuffer.getInt(0);
+    Preconditions.checkState(version == 
VarByteChunkForwardIndexWriterV4.VERSION, "Illegal index version: %s", version);
     _storedType = storedType;
     _targetDecompressedChunkSize = dataBuffer.getInt(4);
     _chunkCompressionType = ChunkCompressionType.valueOf(dataBuffer.getInt(8));
@@ -83,6 +84,20 @@ public class VarByteChunkSVForwardIndexReaderV4
     return _storedType;
   }
 
+  @Override
+  public ChunkCompressionType getCompressionType() {
+    // NOTE: Treat LZ4_LENGTH_PREFIXED as LZ4 because 
VarByteChunkForwardIndexWriterV4 implicitly overrides it
+    return _chunkCompressionType == ChunkCompressionType.LZ4_LENGTH_PREFIXED ? 
ChunkCompressionType.LZ4
+        : _chunkCompressionType;
+  }
+
+  @Override
+  public ReaderContext createContext() {
+    return _chunkCompressionType == ChunkCompressionType.PASS_THROUGH ? new 
UncompressedReaderContext(_chunks,
+        _metadata) : new CompressedReaderContext(_metadata, _chunks, 
_chunkDecompressor, _chunkCompressionType,
+        _targetDecompressedChunkSize);
+  }
+
   @Override
   public BigDecimal getBigDecimal(int docId, ReaderContext context) {
     return BigDecimalUtils.deserialize(context.getValue(docId));
@@ -98,15 +113,6 @@ public class VarByteChunkSVForwardIndexReaderV4
     return context.getValue(docId);
   }
 
-  @Nullable
-  @Override
-  public ReaderContext createContext() {
-    return _chunkCompressionType == ChunkCompressionType.PASS_THROUGH
-        ? new UncompressedReaderContext(_chunks, _metadata)
-        : new CompressedReaderContext(_metadata, _chunks, _chunkDecompressor, 
_chunkCompressionType,
-            _targetDecompressedChunkSize);
-  }
-
   @Override
   public void close()
       throws IOException {
@@ -209,9 +215,8 @@ public class VarByteChunkSVForwardIndexReaderV4
     protected byte[] readSmallUncompressedValue(int docId) {
       int index = docId - _docIdOffset;
       int offset = _chunk.getInt((index + 1) * Integer.BYTES);
-      int nextOffset = index == _numDocsInCurrentChunk - 1
-          ? _chunk.limit()
-          : _chunk.getInt((index + 2) * Integer.BYTES);
+      int nextOffset =
+          index == _numDocsInCurrentChunk - 1 ? _chunk.limit() : 
_chunk.getInt((index + 2) * Integer.BYTES);
       byte[] bytes = new byte[nextOffset - offset];
       _chunk.position(offset);
       _chunk.get(bytes);
@@ -220,8 +225,7 @@ public class VarByteChunkSVForwardIndexReaderV4
     }
 
     @Override
-    public void close()
-        throws IOException {
+    public void close() {
     }
   }
 
@@ -257,8 +261,7 @@ public class VarByteChunkSVForwardIndexReaderV4
     protected byte[] readSmallUncompressedValue(int docId) {
       int index = docId - _docIdOffset;
       int offset = _decompressedBuffer.getInt((index + 1) * Integer.BYTES);
-      int nextOffset = index == _numDocsInCurrentChunk - 1
-          ? _decompressedBuffer.limit()
+      int nextOffset = index == _numDocsInCurrentChunk - 1 ? 
_decompressedBuffer.limit()
           : _decompressedBuffer.getInt((index + 2) * Integer.BYTES);
       byte[] bytes = new byte[nextOffset - offset];
       _decompressedBuffer.position(offset);
@@ -295,8 +298,7 @@ public class VarByteChunkSVForwardIndexReaderV4
     }
 
     @Override
-    public void close()
-        throws IOException {
+    public void close() {
       CleanerUtil.cleanQuietly(_decompressedBuffer);
     }
   }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/startree/v2/store/StarTreeLoaderUtils.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/startree/v2/store/StarTreeLoaderUtils.java
index 0c666703c0..2b09994c97 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/startree/v2/store/StarTreeLoaderUtils.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/startree/v2/store/StarTreeLoaderUtils.java
@@ -24,16 +24,15 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import org.apache.pinot.segment.local.aggregator.ValueAggregatorFactory;
-import 
org.apache.pinot.segment.local.segment.index.readers.forward.BaseChunkForwardIndexReader;
+import 
org.apache.pinot.segment.local.segment.index.forward.ForwardIndexReaderFactory;
 import 
org.apache.pinot.segment.local.segment.index.readers.forward.FixedBitSVForwardIndexReaderV2;
-import 
org.apache.pinot.segment.local.segment.index.readers.forward.FixedByteChunkSVForwardIndexReader;
-import 
org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkSVForwardIndexReader;
 import org.apache.pinot.segment.local.startree.OffHeapStarTree;
 import org.apache.pinot.segment.spi.ColumnMetadata;
 import org.apache.pinot.segment.spi.datasource.DataSource;
 import org.apache.pinot.segment.spi.index.StandardIndexes;
 import org.apache.pinot.segment.spi.index.column.ColumnIndexContainer;
 import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
+import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
 import 
org.apache.pinot.segment.spi.index.startree.AggregationFunctionColumnPair;
 import org.apache.pinot.segment.spi.index.startree.StarTree;
 import org.apache.pinot.segment.spi.index.startree.StarTreeV2;
@@ -84,12 +83,8 @@ public class StarTreeLoaderUtils {
         PinotDataBuffer forwardIndexDataBuffer = 
indexReader.getIndexFor(metric, StandardIndexes.forward());
         DataType dataType = 
ValueAggregatorFactory.getAggregatedValueType(functionColumnPair.getFunctionType());
         FieldSpec fieldSpec = new MetricFieldSpec(metric, dataType);
-        BaseChunkForwardIndexReader forwardIndex;
-        if (dataType == DataType.BYTES) {
-          forwardIndex = new 
VarByteChunkSVForwardIndexReader(forwardIndexDataBuffer, DataType.BYTES);
-        } else {
-          forwardIndex = new 
FixedByteChunkSVForwardIndexReader(forwardIndexDataBuffer, dataType);
-        }
+        ForwardIndexReader<?> forwardIndex =
+            
ForwardIndexReaderFactory.createRawIndexReader(forwardIndexDataBuffer, 
dataType.getStoredType(), true);
         dataSourceMap.put(metric, new StarTreeDataSource(fieldSpec, numDocs, 
forwardIndex, null));
       }
 
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkSVForwardIndexWriterTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkSVForwardIndexWriterTest.java
index 48dad11495..e3d89157db 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkSVForwardIndexWriterTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkSVForwardIndexWriterTest.java
@@ -46,7 +46,6 @@ import static org.testng.Assert.assertEquals;
 
 
 public class VarByteChunkSVForwardIndexWriterTest {
-
   private static final File OUTPUT_DIR =
       new File(FileUtils.getTempDirectory(), 
VarByteChunkSVForwardIndexWriterTest.class.getSimpleName());
 
@@ -67,31 +66,26 @@ public class VarByteChunkSVForwardIndexWriterTest {
     int[] numbersOfDocs = {10, 1000};
     int[][] entryLengths = {{1, 1}, {0, 10}, {0, 100}, {100, 100}, {900, 
1000}};
     int[] versions = {2, 3};
-    return Arrays.stream(ChunkCompressionType.values())
-        .flatMap(chunkCompressionType -> 
IntStream.of(versions).boxed().flatMap(
-            version -> IntStream.of(numbersOfDocs).boxed()
-                .flatMap(totalDocs -> 
IntStream.of(numDocsPerChunks).boxed().flatMap(
-                    numDocsPerChunk -> Arrays.stream(entryLengths).map(
-                        lengths -> new Object[]{
-                            chunkCompressionType, totalDocs, numDocsPerChunk,
-                            lengths, version
-                        })))))
-        .toArray(Object[][]::new);
+    return 
Arrays.stream(ChunkCompressionType.values()).flatMap(chunkCompressionType -> 
IntStream.of(versions).boxed()
+        .flatMap(version -> IntStream.of(numbersOfDocs).boxed().flatMap(
+            totalDocs -> IntStream.of(numDocsPerChunks).boxed()
+                .flatMap(numDocsPerChunk -> 
Arrays.stream(entryLengths).map(lengths -> new Object[]{
+                    chunkCompressionType, totalDocs, numDocsPerChunk, lengths, 
version
+                }))))).toArray(Object[][]::new);
   }
 
   @Test(dataProvider = "params")
-  public void testPutStrings(ChunkCompressionType compressionType, int 
totalDocs, int numDocsPerChunk,
-      int[] lengths, int version)
+  public void testPutStrings(ChunkCompressionType compressionType, int 
totalDocs, int numDocsPerChunk, int[] lengths,
+      int version)
       throws IOException {
     String column = "testCol-" + UUID.randomUUID();
     File file = new File(OUTPUT_DIR, column + 
V1Constants.Indexes.RAW_SV_FORWARD_INDEX_FILE_EXTENSION);
     List<String[]> arrays = generateStringArrays(totalDocs, lengths, 50);
-    int maxEntryLengthInBytes =
-        arrays.stream().mapToInt(array -> Integer.BYTES + 
Arrays.stream(array).mapToInt(
-            str -> Integer.BYTES + 
str.getBytes(UTF_8).length).sum()).max().orElse(0);
-    try (
-        VarByteChunkSVForwardIndexWriter writer = new 
VarByteChunkSVForwardIndexWriter(file, compressionType, totalDocs,
-            numDocsPerChunk, maxEntryLengthInBytes, version)) {
+    int maxEntryLengthInBytes = arrays.stream().mapToInt(
+            array -> Integer.BYTES + Arrays.stream(array).mapToInt(
+                str -> Integer.BYTES + 
str.getBytes(UTF_8).length).sum()).max().orElse(0);
+    try (VarByteChunkForwardIndexWriter writer = new 
VarByteChunkForwardIndexWriter(file, compressionType, totalDocs,
+        numDocsPerChunk, maxEntryLengthInBytes, version)) {
       for (String[] array : arrays) {
         writer.putStrings(array);
       }
@@ -116,18 +110,17 @@ public class VarByteChunkSVForwardIndexWriterTest {
   }
 
   @Test(dataProvider = "params")
-  public void testPutBytes(ChunkCompressionType compressionType, int 
totalDocs, int numDocsPerChunk,
-      int[] lengths, int version)
+  public void testPutBytes(ChunkCompressionType compressionType, int 
totalDocs, int numDocsPerChunk, int[] lengths,
+      int version)
       throws IOException {
     String column = "testCol-" + UUID.randomUUID();
     File file = new File(OUTPUT_DIR, column + 
V1Constants.Indexes.RAW_SV_FORWARD_INDEX_FILE_EXTENSION);
     List<String[]> arrays = generateStringArrays(totalDocs, lengths, 50);
-    int maxEntryLengthInBytes = arrays.stream()
-        .mapToInt(array -> Integer.BYTES
-            + Arrays.stream(array).mapToInt(str -> Integer.BYTES + 
str.getBytes(UTF_8).length).sum())
-        .max().orElse(0);
-    try (VarByteChunkSVForwardIndexWriter writer = new 
VarByteChunkSVForwardIndexWriter(file, compressionType,
-        totalDocs, numDocsPerChunk, maxEntryLengthInBytes, version)) {
+    int maxEntryLengthInBytes = arrays.stream().mapToInt(
+            array -> Integer.BYTES + Arrays.stream(array).mapToInt(
+                str -> Integer.BYTES + 
str.getBytes(UTF_8).length).sum()).max().orElse(0);
+    try (VarByteChunkForwardIndexWriter writer = new 
VarByteChunkForwardIndexWriter(file, compressionType, totalDocs,
+        numDocsPerChunk, maxEntryLengthInBytes, version)) {
       for (String[] array : arrays) {
         writer.putByteArrays(Arrays.stream(array).map(str -> 
str.getBytes(UTF_8)).toArray(byte[][]::new));
       }
@@ -167,17 +160,16 @@ public class VarByteChunkSVForwardIndexWriterTest {
 
   private static Iterator<String> generateStrings(int minLength, int 
maxLength) {
     SplittableRandom random = new SplittableRandom();
-    return IntStream.generate(() -> random.nextInt(minLength, maxLength + 1))
-        .mapToObj(length -> {
-          char[] string = new char[length];
-          Arrays.fill(string, 'b');
-          if (string.length > 0) {
-            string[0] = 'a';
-          }
-          if (string.length > 1) {
-            string[string.length - 1] = 'c';
-          }
-          return new String(string);
-        }).iterator();
+    return IntStream.generate(() -> random.nextInt(minLength, maxLength + 
1)).mapToObj(length -> {
+      char[] string = new char[length];
+      Arrays.fill(string, 'b');
+      if (string.length > 0) {
+        string[0] = 'a';
+      }
+      if (string.length > 1) {
+        string[string.length - 1] = 'c';
+      }
+      return new String(string);
+    }).iterator();
   }
 }
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/RawIndexCreatorTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/RawIndexCreatorTest.java
index 391f5d03a8..9e9a593691 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/RawIndexCreatorTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/RawIndexCreatorTest.java
@@ -29,14 +29,16 @@ import java.util.Random;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.RandomStringUtils;
 import 
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import 
org.apache.pinot.segment.local.segment.index.forward.ForwardIndexReaderFactory;
 import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
 import 
org.apache.pinot.segment.local.segment.index.readers.forward.ChunkReaderContext;
 import 
org.apache.pinot.segment.local.segment.index.readers.forward.FixedByteChunkSVForwardIndexReader;
 import 
org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkMVForwardIndexReader;
-import 
org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkSVForwardIndexReader;
 import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
 import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
 import org.apache.pinot.segment.spi.index.StandardIndexes;
+import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
+import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext;
 import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderContext;
 import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderRegistry;
 import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
@@ -63,7 +65,6 @@ import org.testng.annotations.Test;
  * Class for testing Raw index creators.
  */
 public class RawIndexCreatorTest {
-
   private static final int NUM_ROWS = 10009;
   private static final int MAX_STRING_LENGTH = 101;
 
@@ -168,8 +169,9 @@ public class RawIndexCreatorTest {
   public void testStringRawIndexCreator()
       throws Exception {
     PinotDataBuffer indexBuffer = getIndexBufferForColumn(STRING_COLUMN);
-    try (VarByteChunkSVForwardIndexReader rawIndexReader = new 
VarByteChunkSVForwardIndexReader(indexBuffer,
-        DataType.STRING); ChunkReaderContext readerContext = 
rawIndexReader.createContext()) {
+    try (
+        ForwardIndexReader rawIndexReader = 
ForwardIndexReaderFactory.createRawIndexReader(indexBuffer, DataType.STRING,
+            true); ForwardIndexReaderContext readerContext = 
rawIndexReader.createContext()) {
       _recordReader.rewind();
       for (int row = 0; row < NUM_ROWS; row++) {
         GenericRow expectedRow = _recordReader.next();
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/VarByteChunkV4Test.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/VarByteChunkV4Test.java
index 857e29bdb0..ba59d057a7 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/VarByteChunkV4Test.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/VarByteChunkV4Test.java
@@ -30,7 +30,7 @@ import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
 import org.apache.commons.io.FileUtils;
-import 
org.apache.pinot.segment.local.io.writer.impl.VarByteChunkSVForwardIndexWriterV4;
+import 
org.apache.pinot.segment.local.io.writer.impl.VarByteChunkForwardIndexWriterV4;
 import 
org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkSVForwardIndexReaderV4;
 import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
 import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
@@ -89,7 +89,7 @@ public class VarByteChunkV4Test {
       throws IOException {
     _file = new File(TEST_DIR, "testStringSV");
     testSV(compressionType, longestEntry, chunkSize, 
FieldSpec.DataType.STRING, x -> x,
-        VarByteChunkSVForwardIndexWriterV4::putString, (reader, context, 
docId) -> reader.getString(docId, context));
+        VarByteChunkForwardIndexWriterV4::putString, (reader, context, docId) 
-> reader.getString(docId, context));
   }
 
   @Test(dataProvider = "params")
@@ -97,16 +97,16 @@ public class VarByteChunkV4Test {
       throws IOException {
     _file = new File(TEST_DIR, "testBytesSV");
     testSV(compressionType, longestEntry, chunkSize, FieldSpec.DataType.BYTES, 
x -> x.getBytes(StandardCharsets.UTF_8),
-        VarByteChunkSVForwardIndexWriterV4::putBytes, (reader, context, docId) 
-> reader.getBytes(docId, context));
+        VarByteChunkForwardIndexWriterV4::putBytes, (reader, context, docId) 
-> reader.getBytes(docId, context));
   }
 
   private <T> void testSV(ChunkCompressionType compressionType, int 
longestEntry, int chunkSize,
       FieldSpec.DataType dataType, Function<String, T> forwardMapper,
-      BiConsumer<VarByteChunkSVForwardIndexWriterV4, T> write,
+      BiConsumer<VarByteChunkForwardIndexWriterV4, T> write,
       Read<T> read)
       throws IOException {
     List<T> values = randomStrings(1000, 
longestEntry).map(forwardMapper).collect(Collectors.toList());
-    try (VarByteChunkSVForwardIndexWriterV4 writer = new 
VarByteChunkSVForwardIndexWriterV4(_file, compressionType,
+    try (VarByteChunkForwardIndexWriterV4 writer = new 
VarByteChunkForwardIndexWriterV4(_file, compressionType,
         chunkSize)) {
       for (T value : values) {
         write.accept(writer, value);
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/FixedByteChunkSVForwardIndexTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/FixedByteChunkSVForwardIndexTest.java
index 0e2386ba82..43d1e35ad0 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/FixedByteChunkSVForwardIndexTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/FixedByteChunkSVForwardIndexTest.java
@@ -24,7 +24,7 @@ import java.util.Arrays;
 import java.util.Random;
 import java.util.stream.IntStream;
 import org.apache.commons.io.FileUtils;
-import 
org.apache.pinot.segment.local.io.writer.impl.FixedByteChunkSVForwardIndexWriter;
+import 
org.apache.pinot.segment.local.io.writer.impl.FixedByteChunkForwardIndexWriter;
 import 
org.apache.pinot.segment.local.segment.index.readers.forward.ChunkReaderContext;
 import 
org.apache.pinot.segment.local.segment.index.readers.forward.FixedByteChunkSVForwardIndexReader;
 import 
org.apache.pinot.segment.local.segment.index.readers.forward.FixedBytePower2ChunkSVForwardIndexReader;
@@ -38,14 +38,13 @@ import org.testng.annotations.Test;
 
 
 /**
- * Unit test for {@link FixedByteChunkSVForwardIndexReader} and {@link 
FixedByteChunkSVForwardIndexWriter} classes.
+ * Unit test for {@link FixedByteChunkSVForwardIndexReader} and {@link 
FixedByteChunkForwardIndexWriter} classes.
  *
- * This test writes {@link #NUM_VALUES} using {@link 
FixedByteChunkSVForwardIndexWriter}. It then reads
+ * This test writes {@link #NUM_VALUES} using {@link 
FixedByteChunkForwardIndexWriter}. It then reads
  * the values using {@link FixedByteChunkSVForwardIndexReader}, and asserts 
that what was written is the same as
  * what was read in.
  *
 * Number of docs and docs per chunk are chosen to generate complete as well as 
partial chunks.
- *
  */
 public class FixedByteChunkSVForwardIndexTest {
   private static final int NUM_VALUES = 10009;
@@ -75,10 +74,10 @@ public class FixedByteChunkSVForwardIndexTest {
     FileUtils.deleteQuietly(outFileEightByte);
 
     // test both formats (4-byte chunk offsets and 8-byte chunk offsets)
-    try (FixedByteChunkSVForwardIndexWriter fourByteOffsetWriter = new 
FixedByteChunkSVForwardIndexWriter(
-        outFileFourByte, compressionType, NUM_VALUES, NUM_DOCS_PER_CHUNK, 
Integer.BYTES, version);
-        FixedByteChunkSVForwardIndexWriter eightByteOffsetWriter = new 
FixedByteChunkSVForwardIndexWriter(
-            outFileEightByte, compressionType, NUM_VALUES, NUM_DOCS_PER_CHUNK, 
Integer.BYTES, version)) {
+    try (FixedByteChunkForwardIndexWriter fourByteOffsetWriter = new 
FixedByteChunkForwardIndexWriter(outFileFourByte,
+        compressionType, NUM_VALUES, NUM_DOCS_PER_CHUNK, Integer.BYTES, 
version);
+        FixedByteChunkForwardIndexWriter eightByteOffsetWriter = new 
FixedByteChunkForwardIndexWriter(outFileEightByte,
+            compressionType, NUM_VALUES, NUM_DOCS_PER_CHUNK, Integer.BYTES, 
version)) {
       for (int value : expected) {
         fourByteOffsetWriter.putInt(value);
         eightByteOffsetWriter.putInt(value);
@@ -123,10 +122,10 @@ public class FixedByteChunkSVForwardIndexTest {
     FileUtils.deleteQuietly(outFileEightByte);
 
     // test both formats (4-byte chunk offsets and 8-byte chunk offsets)
-    try (FixedByteChunkSVForwardIndexWriter fourByteOffsetWriter = new 
FixedByteChunkSVForwardIndexWriter(
-        outFileFourByte, compressionType, NUM_VALUES, NUM_DOCS_PER_CHUNK, 
Long.BYTES, version);
-        FixedByteChunkSVForwardIndexWriter eightByteOffsetWriter = new 
FixedByteChunkSVForwardIndexWriter(
-            outFileEightByte, compressionType, NUM_VALUES, NUM_DOCS_PER_CHUNK, 
Long.BYTES, version)) {
+    try (FixedByteChunkForwardIndexWriter fourByteOffsetWriter = new 
FixedByteChunkForwardIndexWriter(outFileFourByte,
+        compressionType, NUM_VALUES, NUM_DOCS_PER_CHUNK, Long.BYTES, version);
+        FixedByteChunkForwardIndexWriter eightByteOffsetWriter = new 
FixedByteChunkForwardIndexWriter(outFileEightByte,
+            compressionType, NUM_VALUES, NUM_DOCS_PER_CHUNK, Long.BYTES, 
version)) {
       for (long value : expected) {
         fourByteOffsetWriter.putLong(value);
         eightByteOffsetWriter.putLong(value);
@@ -171,10 +170,10 @@ public class FixedByteChunkSVForwardIndexTest {
     FileUtils.deleteQuietly(outFileEightByte);
 
     // test both formats (4-byte chunk offsets and 8-byte chunk offsets)
-    try (FixedByteChunkSVForwardIndexWriter fourByteOffsetWriter = new 
FixedByteChunkSVForwardIndexWriter(
-        outFileFourByte, compressionType, NUM_VALUES, NUM_DOCS_PER_CHUNK, 
Float.BYTES, version);
-        FixedByteChunkSVForwardIndexWriter eightByteOffsetWriter = new 
FixedByteChunkSVForwardIndexWriter(
-            outFileEightByte, compressionType, NUM_VALUES, NUM_DOCS_PER_CHUNK, 
Float.BYTES, version)) {
+    try (FixedByteChunkForwardIndexWriter fourByteOffsetWriter = new 
FixedByteChunkForwardIndexWriter(outFileFourByte,
+        compressionType, NUM_VALUES, NUM_DOCS_PER_CHUNK, Float.BYTES, version);
+        FixedByteChunkForwardIndexWriter eightByteOffsetWriter = new 
FixedByteChunkForwardIndexWriter(outFileEightByte,
+            compressionType, NUM_VALUES, NUM_DOCS_PER_CHUNK, Float.BYTES, 
version)) {
       for (float value : expected) {
         fourByteOffsetWriter.putFloat(value);
         eightByteOffsetWriter.putFloat(value);
@@ -219,10 +218,10 @@ public class FixedByteChunkSVForwardIndexTest {
     FileUtils.deleteQuietly(outFileEightByte);
 
     // test both formats (4-byte chunk offsets and 8-byte chunk offsets)
-    try (FixedByteChunkSVForwardIndexWriter fourByteOffsetWriter = new 
FixedByteChunkSVForwardIndexWriter(
-        outFileFourByte, compressionType, NUM_VALUES, NUM_DOCS_PER_CHUNK, 
Double.BYTES, version);
-        FixedByteChunkSVForwardIndexWriter eightByteOffsetWriter = new 
FixedByteChunkSVForwardIndexWriter(
-            outFileEightByte, compressionType, NUM_VALUES, NUM_DOCS_PER_CHUNK, 
Double.BYTES, version)) {
+    try (FixedByteChunkForwardIndexWriter fourByteOffsetWriter = new 
FixedByteChunkForwardIndexWriter(outFileFourByte,
+        compressionType, NUM_VALUES, NUM_DOCS_PER_CHUNK, Double.BYTES, 
version);
+        FixedByteChunkForwardIndexWriter eightByteOffsetWriter = new 
FixedByteChunkForwardIndexWriter(outFileEightByte,
+            compressionType, NUM_VALUES, NUM_DOCS_PER_CHUNK, Double.BYTES, 
version)) {
       for (double value : expected) {
         fourByteOffsetWriter.putDouble(value);
         eightByteOffsetWriter.putDouble(value);
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexTypeTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexTypeTest.java
index b686d69ba8..2210211d71 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexTypeTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexTypeTest.java
@@ -32,7 +32,9 @@ import org.testng.Assert;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
-import static org.testng.Assert.*;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertSame;
 
 
 public class ForwardIndexTypeTest {
@@ -152,7 +154,7 @@ public class ForwardIndexTypeTest {
           new ForwardIndexConfig.Builder()
               .withCompressionType(null)
               .withDeriveNumDocsPerChunk(false)
-              .withRawIndexWriterVersion(2)
+              .withRawIndexWriterVersion(ForwardIndexConfig.DEFAULT_RAW_WRITER_VERSION)
               .build()
       );
     }
@@ -174,7 +176,7 @@ public class ForwardIndexTypeTest {
             new ForwardIndexConfig.Builder()
                 .withCompressionType(compression == null ? null : ChunkCompressionType.valueOf(compression))
                 .withDeriveNumDocsPerChunk(false)
-                .withRawIndexWriterVersion(2)
+                .withRawIndexWriterVersion(ForwardIndexConfig.DEFAULT_RAW_WRITER_VERSION)
                 .build()
       );
     }
@@ -195,7 +197,7 @@ public class ForwardIndexTypeTest {
       assertEquals(new ForwardIndexConfig.Builder()
           .withCompressionType(null)
           .withDeriveNumDocsPerChunk(true)
-          .withRawIndexWriterVersion(2)
+          .withRawIndexWriterVersion(ForwardIndexConfig.DEFAULT_RAW_WRITER_VERSION)
           .build());
     }
 
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/VarByteChunkSVForwardIndexTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/VarByteChunkSVForwardIndexTest.java
index 419836c38a..17f169081b 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/VarByteChunkSVForwardIndexTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/VarByteChunkSVForwardIndexTest.java
@@ -27,8 +27,7 @@ import java.util.Random;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang.RandomStringUtils;
 import org.apache.commons.lang.StringUtils;
-import org.apache.pinot.segment.local.io.writer.impl.BaseChunkSVForwardIndexWriter;
-import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkSVForwardIndexWriter;
+import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkForwardIndexWriter;
 import org.apache.pinot.segment.local.segment.creator.impl.fwd.SingleValueVarByteRawIndexCreator;
 import org.apache.pinot.segment.local.segment.index.readers.forward.ChunkReaderContext;
 import org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkSVForwardIndexReader;
@@ -44,7 +43,7 @@ import static org.testng.Assert.fail;
 
 
 /**
- * Unit test for {@link VarByteChunkSVForwardIndexReader} and {@link VarByteChunkSVForwardIndexWriter} classes.
+ * Unit test for {@link VarByteChunkSVForwardIndexReader} and {@link VarByteChunkForwardIndexWriter} classes.
  */
 public class VarByteChunkSVForwardIndexTest {
   private static final int NUM_ENTRIES = 5003;
@@ -77,7 +76,7 @@ public class VarByteChunkSVForwardIndexTest {
   }
 
   /**
-   * This test writes {@link #NUM_ENTRIES} using {@link VarByteChunkSVForwardIndexWriter}. It then reads
+   * This test writes {@link #NUM_ENTRIES} using {@link VarByteChunkForwardIndexWriter}. It then reads
    * the strings & bytes using {@link VarByteChunkSVForwardIndexReader}, and asserts that what was written is the
    * same as
    * what was read in.
@@ -105,12 +104,10 @@ public class VarByteChunkSVForwardIndexTest {
     }
 
     // test both formats (4-byte chunk offsets and 8-byte chunk offsets)
-    try (VarByteChunkSVForwardIndexWriter fourByteOffsetWriter = new VarByteChunkSVForwardIndexWriter(outFileFourByte,
-        compressionType, NUM_ENTRIES, NUM_DOCS_PER_CHUNK, maxStringLengthInBytes,
-        BaseChunkSVForwardIndexWriter.DEFAULT_VERSION);
-        VarByteChunkSVForwardIndexWriter eightByteOffsetWriter = new VarByteChunkSVForwardIndexWriter(outFileEightByte,
-            compressionType, NUM_ENTRIES, NUM_DOCS_PER_CHUNK, maxStringLengthInBytes,
-            BaseChunkSVForwardIndexWriter.CURRENT_VERSION)) {
+    try (VarByteChunkForwardIndexWriter fourByteOffsetWriter = new VarByteChunkForwardIndexWriter(outFileFourByte,
+        compressionType, NUM_ENTRIES, NUM_DOCS_PER_CHUNK, maxStringLengthInBytes, 2);
+        VarByteChunkForwardIndexWriter eightByteOffsetWriter = new VarByteChunkForwardIndexWriter(outFileEightByte,
+            compressionType, NUM_ENTRIES, NUM_DOCS_PER_CHUNK, maxStringLengthInBytes, 3)) {
       // NOTE: No need to test BYTES explicitly because STRING is handled as UTF-8 encoded bytes
       for (int i = 0; i < NUM_ENTRIES; i++) {
         fourByteOffsetWriter.putString(expected[i]);
@@ -120,12 +117,10 @@ public class VarByteChunkSVForwardIndexTest {
 
     try (VarByteChunkSVForwardIndexReader fourByteOffsetReader = new VarByteChunkSVForwardIndexReader(
         PinotDataBuffer.mapReadOnlyBigEndianFile(outFileFourByte), DataType.STRING);
-        ChunkReaderContext fourByteOffsetReaderContext = fourByteOffsetReader
-            .createContext();
+        ChunkReaderContext fourByteOffsetReaderContext = fourByteOffsetReader.createContext();
         VarByteChunkSVForwardIndexReader eightByteOffsetReader = new VarByteChunkSVForwardIndexReader(
             PinotDataBuffer.mapReadOnlyBigEndianFile(outFileEightByte), DataType.STRING);
-        ChunkReaderContext eightByteOffsetReaderContext = eightByteOffsetReader
-            .createContext()) {
+        ChunkReaderContext eightByteOffsetReaderContext = eightByteOffsetReader.createContext()) {
       for (int i = 0; i < NUM_ENTRIES; i++) {
         Assert.assertEquals(fourByteOffsetReader.getString(i, fourByteOffsetReaderContext), expected[i]);
         Assert.assertEquals(eightByteOffsetReader.getString(i, eightByteOffsetReaderContext), expected[i]);
@@ -230,8 +225,8 @@ public class VarByteChunkSVForwardIndexTest {
     }
 
     int numDocsPerChunk = SingleValueVarByteRawIndexCreator.getNumDocsPerChunk(maxStringLengthInBytes);
-    try (VarByteChunkSVForwardIndexWriter writer = new VarByteChunkSVForwardIndexWriter(outFile, compressionType,
-        numDocs, numDocsPerChunk, maxStringLengthInBytes, BaseChunkSVForwardIndexWriter.CURRENT_VERSION)) {
+    try (VarByteChunkForwardIndexWriter writer = new VarByteChunkForwardIndexWriter(outFile, compressionType, numDocs,
+        numDocsPerChunk, maxStringLengthInBytes, 3)) {
       // NOTE: No need to test BYTES explicitly because STRING is handled as UTF-8 encoded bytes
       for (int i = 0; i < numDocs; i++) {
         writer.putString(expected[i]);
@@ -274,7 +269,7 @@ public class VarByteChunkSVForwardIndexTest {
     file.deleteOnExit();
     int docSize = 21475;
     byte[] value = StringUtils.repeat("a", docSize).getBytes(UTF_8);
-    try (VarByteChunkSVForwardIndexWriter writer = new VarByteChunkSVForwardIndexWriter(file,
+    try (VarByteChunkForwardIndexWriter writer = new VarByteChunkForwardIndexWriter(file,
         ChunkCompressionType.PASS_THROUGH, 100_001, 1000, docSize, 2)) {
       try {
         for (int i = 0; i < 100_000; i++) {
diff --git 
a/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/DictionaryToRawIndexConverter.java
 
b/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/DictionaryToRawIndexConverter.java
index b7723f1294..d29c62cd34 100644
--- 
a/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/DictionaryToRawIndexConverter.java
+++ 
b/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/DictionaryToRawIndexConverter.java
@@ -31,13 +31,13 @@ import 
org.apache.commons.configuration.PropertiesConfiguration;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.utils.TarGzCompressionUtils;
 import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
-import org.apache.pinot.segment.local.io.writer.impl.BaseChunkSVForwardIndexWriter;
 import org.apache.pinot.segment.local.segment.index.forward.ForwardIndexCreatorFactory;
 import org.apache.pinot.segment.spi.IndexSegment;
 import org.apache.pinot.segment.spi.V1Constants;
 import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
 import org.apache.pinot.segment.spi.datasource.DataSource;
 import org.apache.pinot.segment.spi.datasource.DataSourceMetadata;
+import org.apache.pinot.segment.spi.index.ForwardIndexConfig;
 import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator;
 import org.apache.pinot.segment.spi.index.reader.Dictionary;
 import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
@@ -319,7 +319,7 @@ public class DictionaryToRawIndexConverter {
 
     try (ForwardIndexCreator rawIndexCreator = ForwardIndexCreatorFactory
         .getRawIndexCreatorForSVColumn(newSegment, compressionType, column, storedType, numDocs, lengthOfLongestEntry,
-            false, BaseChunkSVForwardIndexWriter.DEFAULT_VERSION);
+            false, ForwardIndexConfig.DEFAULT_RAW_WRITER_VERSION);
         ForwardIndexReaderContext readerContext = forwardIndexReader.createContext()) {
       switch (storedType) {
         case INT:


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to