This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new a1a2434  Remove BlockValIterator and directly read values from 
BlockValSet with docId (#5510)
a1a2434 is described below

commit a1a24349c821a1d962abc22c2594bdd12e52264e
Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com>
AuthorDate: Tue Jun 9 16:47:24 2020 -0700

    Remove BlockValIterator and directly read values from BlockValSet with 
docId (#5510)
    
    When we read values from BlockValIterator, we don't use it as an iterator.
    Instead, we usually call skipTo(docId) followed by the actual read. This
    has the overhead of one extra method invocation, and is also
    counter-intuitive.
    This PR enhances the BlockValSet to provide APIs to directly read the value
    for a given docId. This can provide better performance, and better
    abstraction of value reads.
    Note that the BlockValSet here is not actually for a block, but for a
    whole column. Follow-up PRs will try to split the BlockValSet into
    multiple interfaces to differentiate block-level value sets from
    segment-level value sets.
---
 .../apache/pinot/core/common/BaseBlockValSet.java  |  67 +++++++--
 .../pinot/core/common/BlockMultiValIterator.java   |  50 ------
 .../pinot/core/common/BlockSingleValIterator.java  |  54 -------
 .../apache/pinot/core/common/BlockValIterator.java |  28 ----
 .../org/apache/pinot/core/common/BlockValSet.java  | 160 +++++++++++++-------
 .../org/apache/pinot/core/common/DataFetcher.java  |  60 ++++----
 .../pinot/core/minion/RawIndexConverter.java       |  13 +-
 .../core/operator/blocks/MultiValueBlock.java      |   5 +-
 .../core/operator/blocks/SingleValueBlock.java     |   5 +-
 .../dociditerators/MVScanDocIdIterator.java        |  14 +-
 .../dociditerators/SVScanDocIdIterator.java        |  57 +++----
 .../core/operator/docidsets/MVScanDocIdSet.java    |   2 +-
 .../core/operator/docidsets/SVScanDocIdSet.java    |   2 +-
 .../DictionaryBasedMultiValueIterator.java         | 108 -------------
 .../DictionaryBasedSingleValueIterator.java        |  88 -----------
 .../docvaliterators/MultiValueIterator.java        |  84 -----------
 .../docvaliterators/SingleValueIterator.java       |  84 -----------
 .../core/operator/docvalsets/MultiValueSet.java    |  19 +--
 .../core/operator/docvalsets/SingleValueSet.java   | 139 +++++++++--------
 .../core/common/RealtimeNoDictionaryTest.java      |  26 ++--
 .../mutable/MutableSegmentImplTest.java            |  76 +++++-----
 .../RealtimeSingleValueIteratorTest.java           | 167 ---------------------
 .../pinot/core/startree/v2/BaseStarTreeV2Test.java | 100 ++++++------
 .../apache/pinot/tools/StarTreeIndexViewer.java    |   9 --
 .../apache/pinot/tools/scan/query/Projection.java  |  19 +--
 .../tools/scan/query/SegmentQueryProcessor.java    |  57 ++++---
 .../converter/DictionaryToRawIndexConverter.java   |  48 +++---
 27 files changed, 482 insertions(+), 1059 deletions(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/common/BaseBlockValSet.java 
b/pinot-core/src/main/java/org/apache/pinot/core/common/BaseBlockValSet.java
index 252a4f0..44981e5 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/BaseBlockValSet.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/BaseBlockValSet.java
@@ -24,43 +24,92 @@ package org.apache.pinot.core.common;
 public abstract class BaseBlockValSet implements BlockValSet {
 
   @Override
-  public BlockValIterator iterator() {
+  public int getIntValue(int docId) {
     throw new UnsupportedOperationException();
   }
 
   @Override
-  public void getDictionaryIds(int[] inDocIds, int inStartPos, int 
inDocIdsSize, int[] outDictionaryIds,
-      int outStartPos) {
+  public long getLongValue(int docId) {
     throw new UnsupportedOperationException();
   }
 
   @Override
-  public void getIntValues(int[] inDocIds, int inStartPos, int inDocIdsSize, 
int[] outValues, int outStartPos) {
+  public float getFloatValue(int docId) {
     throw new UnsupportedOperationException();
   }
 
   @Override
-  public void getLongValues(int[] inDocIds, int inStartPos, int inDocIdsSize, 
long[] outValues, int outStartPos) {
+  public double getDoubleValue(int docId) {
     throw new UnsupportedOperationException();
   }
 
   @Override
-  public void getFloatValues(int[] inDocIds, int inStartPos, int inDocIdsSize, 
float[] outValues, int outStartPos) {
+  public String getStringValue(int docId) {
     throw new UnsupportedOperationException();
   }
 
   @Override
-  public void getDoubleValues(int[] inDocIds, int inStartPos, int 
inDocIdsSize, double[] outValues, int outStartPos) {
+  public byte[] getBytesValue(int docId) {
     throw new UnsupportedOperationException();
   }
 
   @Override
-  public void getStringValues(int[] inDocIds, int inStartPos, int 
inDocIdsSize, String[] outValues, int outStartPos) {
+  public int getIntValues(int docId, int[] valueBuffer) {
     throw new UnsupportedOperationException();
   }
 
   @Override
-  public void getBytesValues(int[] inDocIds, int inStartPos, int inDocIdsSize, 
byte[][] outValues, int outStartPos) {
+  public int getLongValues(int docId, long[] valueBuffer) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public int getFloatValues(int docId, float[] valueBuffer) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public int getDoubleValues(int docId, double[] valueBuffer) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public int getStringValues(int docId, String[] valueBuffer) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void getDictionaryIds(int[] docIds, int length, int[] dictIdBuffer) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void getIntValues(int[] docIds, int length, int[] valueBuffer) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void getLongValues(int[] docIds, int length, long[] valueBuffer) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void getFloatValues(int[] docIds, int length, float[] valueBuffer) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void getDoubleValues(int[] docIds, int length, double[] valueBuffer) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void getStringValues(int[] docIds, int length, String[] valueBuffer) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void getBytesValues(int[] docIds, int length, byte[][] valueBuffer) {
     throw new UnsupportedOperationException();
   }
 
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/common/BlockMultiValIterator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/common/BlockMultiValIterator.java
deleted file mode 100644
index 68c0fc5..0000000
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/common/BlockMultiValIterator.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.common;
-
-/**
- *
- *
- */
-public abstract class BlockMultiValIterator implements BlockValIterator {
-
-  public int nextCharVal(char[] charArray) {
-    throw new UnsupportedOperationException();
-  }
-
-  public int nextIntVal(int[] intArray) {
-    throw new UnsupportedOperationException();
-  }
-
-  public int nextLongVal(long[] longArray) {
-    throw new UnsupportedOperationException();
-  }
-
-  public int nextFloatVal(float[] floatArray) {
-    throw new UnsupportedOperationException();
-  }
-
-  public int nextDoubleVal(double[] doubleArray) {
-    throw new UnsupportedOperationException();
-  }
-
-  public int nextBytesArrayVal(byte[][] bytesArrays) {
-    throw new UnsupportedOperationException();
-  }
-}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/common/BlockSingleValIterator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/common/BlockSingleValIterator.java
deleted file mode 100644
index 5222189..0000000
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/common/BlockSingleValIterator.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.common;
-
-/**
- *
- * TODO: Split into two classes, one for iterator over data, another over 
dictionary id's.
- */
-public abstract class BlockSingleValIterator implements BlockValIterator {
-
-  char nextCharVal() {
-    throw new UnsupportedOperationException();
-  }
-
-  public int nextIntVal() {
-    throw new UnsupportedOperationException();
-  }
-
-  public float nextFloatVal() {
-    throw new UnsupportedOperationException();
-  }
-
-  public long nextLongVal() {
-    throw new UnsupportedOperationException();
-  }
-
-  public double nextDoubleVal() {
-    throw new UnsupportedOperationException();
-  }
-
-  public byte[] nextBytesVal() {
-    throw new UnsupportedOperationException();
-  }
-
-  public String nextStringVal() {
-    throw new UnsupportedOperationException();
-  }
-}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/common/BlockValIterator.java 
b/pinot-core/src/main/java/org/apache/pinot/core/common/BlockValIterator.java
deleted file mode 100644
index 776f425..0000000
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/common/BlockValIterator.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.common;
-
-public interface BlockValIterator {
-
-  boolean hasNext();
-
-  void skipTo(int docId);
-
-  void reset();
-}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/common/BlockValSet.java 
b/pinot-core/src/main/java/org/apache/pinot/core/common/BlockValSet.java
index f797694..f8bc2ea 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/BlockValSet.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/BlockValSet.java
@@ -21,10 +21,11 @@ package org.apache.pinot.core.common;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 
 
+/**
+ * TODO: Split BlockValSet into multiple interfaces. The docId based APIs do 
not apply to Block concept.
+ */
 public interface BlockValSet {
 
-  BlockValIterator iterator();
-
   DataType getValueType();
 
   boolean isSingleValue();
@@ -34,80 +35,125 @@ public interface BlockValSet {
    */
 
   /**
-   * Get dictionary Ids for the given docIds.
-   *
-   * @param inDocIds Input docIds
-   * @param inStartPos Start index in inDocIds
-   * @param inDocIdsSize Number of input doc ids
-   * @param outDictionaryIds Output array
-   * @param outStartPos Start position in outDictionaryIds
+   * NOTE: The following single value read APIs do not handle the data type 
conversion for performance concern. Caller
+   *       should always call the API that matches the data type of the {@code 
BlockValSet}.
    */
-  void getDictionaryIds(int[] inDocIds, int inStartPos, int inDocIdsSize, 
int[] outDictionaryIds, int outStartPos);
 
   /**
-   * Get Integer values for the given docIds.
-   *
-   * @param inDocIds Input docIds
-   * @param inStartPos Start index in inDocIds
-   * @param inDocIdsSize Number of input doc ids
-   * @param outValues Output array
-   * @param outStartPos Start position in outValues
+   * Returns the INT type single-value at the given document id.
+   * <p>NOTE: Dictionary id is handled as INT type.
    */
-  void getIntValues(int[] inDocIds, int inStartPos, int inDocIdsSize, int[] 
outValues, int outStartPos);
+  int getIntValue(int docId);
 
   /**
-   * Get long values for the given docIds.
-   *
-   * @param inDocIds Input docIds
-   * @param inStartPos Start index in inDocIds
-   * @param inDocIdsSize Number of input doc ids
-   * @param outValues Output array
-   * @param outStartPos Start position in outValues
+   * Returns the LONG type single-value at the given document id.
    */
-  void getLongValues(int[] inDocIds, int inStartPos, int inDocIdsSize, long[] 
outValues, int outStartPos);
+  long getLongValue(int docId);
 
   /**
-   * Get float values for the given docIds.
-   *
-   * @param inDocIds Input docIds
-   * @param inStartPos Start index in inDocIds
-   * @param inDocIdsSize Number of input doc ids
-   * @param outValues Output array
-   * @param outStartPos Start position in outValues
+   * Returns the FLOAT type single-value at the given document id.
    */
-  void getFloatValues(int[] inDocIds, int inStartPos, int inDocIdsSize, 
float[] outValues, int outStartPos);
+  float getFloatValue(int docId);
 
   /**
-   *
-   * @param inDocIds Input docIds
-   * @param inStartPos Start index in inDocIds
-   * @param inDocIdsSize Number of input doc ids
-   * @param outValues Output array
-   * @param outStartPos Start position in outValues
+   * Returns the DOUBLE type single-value at the given document id.
    */
-  void getDoubleValues(int[] inDocIds, int inStartPos, int inDocIdsSize, 
double[] outValues, int outStartPos);
+  double getDoubleValue(int docId);
 
   /**
-   * Get string values for the given docIds.
-   *
-   * @param inDocIds Input docIds
-   * @param inStartPos Start index in inDocIds
-   * @param inDocIdsSize Number of input doc ids
-   * @param outValues Output array
-   * @param outStartPos Start position in outValues
+   * Returns the STRING type single-value at the given document id.
    */
-  void getStringValues(int[] inDocIds, int inStartPos, int inDocIdsSize, 
String[] outValues, int outStartPos);
+  String getStringValue(int docId);
 
   /**
-   * Get byte[] values for the given docIds.
-   *
-   * @param inDocIds Input docIds
-   * @param inStartPos Start index in inDocIds
-   * @param inDocIdsSize Number of input doc ids
-   * @param outValues Output array
-   * @param outStartPos Start position in outValues
+   * Returns the BYTES type single-value at the given document id.
+   */
+  byte[] getBytesValue(int docId);
+
+  /**
+   * Reads the INT type multi-value at the given document id into the value 
buffer and returns the number of values in
+   * the multi-value entry.
+   * <p>The passed in value buffer should be large enough to hold all the 
values of a multi-value entry.
+   * <p>NOTE: Dictionary id is handled as INT type.
+   */
+  int getIntValues(int docId, int[] valueBuffer);
+
+  /**
+   * Reads the LONG type multi-value at the given document id into the value 
buffer and returns the number of values in
+   * the multi-value entry.
+   * <p>The passed in value buffer should be large enough to hold all the 
values of a multi-value entry.
+   */
+  int getLongValues(int docId, long[] valueBuffer);
+
+  /**
+   * Reads the FLOAT type multi-value at the given document id into the value 
buffer and returns the number of values
+   * in the multi-value entry.
+   * <p>The passed in value buffer should be large enough to hold all the 
values of a multi-value entry.
+   */
+  int getFloatValues(int docId, float[] valueBuffer);
+
+  /**
+   * Reads the DOUBLE type multi-value at the given document id into the value 
buffer and returns the number of values
+   * in the multi-value entry.
+   * <p>The passed in value buffer should be large enough to hold all the 
values of a multi-value entry.
+   */
+  int getDoubleValues(int docId, double[] valueBuffer);
+
+  /**
+   * Reads the STRING type multi-value at the given document id into the value 
buffer and returns the number of values
+   * in the multi-value entry.
+   * <p>The passed in value buffer should be large enough to hold all the 
values of a multi-value entry.
+   */
+  int getStringValues(int docId, String[] valueBuffer);
+
+  /**
+   * NOTE: The following batch value read APIs should be able to handle the 
data type conversion. Caller can call any
+   *       API regardless of the data type of the {@code BlockValSet}.
+   * TODO: Consider letting the caller handle the data type conversion because 
for different use cases, we might need to
+   *       convert data type differently.
+   */
+
+  /**
+   * Batch reads the dictionary ids at the given document ids of the given 
length into the dictionary id buffer.
+   * <p>The passed in dictionary id buffer size should be larger than or equal 
to the length.
+   */
+  void getDictionaryIds(int[] docIds, int length, int[] dictIdBuffer);
+
+  /**
+   * Batch reads the INT type single-values at the given document ids of the 
given length into the value buffer.
+   * <p>The passed in value buffer size should be larger than or equal to the 
length.
+   */
+  void getIntValues(int[] docIds, int length, int[] valueBuffer);
+
+  /**
+   * Batch reads the LONG type single-values at the given document ids of the 
given length into the value buffer.
+   * <p>The passed in value buffer size should be larger than or equal to the 
length.
+   */
+  void getLongValues(int[] docIds, int length, long[] valueBuffer);
+
+  /**
+   * Batch reads the FLOAT type single-values at the given document ids of the 
given length into the value buffer.
+   * <p>The passed in value buffer size should be larger than or equal to the 
length.
+   */
+  void getFloatValues(int[] docIds, int length, float[] valueBuffer);
+
+  /**
+   * Batch reads the DOUBLE type single-values at the given document ids of 
the given length into the value buffer.
+   * <p>The passed in value buffer size should be larger than or equal to the 
length.
+   */
+  void getDoubleValues(int[] docIds, int length, double[] valueBuffer);
+
+  /**
+   * Batch reads the STRING type single-values at the given document ids of 
the given length into the value buffer.
+   * <p>The passed in value buffer size should be larger than or equal to the 
length.
+   */
+  void getStringValues(int[] docIds, int length, String[] valueBuffer);
+
+  /**
+   * Batch reads the BYTES type single-values at the given document ids of the 
given length into the value buffer.
+   * <p>The passed in value buffer size should be larger than or equal to the 
length.
    */
-  void getBytesValues(int[] inDocIds, int inStartPos, int inDocIdsSize, 
byte[][] outValues, int outStartPos);
+  void getBytesValues(int[] docIds, int length, byte[][] valueBuffer);
 
   /**
    * SINGLE-VALUED COLUMN APIs
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/common/DataFetcher.java 
b/pinot-core/src/main/java/org/apache/pinot/core/common/DataFetcher.java
index 8d955a7..fc6799f 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/DataFetcher.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/DataFetcher.java
@@ -21,6 +21,7 @@ package org.apache.pinot.core.common;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
+import org.apache.pinot.core.operator.docvalsets.MultiValueSet;
 import org.apache.pinot.core.operator.docvalsets.SingleValueSet;
 import org.apache.pinot.core.plan.DocIdSetPlanNode;
 import org.apache.pinot.core.segment.index.readers.Dictionary;
@@ -38,10 +39,10 @@ public class DataFetcher {
       ThreadLocal.withInitial(() -> new 
int[DocIdSetPlanNode.MAX_DOC_PER_CALL]);
 
   private final Map<String, Dictionary> _dictionaryMap;
-  // For single-valued column
+  // For single-value columns
   private final Map<String, SingleValueSet> _singleValueSetMap;
-  // For multi-valued column
-  private final Map<String, BlockMultiValIterator> _blockMultiValIteratorMap;
+  // For multi-value columns
+  private final Map<String, MultiValueSet> _multiValueSetMap;
   private final int[] _reusableMVDictIds;
 
   /**
@@ -53,7 +54,7 @@ public class DataFetcher {
     int numColumns = dataSourceMap.size();
     _dictionaryMap = new HashMap<>(numColumns);
     _singleValueSetMap = new HashMap<>(numColumns);
-    _blockMultiValIteratorMap = new HashMap<>(numColumns);
+    _multiValueSetMap = new HashMap<>(numColumns);
 
     int maxNumValuesPerMVEntry = 0;
     for (Map.Entry<String, DataSource> entry : dataSourceMap.entrySet()) {
@@ -65,7 +66,7 @@ public class DataFetcher {
       if (dataSourceMetadata.isSingleValue()) {
         _singleValueSetMap.put(column, (SingleValueSet) blockValueSet);
       } else {
-        _blockMultiValIteratorMap.put(column, (BlockMultiValIterator) 
blockValueSet.iterator());
+        _multiValueSetMap.put(column, (MultiValueSet) blockValueSet);
         maxNumValuesPerMVEntry = Math.max(maxNumValuesPerMVEntry, 
dataSourceMetadata.getMaxNumValuesPerMVEntry());
       }
     }
@@ -86,7 +87,7 @@ public class DataFetcher {
    * @param outDictIds Buffer for output
    */
   public void fetchDictIds(String column, int[] inDocIds, int length, int[] 
outDictIds) {
-    _singleValueSetMap.get(column).getDictionaryIds(inDocIds, 0, length, 
outDictIds, 0);
+    _singleValueSetMap.get(column).getDictionaryIds(inDocIds, length, 
outDictIds);
   }
 
   /**
@@ -104,7 +105,7 @@ public class DataFetcher {
       fetchDictIds(column, inDocIds, length, dictIds);
       dictionary.readIntValues(dictIds, length, outValues);
     } else {
-      _singleValueSetMap.get(column).getIntValues(inDocIds, 0, length, 
outValues, 0);
+      _singleValueSetMap.get(column).getIntValues(inDocIds, length, outValues);
     }
   }
 
@@ -123,7 +124,7 @@ public class DataFetcher {
       fetchDictIds(column, inDocIds, length, dictIds);
       dictionary.readLongValues(dictIds, length, outValues);
     } else {
-      _singleValueSetMap.get(column).getLongValues(inDocIds, 0, length, 
outValues, 0);
+      _singleValueSetMap.get(column).getLongValues(inDocIds, length, 
outValues);
     }
   }
 
@@ -142,7 +143,7 @@ public class DataFetcher {
       fetchDictIds(column, inDocIds, length, dictIds);
       dictionary.readFloatValues(dictIds, length, outValues);
     } else {
-      _singleValueSetMap.get(column).getFloatValues(inDocIds, 0, length, 
outValues, 0);
+      _singleValueSetMap.get(column).getFloatValues(inDocIds, length, 
outValues);
     }
   }
 
@@ -161,7 +162,7 @@ public class DataFetcher {
       fetchDictIds(column, inDocIds, length, dictIds);
       dictionary.readDoubleValues(dictIds, length, outValues);
     } else {
-      _singleValueSetMap.get(column).getDoubleValues(inDocIds, 0, length, 
outValues, 0);
+      _singleValueSetMap.get(column).getDoubleValues(inDocIds, length, 
outValues);
     }
   }
 
@@ -180,7 +181,7 @@ public class DataFetcher {
       fetchDictIds(column, inDocIds, length, dictIds);
       dictionary.readStringValues(dictIds, length, outValues);
     } else {
-      _singleValueSetMap.get(column).getStringValues(inDocIds, 0, length, 
outValues, 0);
+      _singleValueSetMap.get(column).getStringValues(inDocIds, length, 
outValues);
     }
   }
 
@@ -199,7 +200,7 @@ public class DataFetcher {
       fetchDictIds(column, inDocIds, length, dictIds);
       dictionary.readBytesValues(dictIds, length, outValues);
     } else {
-      _singleValueSetMap.get(column).getBytesValues(inDocIds, 0, length, 
outValues, 0);
+      _singleValueSetMap.get(column).getBytesValues(inDocIds, length, 
outValues);
     }
   }
 
@@ -216,10 +217,9 @@ public class DataFetcher {
    * @param outDictIds Buffer for output
    */
   public void fetchDictIds(String column, int[] inDocIds, int length, int[][] 
outDictIds) {
-    BlockMultiValIterator blockMultiValIterator = 
_blockMultiValIteratorMap.get(column);
+    MultiValueSet multiValueSet = _multiValueSetMap.get(column);
     for (int i = 0; i < length; i++) {
-      blockMultiValIterator.skipTo(inDocIds[i]);
-      int numMultiValues = 
blockMultiValIterator.nextIntVal(_reusableMVDictIds);
+      int numMultiValues = multiValueSet.getIntValues(inDocIds[i], 
_reusableMVDictIds);
       outDictIds[i] = Arrays.copyOfRange(_reusableMVDictIds, 0, 
numMultiValues);
     }
   }
@@ -233,10 +233,9 @@ public class DataFetcher {
    * @param outValues Buffer for output
    */
   public void fetchIntValues(String column, int[] inDocIds, int length, 
int[][] outValues) {
-    BlockMultiValIterator blockMultiValIterator = 
_blockMultiValIteratorMap.get(column);
+    MultiValueSet multiValueSet = _multiValueSetMap.get(column);
     for (int i = 0; i < length; i++) {
-      blockMultiValIterator.skipTo(inDocIds[i]);
-      int numMultiValues = 
blockMultiValIterator.nextIntVal(_reusableMVDictIds);
+      int numMultiValues = multiValueSet.getIntValues(inDocIds[i], 
_reusableMVDictIds);
       outValues[i] = new int[numMultiValues];
       _dictionaryMap.get(column).readIntValues(_reusableMVDictIds, 
numMultiValues, outValues[i]);
     }
@@ -251,10 +250,9 @@ public class DataFetcher {
    * @param outValues Buffer for output
    */
   public void fetchLongValues(String column, int[] inDocIds, int length, 
long[][] outValues) {
-    BlockMultiValIterator blockMultiValIterator = 
_blockMultiValIteratorMap.get(column);
+    MultiValueSet multiValueSet = _multiValueSetMap.get(column);
     for (int i = 0; i < length; i++) {
-      blockMultiValIterator.skipTo(inDocIds[i]);
-      int numMultiValues = 
blockMultiValIterator.nextIntVal(_reusableMVDictIds);
+      int numMultiValues = multiValueSet.getIntValues(inDocIds[i], 
_reusableMVDictIds);
       outValues[i] = new long[numMultiValues];
       _dictionaryMap.get(column).readLongValues(_reusableMVDictIds, 
numMultiValues, outValues[i]);
     }
@@ -269,10 +267,9 @@ public class DataFetcher {
    * @param outValues Buffer for output
    */
   public void fetchFloatValues(String column, int[] inDocIds, int length, 
float[][] outValues) {
-    BlockMultiValIterator blockMultiValIterator = 
_blockMultiValIteratorMap.get(column);
+    MultiValueSet multiValueSet = _multiValueSetMap.get(column);
     for (int i = 0; i < length; i++) {
-      blockMultiValIterator.skipTo(inDocIds[i]);
-      int numMultiValues = 
blockMultiValIterator.nextIntVal(_reusableMVDictIds);
+      int numMultiValues = multiValueSet.getIntValues(inDocIds[i], 
_reusableMVDictIds);
       outValues[i] = new float[numMultiValues];
       _dictionaryMap.get(column).readFloatValues(_reusableMVDictIds, 
numMultiValues, outValues[i]);
     }
@@ -287,10 +284,9 @@ public class DataFetcher {
    * @param outValues Buffer for output
    */
   public void fetchDoubleValues(String column, int[] inDocIds, int length, 
double[][] outValues) {
-    BlockMultiValIterator blockMultiValIterator = 
_blockMultiValIteratorMap.get(column);
+    MultiValueSet multiValueSet = _multiValueSetMap.get(column);
     for (int i = 0; i < length; i++) {
-      blockMultiValIterator.skipTo(inDocIds[i]);
-      int numMultiValues = 
blockMultiValIterator.nextIntVal(_reusableMVDictIds);
+      int numMultiValues = multiValueSet.getIntValues(inDocIds[i], 
_reusableMVDictIds);
       outValues[i] = new double[numMultiValues];
       _dictionaryMap.get(column).readDoubleValues(_reusableMVDictIds, 
numMultiValues, outValues[i]);
     }
@@ -305,10 +301,9 @@ public class DataFetcher {
    * @param outValues Buffer for output
    */
   public void fetchStringValues(String column, int[] inDocIds, int length, 
String[][] outValues) {
-    BlockMultiValIterator blockMultiValIterator = 
_blockMultiValIteratorMap.get(column);
+    MultiValueSet multiValueSet = _multiValueSetMap.get(column);
     for (int i = 0; i < length; i++) {
-      blockMultiValIterator.skipTo(inDocIds[i]);
-      int numMultiValues = 
blockMultiValIterator.nextIntVal(_reusableMVDictIds);
+      int numMultiValues = multiValueSet.getIntValues(inDocIds[i], 
_reusableMVDictIds);
       outValues[i] = new String[numMultiValues];
       _dictionaryMap.get(column).readStringValues(_reusableMVDictIds, 
numMultiValues, outValues[i]);
     }
@@ -323,10 +318,9 @@ public class DataFetcher {
    * @param outNumValues Buffer for output
    */
   public void fetchNumValues(String column, int[] inDocIds, int length, int[] 
outNumValues) {
-    BlockMultiValIterator blockMultiValIterator = 
_blockMultiValIteratorMap.get(column);
+    MultiValueSet multiValueSet = _multiValueSetMap.get(column);
     for (int i = 0; i < length; i++) {
-      blockMultiValIterator.skipTo(inDocIds[i]);
-      outNumValues[i] = blockMultiValIterator.nextIntVal(_reusableMVDictIds);
+      outNumValues[i] = multiValueSet.getIntValues(inDocIds[i], 
_reusableMVDictIds);
     }
   }
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/minion/RawIndexConverter.java 
b/pinot-core/src/main/java/org/apache/pinot/core/minion/RawIndexConverter.java
index 0b5200d..0cfb813 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/minion/RawIndexConverter.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/minion/RawIndexConverter.java
@@ -26,13 +26,13 @@ import 
org.apache.commons.configuration.PropertiesConfiguration;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.pinot.common.segment.ReadMode;
-import org.apache.pinot.core.common.BlockSingleValIterator;
 import org.apache.pinot.core.common.DataSource;
 import org.apache.pinot.core.indexsegment.generator.SegmentVersion;
 import org.apache.pinot.core.indexsegment.immutable.ImmutableSegment;
 import org.apache.pinot.core.indexsegment.immutable.ImmutableSegmentLoader;
 import org.apache.pinot.core.io.compression.ChunkCompressorFactory;
 import org.apache.pinot.core.io.writer.impl.v1.BaseChunkSingleValueWriter;
+import org.apache.pinot.core.operator.docvalsets.SingleValueSet;
 import org.apache.pinot.core.segment.creator.SingleValueRawIndexCreator;
 import org.apache.pinot.core.segment.creator.impl.SegmentColumnarIndexCreator;
 import 
org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
@@ -198,15 +198,14 @@ public class RawIndexConverter {
     DataSource dataSource = 
_originalImmutableSegment.getDataSource(columnName);
     Dictionary dictionary = dataSource.getDictionary();
     FieldSpec.DataType dataType = fieldSpec.getDataType();
+    int numDocs = _originalSegmentMetadata.getTotalDocs();
     int lengthOfLongestEntry = 
_originalSegmentMetadata.getColumnMetadataFor(columnName).getColumnMaxLength();
     try (SingleValueRawIndexCreator rawIndexCreator = 
SegmentColumnarIndexCreator
         .getRawIndexCreatorForColumn(_convertedIndexDir, 
ChunkCompressorFactory.CompressionType.SNAPPY, columnName,
-            dataType, _originalSegmentMetadata.getTotalDocs(), 
lengthOfLongestEntry, false, BaseChunkSingleValueWriter.DEFAULT_VERSION)) {
-      BlockSingleValIterator iterator = (BlockSingleValIterator) 
dataSource.nextBlock().getBlockValueSet().iterator();
-      int docId = 0;
-      while (iterator.hasNext()) {
-        int dictId = iterator.nextIntVal();
-        rawIndexCreator.index(docId++, dictionary.get(dictId));
+            dataType, numDocs, lengthOfLongestEntry, false, 
BaseChunkSingleValueWriter.DEFAULT_VERSION)) {
+      SingleValueSet valueSet = (SingleValueSet) 
dataSource.nextBlock().getBlockValueSet();
+      for (int docId = 0; docId < numDocs; docId++) {
+        rawIndexCreator.index(docId, 
dictionary.get(valueSet.getIntValue(docId)));
       }
     }
 
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/MultiValueBlock.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/MultiValueBlock.java
index b0e33ed..825dcc8 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/MultiValueBlock.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/MultiValueBlock.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pinot.core.operator.blocks;
 
-import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.core.common.Block;
 import org.apache.pinot.core.common.BlockDocIdSet;
 import org.apache.pinot.core.common.BlockDocIdValueSet;
@@ -27,15 +26,17 @@ import org.apache.pinot.core.common.BlockValSet;
 import org.apache.pinot.core.io.reader.SingleColumnMultiValueReader;
 import org.apache.pinot.core.operator.docvalsets.MultiValueSet;
 import org.apache.pinot.core.segment.index.readers.Dictionary;
+import org.apache.pinot.spi.data.FieldSpec;
 
 
+@SuppressWarnings("rawtypes")
 public final class MultiValueBlock implements Block {
   private final BlockValSet _blockValSet;
   private final BlockMetadata _blockMetadata;
 
   public MultiValueBlock(SingleColumnMultiValueReader reader, int numDocs, int 
maxNumMultiValues,
       FieldSpec.DataType dataType, Dictionary dictionary) {
-    _blockValSet = new MultiValueSet(reader, numDocs, dataType);
+    _blockValSet = new MultiValueSet(reader, dataType);
     _blockMetadata = new BlockMetadataImpl(numDocs, false, maxNumMultiValues, 
dataType, dictionary);
   }
 
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/SingleValueBlock.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/SingleValueBlock.java
index 70f5356..132e54e 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/SingleValueBlock.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/SingleValueBlock.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pinot.core.operator.blocks;
 
-import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.core.common.Block;
 import org.apache.pinot.core.common.BlockDocIdSet;
 import org.apache.pinot.core.common.BlockDocIdValueSet;
@@ -27,8 +26,10 @@ import org.apache.pinot.core.common.BlockValSet;
 import org.apache.pinot.core.io.reader.SingleColumnSingleValueReader;
 import org.apache.pinot.core.operator.docvalsets.SingleValueSet;
 import org.apache.pinot.core.segment.index.readers.Dictionary;
+import org.apache.pinot.spi.data.FieldSpec;
 
 
+@SuppressWarnings("rawtypes")
 public final class SingleValueBlock implements Block {
   private final SingleColumnSingleValueReader _reader;
   private final BlockValSet _blockValSet;
@@ -37,7 +38,7 @@ public final class SingleValueBlock implements Block {
   public SingleValueBlock(SingleColumnSingleValueReader reader, int numDocs, 
FieldSpec.DataType dataType,
       Dictionary dictionary) {
     _reader = reader;
-    _blockValSet = new SingleValueSet(reader, numDocs, dataType);
+    _blockValSet = new SingleValueSet(reader, dataType);
     _blockMetadata = new BlockMetadataImpl(numDocs, true, 0, dataType, 
dictionary);
   }
 
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/dociditerators/MVScanDocIdIterator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/dociditerators/MVScanDocIdIterator.java
index e499b0b..8d6cbb7 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/dociditerators/MVScanDocIdIterator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/dociditerators/MVScanDocIdIterator.java
@@ -18,8 +18,8 @@
  */
 package org.apache.pinot.core.operator.dociditerators;
 
-import org.apache.pinot.core.common.BlockMultiValIterator;
 import org.apache.pinot.core.common.Constants;
+import org.apache.pinot.core.operator.docvalsets.MultiValueSet;
 import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator;
 import org.roaringbitmap.IntIterator;
 import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
@@ -32,17 +32,17 @@ import org.roaringbitmap.buffer.MutableRoaringBitmap;
  */
 public final class MVScanDocIdIterator implements ScanBasedDocIdIterator {
   private final PredicateEvaluator _predicateEvaluator;
-  private final BlockMultiValIterator _valueIterator;
+  private final MultiValueSet _valueSet;
   private final int _numDocs;
   private final int[] _dictIdBuffer;
 
   private int _nextDocId = 0;
   private long _numEntriesScanned = 0L;
 
-  public MVScanDocIdIterator(PredicateEvaluator predicateEvaluator, 
BlockMultiValIterator valueIterator, int numDocs,
+  public MVScanDocIdIterator(PredicateEvaluator predicateEvaluator, 
MultiValueSet valueSet, int numDocs,
       int maxNumEntriesPerValue) {
     _predicateEvaluator = predicateEvaluator;
-    _valueIterator = valueIterator;
+    _valueSet = valueSet;
     _numDocs = numDocs;
     _dictIdBuffer = new int[maxNumEntriesPerValue];
   }
@@ -51,7 +51,7 @@ public final class MVScanDocIdIterator implements 
ScanBasedDocIdIterator {
   public int next() {
     while (_nextDocId < _numDocs) {
       int nextDocId = _nextDocId++;
-      int length = _valueIterator.nextIntVal(_dictIdBuffer);
+      int length = _valueSet.getIntValues(nextDocId, _dictIdBuffer);
       _numEntriesScanned += length;
       if (_predicateEvaluator.applyMV(_dictIdBuffer, length)) {
         return nextDocId;
@@ -63,7 +63,6 @@ public final class MVScanDocIdIterator implements 
ScanBasedDocIdIterator {
   @Override
   public int advance(int targetDocId) {
     _nextDocId = targetDocId;
-    _valueIterator.skipTo(targetDocId);
     return next();
   }
 
@@ -73,8 +72,7 @@ public final class MVScanDocIdIterator implements 
ScanBasedDocIdIterator {
     IntIterator docIdIterator = docIds.getIntIterator();
     int nextDocId;
     while (docIdIterator.hasNext() && (nextDocId = docIdIterator.next()) < 
_numDocs) {
-      _valueIterator.skipTo(nextDocId);
-      int length = _valueIterator.nextIntVal(_dictIdBuffer);
+      int length = _valueSet.getIntValues(nextDocId, _dictIdBuffer);
       _numEntriesScanned += length;
       if (_predicateEvaluator.applyMV(_dictIdBuffer, length)) {
         result.add(nextDocId);
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/dociditerators/SVScanDocIdIterator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/dociditerators/SVScanDocIdIterator.java
index 5f1fc9e..bf8d36d 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/dociditerators/SVScanDocIdIterator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/dociditerators/SVScanDocIdIterator.java
@@ -18,8 +18,8 @@
  */
 package org.apache.pinot.core.operator.dociditerators;
 
-import org.apache.pinot.core.common.BlockSingleValIterator;
 import org.apache.pinot.core.common.Constants;
+import org.apache.pinot.core.operator.docvalsets.SingleValueSet;
 import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator;
 import org.roaringbitmap.IntIterator;
 import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
@@ -32,16 +32,16 @@ import org.roaringbitmap.buffer.MutableRoaringBitmap;
  */
 public final class SVScanDocIdIterator implements ScanBasedDocIdIterator {
   private final PredicateEvaluator _predicateEvaluator;
-  private final BlockSingleValIterator _valueIterator;
+  private final SingleValueSet _valueSet;
   private final int _numDocs;
   private final ValueMatcher _valueMatcher;
 
   private int _nextDocId = 0;
   private long _numEntriesScanned = 0L;
 
-  public SVScanDocIdIterator(PredicateEvaluator predicateEvaluator, 
BlockSingleValIterator valueIterator, int numDocs) {
+  public SVScanDocIdIterator(PredicateEvaluator predicateEvaluator, 
SingleValueSet valueSet, int numDocs) {
     _predicateEvaluator = predicateEvaluator;
-    _valueIterator = valueIterator;
+    _valueSet = valueSet;
     _numDocs = numDocs;
     _valueMatcher = getValueMatcher();
   }
@@ -51,7 +51,7 @@ public final class SVScanDocIdIterator implements 
ScanBasedDocIdIterator {
     while (_nextDocId < _numDocs) {
       int nextDocId = _nextDocId++;
       _numEntriesScanned++;
-      if (_valueMatcher.doesNextValueMatch()) {
+      if (_valueMatcher.doesValueMatch(nextDocId)) {
         return nextDocId;
       }
     }
@@ -61,7 +61,6 @@ public final class SVScanDocIdIterator implements 
ScanBasedDocIdIterator {
   @Override
   public int advance(int targetDocId) {
     _nextDocId = targetDocId;
-    _valueIterator.skipTo(targetDocId);
     return next();
   }
 
@@ -71,9 +70,8 @@ public final class SVScanDocIdIterator implements 
ScanBasedDocIdIterator {
     IntIterator docIdIterator = docIds.getIntIterator();
     int nextDocId;
     while (docIdIterator.hasNext() && (nextDocId = docIdIterator.next()) < 
_numDocs) {
-      _valueIterator.skipTo(nextDocId);
       _numEntriesScanned++;
-      if (_valueMatcher.doesNextValueMatch()) {
+      if (_valueMatcher.doesValueMatch(nextDocId)) {
         result.add(nextDocId);
       }
     }
@@ -104,56 +102,59 @@ public final class SVScanDocIdIterator implements 
ScanBasedDocIdIterator {
     }
   }
 
-  private static abstract class ValueMatcher {
+  private interface ValueMatcher {
 
-    abstract boolean doesNextValueMatch();
+    /**
+     * Returns {@code true} if the value for the given document id matches the 
predicate, {@code false} otherwise.
+     */
+    boolean doesValueMatch(int docId);
   }
 
-  private class IntMatcher extends ValueMatcher {
+  private class IntMatcher implements ValueMatcher {
 
     @Override
-    boolean doesNextValueMatch() {
-      return _predicateEvaluator.applySV(_valueIterator.nextIntVal());
+    public boolean doesValueMatch(int docId) {
+      return _predicateEvaluator.applySV(_valueSet.getIntValue(docId));
     }
   }
 
-  private class LongMatcher extends ValueMatcher {
+  private class LongMatcher implements ValueMatcher {
 
     @Override
-    boolean doesNextValueMatch() {
-      return _predicateEvaluator.applySV(_valueIterator.nextLongVal());
+    public boolean doesValueMatch(int docId) {
+      return _predicateEvaluator.applySV(_valueSet.getLongValue(docId));
     }
   }
 
-  private class FloatMatcher extends ValueMatcher {
+  private class FloatMatcher implements ValueMatcher {
 
     @Override
-    boolean doesNextValueMatch() {
-      return _predicateEvaluator.applySV(_valueIterator.nextFloatVal());
+    public boolean doesValueMatch(int docId) {
+      return _predicateEvaluator.applySV(_valueSet.getFloatValue(docId));
     }
   }
 
-  private class DoubleMatcher extends ValueMatcher {
+  private class DoubleMatcher implements ValueMatcher {
 
     @Override
-    boolean doesNextValueMatch() {
-      return _predicateEvaluator.applySV(_valueIterator.nextDoubleVal());
+    public boolean doesValueMatch(int docId) {
+      return _predicateEvaluator.applySV(_valueSet.getDoubleValue(docId));
     }
   }
 
-  private class StringMatcher extends ValueMatcher {
+  private class StringMatcher implements ValueMatcher {
 
     @Override
-    boolean doesNextValueMatch() {
-      return _predicateEvaluator.applySV(_valueIterator.nextStringVal());
+    public boolean doesValueMatch(int docId) {
+      return _predicateEvaluator.applySV(_valueSet.getStringValue(docId));
     }
   }
 
-  private class BytesMatcher extends ValueMatcher {
+  private class BytesMatcher implements ValueMatcher {
 
     @Override
-    boolean doesNextValueMatch() {
-      return _predicateEvaluator.applySV(_valueIterator.nextBytesVal());
+    public boolean doesValueMatch(int docId) {
+      return _predicateEvaluator.applySV(_valueSet.getBytesValue(docId));
     }
   }
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/docidsets/MVScanDocIdSet.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/docidsets/MVScanDocIdSet.java
index 459145b..a2baa67 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/docidsets/MVScanDocIdSet.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/docidsets/MVScanDocIdSet.java
@@ -28,7 +28,7 @@ public final class MVScanDocIdSet implements 
FilterBlockDocIdSet {
 
   public MVScanDocIdSet(PredicateEvaluator predicateEvaluator, MultiValueSet 
valueSet, int numDocs,
       int maxNumEntriesPerValue) {
-    _docIdIterator = new MVScanDocIdIterator(predicateEvaluator, 
valueSet.iterator(), numDocs, maxNumEntriesPerValue);
+    _docIdIterator = new MVScanDocIdIterator(predicateEvaluator, valueSet, 
numDocs, maxNumEntriesPerValue);
   }
 
   @Override
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/docidsets/SVScanDocIdSet.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/docidsets/SVScanDocIdSet.java
index 910bfaf..2fb3e59 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/docidsets/SVScanDocIdSet.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/docidsets/SVScanDocIdSet.java
@@ -27,7 +27,7 @@ public final class SVScanDocIdSet implements 
FilterBlockDocIdSet {
   private final SVScanDocIdIterator _docIdIterator;
 
   public SVScanDocIdSet(PredicateEvaluator predicateEvaluator, SingleValueSet 
valueSet, int numDocs) {
-    _docIdIterator = new SVScanDocIdIterator(predicateEvaluator, 
valueSet.iterator(), numDocs);
+    _docIdIterator = new SVScanDocIdIterator(predicateEvaluator, valueSet, 
numDocs);
   }
 
   @Override
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/docvaliterators/DictionaryBasedMultiValueIterator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/docvaliterators/DictionaryBasedMultiValueIterator.java
deleted file mode 100644
index 9f3dc2a..0000000
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/docvaliterators/DictionaryBasedMultiValueIterator.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.operator.docvaliterators;
-
-import org.apache.pinot.core.common.BlockMultiValIterator;
-import org.apache.pinot.core.common.BlockSingleValIterator;
-import org.apache.pinot.core.io.reader.ReaderContext;
-import org.apache.pinot.core.io.reader.SingleColumnMultiValueReader;
-import org.apache.pinot.core.io.reader.SingleColumnSingleValueReader;
-import org.apache.pinot.core.segment.index.readers.Dictionary;
-
-
-@SuppressWarnings("unchecked")
-public final class DictionaryBasedMultiValueIterator extends 
BlockMultiValIterator {
-
-  private final SingleColumnMultiValueReader _reader;
-  private final int _numDocs;
-  private final ReaderContext _context;
-  private final Dictionary _dictionary;
-  private final int[] _dictIds;
-
-  private int _nextDocId;
-
-  public DictionaryBasedMultiValueIterator(SingleColumnMultiValueReader 
reader, Dictionary dictionary, int numDocs,
-      int maxLength) {
-    _reader = reader;
-    _numDocs = numDocs;
-    _context = _reader.createContext();
-    _dictionary = dictionary;
-    _dictIds = new int[maxLength];
-  }
-
-  @Override
-  public int nextIntVal(int[] intArray) {
-    int length = _reader.getIntArray(_nextDocId++, _dictIds, _context);
-    for (int i = 0; i < length; i++) {
-      intArray[i] = _dictionary.getIntValue(_dictIds[i]);
-    }
-    return length;
-  }
-
-  @Override
-  public int nextDoubleVal(double[] doubleArray) {
-    int length = _reader.getIntArray(_nextDocId++, _dictIds, _context);
-    for (int i = 0; i < length; i++) {
-      doubleArray[i] = _dictionary.getDoubleValue(_dictIds[i]);
-    }
-    return length;
-  }
-
-  @Override
-  public int nextFloatVal(float[] floatArray) {
-    int length = _reader.getIntArray(_nextDocId++, _dictIds, _context);
-    for (int i = 0; i < length; i++) {
-      floatArray[i] = _dictionary.getFloatValue(_dictIds[i]);
-    }
-    return length;
-  }
-
-  @Override
-  public int nextLongVal(long[] longArray) {
-    int length = _reader.getIntArray(_nextDocId++, _dictIds, _context);
-    for (int i = 0; i < length; i++) {
-      longArray[i] = _dictionary.getLongValue(_dictIds[i]);
-    }
-    return length;
-  }
-
-  @Override
-  public int nextBytesArrayVal(byte[][] bytesArrays) {
-    int length = _reader.getIntArray(_nextDocId++, _dictIds, _context);
-    for (int i = 0; i < length; i++) {
-      bytesArrays[i] = _dictionary.getBytesValue(_dictIds[i]);
-    }
-    return length;
-  }
-
-  @Override
-  public boolean hasNext() {
-    return _nextDocId < _numDocs;
-  }
-
-  @Override
-  public void skipTo(int docId) {
-    _nextDocId = docId;
-  }
-
-  @Override
-  public void reset() {
-    _nextDocId = 0;
-  }
-}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/docvaliterators/DictionaryBasedSingleValueIterator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/docvaliterators/DictionaryBasedSingleValueIterator.java
deleted file mode 100644
index 34855e0..0000000
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/docvaliterators/DictionaryBasedSingleValueIterator.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.operator.docvaliterators;
-
-import org.apache.pinot.core.common.BlockSingleValIterator;
-import org.apache.pinot.core.io.reader.ReaderContext;
-import org.apache.pinot.core.io.reader.SingleColumnSingleValueReader;
-import org.apache.pinot.core.segment.index.readers.Dictionary;
-
-
-@SuppressWarnings("unchecked")
-public final class DictionaryBasedSingleValueIterator extends 
BlockSingleValIterator {
-
-  private final SingleColumnSingleValueReader _reader;
-  private final int _numDocs;
-  private final ReaderContext _context;
-  private final Dictionary _dictionary;
-
-  private int _nextDocId;
-
-  public DictionaryBasedSingleValueIterator(SingleColumnSingleValueReader 
reader, Dictionary dictionary, int numDocs) {
-    _reader = reader;
-    _numDocs = numDocs;
-    _context = _reader.createContext();
-    _dictionary = dictionary;
-  }
-
-  @Override
-  public int nextIntVal() {
-    return _dictionary.getIntValue(_reader.getInt(_nextDocId++, _context));
-  }
-
-  @Override
-  public long nextLongVal() {
-    return _dictionary.getLongValue(_reader.getInt(_nextDocId++, _context));
-  }
-
-  @Override
-  public float nextFloatVal() {
-    return _dictionary.getFloatValue(_reader.getInt(_nextDocId++, _context));
-  }
-
-  @Override
-  public double nextDoubleVal() {
-    return _dictionary.getDoubleValue(_reader.getInt(_nextDocId++, _context));
-  }
-
-  @Override
-  public String nextStringVal() {
-    return _dictionary.getStringValue(_reader.getInt(_nextDocId++, _context));
-  }
-
-  @Override
-  public byte[] nextBytesVal() {
-    return _dictionary.getBytesValue(_reader.getInt(_nextDocId++, _context));
-  }
-
-  @Override
-  public boolean hasNext() {
-    return _nextDocId < _numDocs;
-  }
-
-  @Override
-  public void skipTo(int docId) {
-    _nextDocId = docId;
-  }
-
-  @Override
-  public void reset() {
-    _nextDocId = 0;
-  }
-}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/docvaliterators/MultiValueIterator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/docvaliterators/MultiValueIterator.java
deleted file mode 100644
index 4529c84..0000000
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/docvaliterators/MultiValueIterator.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.operator.docvaliterators;
-
-import org.apache.pinot.core.common.BlockMultiValIterator;
-import org.apache.pinot.core.io.reader.ReaderContext;
-import org.apache.pinot.core.io.reader.SingleColumnMultiValueReader;
-
-
-@SuppressWarnings("unchecked")
-public final class MultiValueIterator extends BlockMultiValIterator {
-  private final SingleColumnMultiValueReader _reader;
-  private final int _numDocs;
-  private final ReaderContext _context;
-
-  private int _nextDocId;
-
-  public MultiValueIterator(SingleColumnMultiValueReader reader, int numDocs) {
-    _reader = reader;
-    _numDocs = numDocs;
-    _context = _reader.createContext();
-  }
-
-  @Override
-  public int nextIntVal(int[] intArray) {
-    return _reader.getIntArray(_nextDocId++, intArray, _context);
-  }
-
-  @Override
-  public int nextCharVal(char[] charArray) {
-    return _reader.getCharArray(_nextDocId++, charArray);
-  }
-
-  @Override
-  public int nextDoubleVal(double[] doubleArray) {
-    return _reader.getDoubleArray(_nextDocId++, doubleArray);
-  }
-
-  @Override
-  public int nextFloatVal(float[] floatArray) {
-    return _reader.getFloatArray(_nextDocId++, floatArray);
-  }
-
-  @Override
-  public int nextLongVal(long[] longArray) {
-    return _reader.getLongArray(_nextDocId++, longArray);
-  }
-
-  @Override
-  public int nextBytesArrayVal(byte[][] bytesArrays) {
-     return _reader.getBytesArray(_nextDocId++, bytesArrays);
-  }
-
-  @Override
-  public boolean hasNext() {
-    return _nextDocId < _numDocs;
-  }
-
-  @Override
-  public void skipTo(int docId) {
-    _nextDocId = docId;
-  }
-
-  @Override
-  public void reset() {
-    _nextDocId = 0;
-  }
-}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/docvaliterators/SingleValueIterator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/docvaliterators/SingleValueIterator.java
deleted file mode 100644
index 4bb39e4..0000000
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/docvaliterators/SingleValueIterator.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.operator.docvaliterators;
-
-import org.apache.pinot.core.common.BlockSingleValIterator;
-import org.apache.pinot.core.io.reader.ReaderContext;
-import org.apache.pinot.core.io.reader.SingleColumnSingleValueReader;
-
-
-@SuppressWarnings("unchecked")
-public final class SingleValueIterator extends BlockSingleValIterator {
-  private final SingleColumnSingleValueReader _reader;
-  private final int _numDocs;
-  private final ReaderContext _context;
-
-  private int _nextDocId;
-
-  public SingleValueIterator(SingleColumnSingleValueReader reader, int 
numDocs) {
-    _reader = reader;
-    _numDocs = numDocs;
-    _context = _reader.createContext();
-  }
-
-  @Override
-  public int nextIntVal() {
-    return _reader.getInt(_nextDocId++, _context);
-  }
-
-  @Override
-  public long nextLongVal() {
-    return _reader.getLong(_nextDocId++, _context);
-  }
-
-  @Override
-  public float nextFloatVal() {
-    return _reader.getFloat(_nextDocId++, _context);
-  }
-
-  @Override
-  public double nextDoubleVal() {
-    return _reader.getDouble(_nextDocId++, _context);
-  }
-
-  @Override
-  public String nextStringVal() {
-    return _reader.getString(_nextDocId++, _context);
-  }
-
-  @Override
-  public byte[] nextBytesVal() {
-    return _reader.getBytes(_nextDocId++, _context);
-  }
-
-  @Override
-  public boolean hasNext() {
-    return _nextDocId < _numDocs;
-  }
-
-  @Override
-  public void skipTo(int docId) {
-    _nextDocId = docId;
-  }
-
-  @Override
-  public void reset() {
-    _nextDocId = 0;
-  }
-}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/docvalsets/MultiValueSet.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/docvalsets/MultiValueSet.java
index ba0bbe6..1ce6cca 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/docvalsets/MultiValueSet.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/docvalsets/MultiValueSet.java
@@ -19,28 +19,24 @@
 package org.apache.pinot.core.operator.docvalsets;
 
 import org.apache.pinot.core.common.BaseBlockValSet;
+import org.apache.pinot.core.io.reader.ReaderContext;
 import org.apache.pinot.core.io.reader.SingleColumnMultiValueReader;
-import org.apache.pinot.core.operator.docvaliterators.MultiValueIterator;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 
 
+@SuppressWarnings({"rawtypes", "unchecked"})
 public final class MultiValueSet extends BaseBlockValSet {
   private final SingleColumnMultiValueReader _reader;
-  private final int _numDocs;
+  private final ReaderContext _readerContext;
   private final DataType _dataType;
 
-  public MultiValueSet(SingleColumnMultiValueReader reader, int numDocs, 
DataType dataType) {
+  public MultiValueSet(SingleColumnMultiValueReader reader, DataType dataType) 
{
     _reader = reader;
-    _numDocs = numDocs;
+    _readerContext = reader.createContext();
     _dataType = dataType;
   }
 
   @Override
-  public MultiValueIterator iterator() {
-    return new MultiValueIterator(_reader, _numDocs);
-  }
-
-  @Override
   public DataType getValueType() {
     return _dataType;
   }
@@ -49,4 +45,9 @@ public final class MultiValueSet extends BaseBlockValSet {
   public boolean isSingleValue() {
     return false;
   }
+
+  @Override
+  public int getIntValues(int docId, int[] valueBuffer) {
+    return _reader.getIntArray(docId, valueBuffer, _readerContext);
+  }
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/docvalsets/SingleValueSet.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/docvalsets/SingleValueSet.java
index f4041df..9a649bc 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/docvalsets/SingleValueSet.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/docvalsets/SingleValueSet.java
@@ -21,28 +21,22 @@ package org.apache.pinot.core.operator.docvalsets;
 import org.apache.pinot.core.common.BaseBlockValSet;
 import org.apache.pinot.core.io.reader.ReaderContext;
 import org.apache.pinot.core.io.reader.SingleColumnSingleValueReader;
-import org.apache.pinot.core.operator.docvaliterators.SingleValueIterator;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 
 
-@SuppressWarnings("unchecked")
+@SuppressWarnings({"rawtypes", "unchecked"})
 public final class SingleValueSet extends BaseBlockValSet {
   private final SingleColumnSingleValueReader _reader;
-  private final int _numDocs;
+  private final ReaderContext _readerContext;
   private final DataType _dataType;
 
-  public SingleValueSet(SingleColumnSingleValueReader reader, int numDocs, 
DataType dataType) {
+  public SingleValueSet(SingleColumnSingleValueReader reader, DataType 
dataType) {
     _reader = reader;
-    _numDocs = numDocs;
+    _readerContext = reader.createContext();
     _dataType = dataType;
   }
 
   @Override
-  public SingleValueIterator iterator() {
-    return new SingleValueIterator(_reader, _numDocs);
-  }
-
-  @Override
   public DataType getValueType() {
     return _dataType;
   }
@@ -53,122 +47,139 @@ public final class SingleValueSet extends BaseBlockValSet 
{
   }
 
   @Override
-  public void getIntValues(int[] inDocIds, int inStartPos, int inDocIdsSize, 
int[] outValues, int outStartPos) {
-    int inEndPos = inStartPos + inDocIdsSize;
-    ReaderContext context = _reader.createContext();
+  public int getIntValue(int docId) {
+    return _reader.getInt(docId, _readerContext);
+  }
+
+  @Override
+  public long getLongValue(int docId) {
+    return _reader.getLong(docId, _readerContext);
+  }
+
+  @Override
+  public float getFloatValue(int docId) {
+    return _reader.getFloat(docId, _readerContext);
+  }
+
+  @Override
+  public double getDoubleValue(int docId) {
+    return _reader.getDouble(docId, _readerContext);
+  }
+
+  @Override
+  public String getStringValue(int docId) {
+    return _reader.getString(docId, _readerContext);
+  }
+
+  @Override
+  public byte[] getBytesValue(int docId) {
+    return _reader.getBytes(docId, _readerContext);
+  }
+
+  @Override
+  public void getDictionaryIds(int[] docIds, int length, int[] dictIdBuffer) {
+    _reader.readValues(docIds, 0, length, dictIdBuffer, 0);
+  }
+
+  @Override
+  public void getIntValues(int[] docIds, int length, int[] valueBuffer) {
     if (_dataType == DataType.INT) {
-      for (int i = inStartPos; i < inEndPos; i++) {
-        outValues[outStartPos++] = _reader.getInt(inDocIds[i], context);
+      for (int i = 0; i < length; i++) {
+        valueBuffer[i] = _reader.getInt(docIds[i], _readerContext);
       }
     } else {
-      throw new UnsupportedOperationException();
+      throw new IllegalStateException(String.format("Cannot read %s as INT", 
_dataType));
     }
   }
 
   @Override
-  public void getLongValues(int[] inDocIds, int inStartPos, int inDocIdsSize, 
long[] outValues, int outStartPos) {
-    int inEndPos = inStartPos + inDocIdsSize;
-    ReaderContext context = _reader.createContext();
+  public void getLongValues(int[] docIds, int length, long[] valueBuffer) {
     switch (_dataType) {
       case INT:
-        for (int i = inStartPos; i < inEndPos; i++) {
-          outValues[outStartPos++] = _reader.getInt(inDocIds[i], context);
+        for (int i = 0; i < length; i++) {
+          valueBuffer[i] = _reader.getInt(docIds[i], _readerContext);
         }
         break;
       case LONG:
-        for (int i = inStartPos; i < inEndPos; i++) {
-          outValues[outStartPos++] = _reader.getLong(inDocIds[i], context);
+        for (int i = 0; i < length; i++) {
+          valueBuffer[i] = _reader.getLong(docIds[i], _readerContext);
         }
         break;
       default:
-        throw new UnsupportedOperationException();
+        throw new IllegalStateException(String.format("Cannot read %s as 
LONG", _dataType));
     }
   }
 
   @Override
-  public void getFloatValues(int[] inDocIds, int inStartPos, int inDocIdsSize, 
float[] outValues, int outStartPos) {
-    int inEndPos = inStartPos + inDocIdsSize;
-    ReaderContext context = _reader.createContext();
+  public void getFloatValues(int[] docIds, int length, float[] valueBuffer) {
     switch (_dataType) {
       case INT:
-        for (int i = inStartPos; i < inEndPos; i++) {
-          outValues[outStartPos++] = _reader.getInt(inDocIds[i], context);
+        for (int i = 0; i < length; i++) {
+          valueBuffer[i] = _reader.getInt(docIds[i], _readerContext);
         }
         break;
       case LONG:
-        for (int i = inStartPos; i < inEndPos; i++) {
-          outValues[outStartPos++] = _reader.getLong(inDocIds[i], context);
+        for (int i = 0; i < length; i++) {
+          valueBuffer[i] = _reader.getLong(docIds[i], _readerContext);
         }
         break;
       case FLOAT:
-        for (int i = inStartPos; i < inEndPos; i++) {
-          outValues[outStartPos++] = _reader.getFloat(inDocIds[i], context);
+        for (int i = 0; i < length; i++) {
+          valueBuffer[i] = _reader.getFloat(docIds[i], _readerContext);
         }
         break;
       default:
-        throw new UnsupportedOperationException();
+        throw new IllegalStateException(String.format("Cannot read %s as 
FLOAT", _dataType));
     }
   }
 
   @Override
-  public void getDoubleValues(int[] inDocIds, int inStartPos, int 
inDocIdsSize, double[] outValues, int outStartPos) {
-    int inEndPos = inStartPos + inDocIdsSize;
-    ReaderContext context = _reader.createContext();
+  public void getDoubleValues(int[] docIds, int length, double[] valueBuffer) {
     switch (_dataType) {
       case INT:
-        for (int i = inStartPos; i < inEndPos; i++) {
-          outValues[outStartPos++] = _reader.getInt(inDocIds[i], context);
+        for (int i = 0; i < length; i++) {
+          valueBuffer[i] = _reader.getInt(docIds[i], _readerContext);
         }
         break;
       case LONG:
-        for (int i = inStartPos; i < inEndPos; i++) {
-          outValues[outStartPos++] = _reader.getLong(inDocIds[i], context);
+        for (int i = 0; i < length; i++) {
+          valueBuffer[i] = _reader.getLong(docIds[i], _readerContext);
         }
         break;
       case FLOAT:
-        for (int i = inStartPos; i < inEndPos; i++) {
-          outValues[outStartPos++] = _reader.getFloat(inDocIds[i], context);
+        for (int i = 0; i < length; i++) {
+          valueBuffer[i] = _reader.getFloat(docIds[i], _readerContext);
         }
         break;
       case DOUBLE:
-        for (int i = inStartPos; i < inEndPos; i++) {
-          outValues[outStartPos++] = _reader.getDouble(inDocIds[i], context);
+        for (int i = 0; i < length; i++) {
+          valueBuffer[i] = _reader.getDouble(docIds[i], _readerContext);
         }
         break;
       default:
-        throw new UnsupportedOperationException();
+        throw new IllegalStateException(String.format("Cannot read %s as 
DOUBLE", _dataType));
     }
   }
 
   @Override
-  public void getStringValues(int[] inDocIds, int inStartPos, int 
inDocIdsSize, String[] outValues, int outStartPos) {
-    int inEndPos = inStartPos + inDocIdsSize;
-    ReaderContext context = _reader.createContext();
+  public void getStringValues(int[] docIds, int length, String[] valueBuffer) {
     if (_dataType == DataType.STRING) {
-      for (int i = inStartPos; i < inEndPos; i++) {
-        outValues[outStartPos++] = _reader.getString(inDocIds[i], context);
+      for (int i = 0; i < length; i++) {
+        valueBuffer[i] = _reader.getString(docIds[i], _readerContext);
       }
     } else {
-      throw new UnsupportedOperationException();
+      throw new IllegalStateException(String.format("Cannot read %s as 
STRING", _dataType));
     }
   }
 
   @Override
-  public void getBytesValues(int[] inDocIds, int inStartPos, int inDocIdsSize, 
byte[][] outValues, int outStartPos) {
-    int inEndPos = inStartPos + inDocIdsSize;
-    ReaderContext context = _reader.createContext();
+  public void getBytesValues(int[] docIds, int length, byte[][] valueBuffer) {
     if (_dataType.equals(DataType.BYTES)) {
-      for (int i = inStartPos; i < inEndPos; i++) {
-        outValues[outStartPos++] = _reader.getBytes(inDocIds[i], context);
+      for (int i = 0; i < length; i++) {
+        valueBuffer[i] = _reader.getBytes(docIds[i], _readerContext);
       }
     } else {
-      throw new UnsupportedOperationException();
+      throw new IllegalStateException(String.format("Cannot read %s as BYTES", 
_dataType));
     }
   }
-
-  @Override
-  public void getDictionaryIds(int[] inDocIds, int inStartPos, int 
inDocIdsSize, int[] outDictionaryIds,
-      int outStartPos) {
-    _reader.readValues(inDocIds, inStartPos, inDocIdsSize, outDictionaryIds, 
outStartPos);
-  }
 }
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/common/RealtimeNoDictionaryTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/common/RealtimeNoDictionaryTest.java
index a0a5b95..79c9e53 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/common/RealtimeNoDictionaryTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/common/RealtimeNoDictionaryTest.java
@@ -110,17 +110,19 @@ public class RealtimeNoDictionaryTest {
 
     Map<String, DataSource> dataSourceBlock = new HashMap<>();
     dataSourceBlock.put(INT_COL_NAME,
-        new MutableDataSource(intSpec, NUM_ROWS, NUM_ROWS, 0, null, 0, 
intRawIndex, null, null, null,null, null));
+        new MutableDataSource(intSpec, NUM_ROWS, NUM_ROWS, 0, null, 0, 
intRawIndex, null, null, null, null, null));
     dataSourceBlock.put(LONG_COL_NAME,
-        new MutableDataSource(longSpec, NUM_ROWS, NUM_ROWS, 0, null, 0, 
longRawIndex, null, null, null,null, null));
+        new MutableDataSource(longSpec, NUM_ROWS, NUM_ROWS, 0, null, 0, 
longRawIndex, null, null, null, null, null));
     dataSourceBlock.put(FLOAT_COL_NAME,
-        new MutableDataSource(floatSpec, NUM_ROWS, NUM_ROWS, 0, null, 0, 
floatRawIndex, null, null, null,null, null));
+        new MutableDataSource(floatSpec, NUM_ROWS, NUM_ROWS, 0, null, 0, 
floatRawIndex, null, null, null, null, null));
     dataSourceBlock.put(DOUBLE_COL_NAME,
-        new MutableDataSource(doubleSpec, NUM_ROWS, NUM_ROWS, 0, null, 0, 
doubleRawIndex, null, null, null,null, null));
+        new MutableDataSource(doubleSpec, NUM_ROWS, NUM_ROWS, 0, null, 0, 
doubleRawIndex, null, null, null, null,
+            null));
     dataSourceBlock.put(STRING_COL_NAME,
-        new MutableDataSource(stringSpec, NUM_ROWS, NUM_ROWS, 0, null, 0, 
stringRawIndex, null, null, null,null, null));
+        new MutableDataSource(stringSpec, NUM_ROWS, NUM_ROWS, 0, null, 0, 
stringRawIndex, null, null, null, null,
+            null));
     dataSourceBlock.put(BYTES_COL_NAME,
-        new MutableDataSource(bytesSpec, NUM_ROWS, NUM_ROWS, 0, null, 0, 
bytesRawIndex, null, null, null,null, null));
+        new MutableDataSource(bytesSpec, NUM_ROWS, NUM_ROWS, 0, null, 0, 
bytesRawIndex, null, null, null, null, null));
 
     return new DataFetcher(dataSourceBlock);
   }
@@ -204,7 +206,7 @@ public class RealtimeNoDictionaryTest {
         int[] intValues = new int[NUM_ROWS];
         dataFetcher.fetchIntValues(LONG_COL_NAME, docIds, numDocIds, 
intValues);
         Assert.fail("Expected exception converting long to int");
-      } catch (UnsupportedOperationException e) {
+      } catch (IllegalStateException e) {
         // We should see an exception
       }
 
@@ -245,7 +247,7 @@ public class RealtimeNoDictionaryTest {
         int[] intValues = new int[NUM_ROWS];
         dataFetcher.fetchIntValues(FLOAT_COL_NAME, docIds, numDocIds, 
intValues);
         Assert.fail("Expected exception converting float to int");
-      } catch (UnsupportedOperationException e) {
+      } catch (IllegalStateException e) {
         // We should see an exception
       }
 
@@ -253,7 +255,7 @@ public class RealtimeNoDictionaryTest {
       try {
         dataFetcher.fetchLongValues(FLOAT_COL_NAME, docIds, numDocIds, 
longValues);
         Assert.fail("Expected exception converting float to long");
-      } catch (UnsupportedOperationException e) {
+      } catch (IllegalStateException e) {
         // We should see an exception
       }
 
@@ -288,7 +290,7 @@ public class RealtimeNoDictionaryTest {
         int[] intValues = new int[NUM_ROWS];
         dataFetcher.fetchIntValues(DOUBLE_COL_NAME, docIds, numDocIds, 
intValues);
         Assert.fail("Expected exception converting double to int");
-      } catch (UnsupportedOperationException e) {
+      } catch (IllegalStateException e) {
         // We should see an exception
       }
 
@@ -296,7 +298,7 @@ public class RealtimeNoDictionaryTest {
       try {
         dataFetcher.fetchLongValues(DOUBLE_COL_NAME, docIds, numDocIds, 
longValues);
         Assert.fail("Expected exception converting double to long");
-      } catch (UnsupportedOperationException e) {
+      } catch (IllegalStateException e) {
         // We should see an exception
       }
 
@@ -304,7 +306,7 @@ public class RealtimeNoDictionaryTest {
       try {
         dataFetcher.fetchFloatValues(DOUBLE_COL_NAME, docIds, numDocIds, 
floatValues);
         Assert.fail("Expected exception converting double to float");
-      } catch (UnsupportedOperationException e) {
+      } catch (IllegalStateException e) {
         // We should see an exception
       }
 
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplTest.java
index 07a71a3..67ff4e1 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplTest.java
@@ -24,13 +24,13 @@ import java.util.Collections;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.segment.ReadMode;
 import org.apache.pinot.common.utils.CommonConstants;
-import org.apache.pinot.core.common.BlockMultiValIterator;
-import org.apache.pinot.core.common.BlockSingleValIterator;
 import org.apache.pinot.core.common.DataSource;
 import org.apache.pinot.core.common.DataSourceMetadata;
 import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
 import org.apache.pinot.core.indexsegment.immutable.ImmutableSegment;
 import org.apache.pinot.core.indexsegment.immutable.ImmutableSegmentLoader;
+import org.apache.pinot.core.operator.docvalsets.MultiValueSet;
+import org.apache.pinot.core.operator.docvalsets.SingleValueSet;
 import org.apache.pinot.core.segment.creator.SegmentIndexCreationDriver;
 import 
org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
 import org.apache.pinot.core.segment.index.metadata.SegmentMetadata;
@@ -49,6 +49,8 @@ import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
+import static org.testng.Assert.assertEquals;
+
 
 public class MutableSegmentImplTest {
   private static final String AVRO_FILE = "data/test_data-mv.avro";
@@ -100,24 +102,24 @@ public class MutableSegmentImplTest {
   public void testMetadata() {
     SegmentMetadata actualSegmentMetadata = 
_mutableSegmentImpl.getSegmentMetadata();
     SegmentMetadata expectedSegmentMetadata = 
_immutableSegment.getSegmentMetadata();
-    Assert.assertEquals(actualSegmentMetadata.getTotalDocs(), 
expectedSegmentMetadata.getTotalDocs());
+    assertEquals(actualSegmentMetadata.getTotalDocs(), 
expectedSegmentMetadata.getTotalDocs());
 
     // assert that the last indexed timestamp is close to what we expect
     long actualTs = 
_mutableSegmentImpl.getSegmentMetadata().getLastIndexedTimestamp();
     Assert.assertTrue(actualTs >= _startTimeMs);
     Assert.assertTrue(actualTs <= _lastIndexedTs);
 
-    
Assert.assertEquals(_mutableSegmentImpl.getSegmentMetadata().getLatestIngestionTimestamp(),
 _lastIngestionTimeMs);
+    
assertEquals(_mutableSegmentImpl.getSegmentMetadata().getLatestIngestionTimestamp(),
 _lastIngestionTimeMs);
 
     for (FieldSpec fieldSpec : _schema.getAllFieldSpecs()) {
       String column = fieldSpec.getName();
       DataSourceMetadata actualDataSourceMetadata = 
_mutableSegmentImpl.getDataSource(column).getDataSourceMetadata();
       DataSourceMetadata expectedDataSourceMetadata = 
_immutableSegment.getDataSource(column).getDataSourceMetadata();
-      Assert.assertEquals(actualDataSourceMetadata.getDataType(), 
expectedDataSourceMetadata.getDataType());
-      Assert.assertEquals(actualDataSourceMetadata.isSingleValue(), 
expectedDataSourceMetadata.isSingleValue());
-      Assert.assertEquals(actualDataSourceMetadata.getNumDocs(), 
expectedDataSourceMetadata.getNumDocs());
+      assertEquals(actualDataSourceMetadata.getDataType(), 
expectedDataSourceMetadata.getDataType());
+      assertEquals(actualDataSourceMetadata.isSingleValue(), 
expectedDataSourceMetadata.isSingleValue());
+      assertEquals(actualDataSourceMetadata.getNumDocs(), 
expectedDataSourceMetadata.getNumDocs());
       if (!expectedDataSourceMetadata.isSingleValue()) {
-        
Assert.assertEquals(actualDataSourceMetadata.getMaxNumValuesPerMVEntry(),
+        assertEquals(actualDataSourceMetadata.getMaxNumValuesPerMVEntry(),
             expectedDataSourceMetadata.getMaxNumValuesPerMVEntry());
       }
     }
@@ -131,26 +133,26 @@ public class MutableSegmentImplTest {
         DataSource actualDataSource = 
_mutableSegmentImpl.getDataSource(column);
         DataSource expectedDataSource = 
_immutableSegment.getDataSource(column);
 
+        int actualNumDocs = 
actualDataSource.getDataSourceMetadata().getNumDocs();
+        int expectedNumDocs = 
expectedDataSource.getDataSourceMetadata().getNumDocs();
+        assertEquals(actualNumDocs, expectedNumDocs);
+
         Dictionary actualDictionary = actualDataSource.getDictionary();
         Dictionary expectedDictionary = expectedDataSource.getDictionary();
-        Assert.assertEquals(actualDictionary.length(), 
expectedDictionary.length());
+        assertEquals(actualDictionary.length(), expectedDictionary.length());
 
-        BlockSingleValIterator actualSVIterator =
-            (BlockSingleValIterator) 
actualDataSource.nextBlock().getBlockValueSet().iterator();
-        BlockSingleValIterator expectedSVIterator =
-            (BlockSingleValIterator) 
expectedDataSource.nextBlock().getBlockValueSet().iterator();
-
-        while (expectedSVIterator.hasNext()) {
-          Assert.assertTrue(actualSVIterator.hasNext());
+        // Allow the segment name to be different
+        if 
(column.equals(CommonConstants.Segment.BuiltInVirtualColumn.SEGMENTNAME)) {
+          continue;
+        }
 
-          int actualDictId = actualSVIterator.nextIntVal();
-          int expectedDictId = expectedSVIterator.nextIntVal();
-          // Only allow the default segment name to be different
-          if 
(!column.equals(CommonConstants.Segment.BuiltInVirtualColumn.SEGMENTNAME)) {
-            Assert.assertEquals(actualDictionary.get(actualDictId), 
expectedDictionary.get(expectedDictId));
-          }
+        SingleValueSet actualValueSet = (SingleValueSet) 
actualDataSource.nextBlock().getBlockValueSet();
+        SingleValueSet expectedValueSet = (SingleValueSet) 
expectedDataSource.nextBlock().getBlockValueSet();
+        for (int docId = 0; docId < expectedNumDocs; docId++) {
+          int actualDictId = actualValueSet.getIntValue(docId);
+          int expectedDictId = expectedValueSet.getIntValue(docId);
+          assertEquals(actualDictionary.get(actualDictId), 
expectedDictionary.get(expectedDictId));
         }
-        Assert.assertFalse(actualSVIterator.hasNext());
       }
     }
   }
@@ -163,31 +165,29 @@ public class MutableSegmentImplTest {
         DataSource actualDataSource = 
_mutableSegmentImpl.getDataSource(column);
         DataSource expectedDataSource = 
_immutableSegment.getDataSource(column);
 
+        int actualNumDocs = 
actualDataSource.getDataSourceMetadata().getNumDocs();
+        int expectedNumDocs = 
expectedDataSource.getDataSourceMetadata().getNumDocs();
+        assertEquals(actualNumDocs, expectedNumDocs);
+
         Dictionary actualDictionary = actualDataSource.getDictionary();
         Dictionary expectedDictionary = expectedDataSource.getDictionary();
-        Assert.assertEquals(actualDictionary.length(), 
expectedDictionary.length());
-
-        BlockMultiValIterator actualMVIterator =
-            (BlockMultiValIterator) 
actualDataSource.nextBlock().getBlockValueSet().iterator();
-        BlockMultiValIterator expectedMVIterator =
-            (BlockMultiValIterator) 
expectedDataSource.nextBlock().getBlockValueSet().iterator();
+        assertEquals(actualDictionary.length(), expectedDictionary.length());
 
         int maxNumValuesPerMVEntry = 
expectedDataSource.getDataSourceMetadata().getMaxNumValuesPerMVEntry();
         int[] actualDictIds = new int[maxNumValuesPerMVEntry];
         int[] expectedDictIds = new int[maxNumValuesPerMVEntry];
 
-        while (expectedMVIterator.hasNext()) {
-          Assert.assertTrue(actualMVIterator.hasNext());
-
-          int actualNumMultiValues = 
actualMVIterator.nextIntVal(actualDictIds);
-          int expectedNumMultiValues = 
expectedMVIterator.nextIntVal(expectedDictIds);
-          Assert.assertEquals(actualNumMultiValues, expectedNumMultiValues);
+        MultiValueSet actualValueSet = (MultiValueSet) 
actualDataSource.nextBlock().getBlockValueSet();
+        MultiValueSet expectedValueSet = (MultiValueSet) 
expectedDataSource.nextBlock().getBlockValueSet();
+        for (int docId = 0; docId < expectedNumDocs; docId++) {
+          int actualLength = actualValueSet.getIntValues(docId, actualDictIds);
+          int expectedLength = expectedValueSet.getIntValues(docId, 
expectedDictIds);
+          assertEquals(actualLength, expectedLength);
 
-          for (int i = 0; i < expectedNumMultiValues; i++) {
-            Assert.assertEquals(actualDictionary.get(actualDictIds[i]), 
expectedDictionary.get(expectedDictIds[i]));
+          for (int i = 0; i < expectedLength; i++) {
+            assertEquals(actualDictionary.get(actualDictIds[i]), 
expectedDictionary.get(expectedDictIds[i]));
           }
         }
-        Assert.assertFalse(actualMVIterator.hasNext());
       }
     }
   }
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/operator/docvaliterators/RealtimeSingleValueIteratorTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/operator/docvaliterators/RealtimeSingleValueIteratorTest.java
deleted file mode 100644
index 02f3a04..0000000
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/operator/docvaliterators/RealtimeSingleValueIteratorTest.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.operator.docvaliterators;
-
-import java.io.IOException;
-import java.util.Random;
-import org.apache.pinot.core.io.readerwriter.PinotDataBufferMemoryManager;
-import 
org.apache.pinot.core.io.readerwriter.impl.FixedByteSingleColumnSingleValueReaderWriter;
-import org.apache.pinot.core.io.writer.impl.DirectMemoryManager;
-import org.testng.Assert;
-import org.testng.annotations.AfterTest;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-
-public class RealtimeSingleValueIteratorTest {
-  private static final int NUM_ROWS = 1000;
-  private static final long RANDOM_SEED = System.currentTimeMillis();
-  private static final Random RANDOM = new Random(RANDOM_SEED);
-  private int[] _intVals = new int[NUM_ROWS];
-  private long[] _longVals = new long[NUM_ROWS];
-  private float[] _floatVals = new float[NUM_ROWS];
-  private double[] _doubleVals = new double[NUM_ROWS];
-  FixedByteSingleColumnSingleValueReaderWriter _intReader;
-  FixedByteSingleColumnSingleValueReaderWriter _longReader;
-  FixedByteSingleColumnSingleValueReaderWriter _floatReader;
-  FixedByteSingleColumnSingleValueReaderWriter _doubleReader;
-  private PinotDataBufferMemoryManager _memoryManager;
-
-  @BeforeClass
-  public void setUp() {
-    _memoryManager = new 
DirectMemoryManager(RealtimeSingleValueIteratorTest.class.getName());
-    _intReader =
-        new 
FixedByteSingleColumnSingleValueReaderWriter(RANDOM.nextInt(NUM_ROWS) + 1, 
Integer.BYTES, _memoryManager,
-            "intReader");
-    _longReader =
-        new 
FixedByteSingleColumnSingleValueReaderWriter(RANDOM.nextInt(NUM_ROWS) + 1, 
Long.BYTES, _memoryManager,
-            "longReader");
-    _floatReader =
-        new 
FixedByteSingleColumnSingleValueReaderWriter(RANDOM.nextInt(NUM_ROWS) + 1, 
Float.BYTES, _memoryManager,
-            "floatReader");
-    _doubleReader =
-        new 
FixedByteSingleColumnSingleValueReaderWriter(RANDOM.nextInt(NUM_ROWS) + 1, 
Double.BYTES, _memoryManager,
-            "doubleReader");
-
-    for (int i = 0; i < NUM_ROWS; i++) {
-      _intVals[i] = RANDOM.nextInt();
-      _intReader.setInt(i, _intVals[i]);
-      _longVals[i] = RANDOM.nextLong();
-      _longReader.setLong(i, _longVals[i]);
-      _floatVals[i] = RANDOM.nextFloat();
-      _floatReader.setFloat(i, _floatVals[i]);
-      _doubleVals[i] = RANDOM.nextDouble();
-      _doubleReader.setDouble(i, _doubleVals[i]);
-    }
-  }
-
-  @Test
-  public void testIntReader()
-      throws Exception {
-    try {
-      SingleValueIterator iterator = new SingleValueIterator(_intReader, 
NUM_ROWS);
-      // Test all values
-      iterator.reset();
-      for (int i = 0; i < NUM_ROWS; i++) {
-        Assert.assertEquals(iterator.nextIntVal(), _intVals[i], " at row " + 
i);
-      }
-
-      final int startDocId = RANDOM.nextInt(NUM_ROWS);
-      iterator.skipTo(startDocId);
-      for (int i = startDocId; i < NUM_ROWS; i++) {
-        Assert.assertEquals(iterator.nextIntVal(), _intVals[i], " at row " + 
i);
-      }
-    } catch (Throwable t) {
-      t.printStackTrace();
-      Assert.fail("Failed with seed " + RANDOM_SEED);
-    }
-  }
-
-  @Test
-  public void testLongReader()
-      throws Exception {
-    try {
-      SingleValueIterator iterator = new SingleValueIterator(_longReader, 
NUM_ROWS);
-      // Test all values
-      iterator.reset();
-      for (int i = 0; i < NUM_ROWS; i++) {
-        Assert.assertEquals(iterator.nextLongVal(), _longVals[i], " at row " + 
i);
-      }
-
-      final int startDocId = RANDOM.nextInt(NUM_ROWS);
-      iterator.skipTo(startDocId);
-      for (int i = startDocId; i < NUM_ROWS; i++) {
-        Assert.assertEquals(iterator.nextLongVal(), _longVals[i], " at row " + 
i);
-      }
-    } catch (Throwable t) {
-      t.printStackTrace();
-      Assert.fail("Failed with seed " + RANDOM_SEED);
-    }
-  }
-
-  @Test
-  public void testFloatReader()
-      throws Exception {
-    try {
-      SingleValueIterator iterator = new SingleValueIterator(_floatReader, 
NUM_ROWS);
-      // Test all values
-      iterator.reset();
-      for (int i = 0; i < NUM_ROWS; i++) {
-        Assert.assertEquals(iterator.nextFloatVal(), _floatVals[i], " at row " 
+ i);
-      }
-
-      final int startDocId = RANDOM.nextInt(NUM_ROWS);
-      iterator.skipTo(startDocId);
-      for (int i = startDocId; i < NUM_ROWS; i++) {
-        Assert.assertEquals(iterator.nextFloatVal(), _floatVals[i], " at row " 
+ i);
-      }
-    } catch (Throwable t) {
-      t.printStackTrace();
-      Assert.fail("Failed with seed " + RANDOM_SEED);
-    }
-  }
-
-  @Test
-  public void testDoubleReader()
-      throws Exception {
-    try {
-      SingleValueIterator iterator = new SingleValueIterator(_doubleReader, 
NUM_ROWS);
-      // Test all values
-      iterator.reset();
-      for (int i = 0; i < NUM_ROWS; i++) {
-        Assert.assertEquals(iterator.nextDoubleVal(), _doubleVals[i], " at row 
" + i);
-      }
-
-      final int startDocId = RANDOM.nextInt(NUM_ROWS);
-      iterator.skipTo(startDocId);
-      for (int i = startDocId; i < NUM_ROWS; i++) {
-        Assert.assertEquals(iterator.nextDoubleVal(), _doubleVals[i], " at row 
" + i);
-      }
-    } catch (Throwable t) {
-      t.printStackTrace();
-      Assert.fail("Failed with seed " + RANDOM_SEED);
-    }
-  }
-
-  @AfterTest
-  public void tearDown()
-      throws IOException {
-    _memoryManager.close();
-  }
-}
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/BaseStarTreeV2Test.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/BaseStarTreeV2Test.java
index daa5ddb..5c4c864 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/BaseStarTreeV2Test.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/BaseStarTreeV2Test.java
@@ -38,7 +38,6 @@ import org.apache.pinot.common.segment.ReadMode;
 import org.apache.pinot.common.utils.request.FilterQueryTree;
 import org.apache.pinot.common.utils.request.RequestUtils;
 import org.apache.pinot.core.common.BlockDocIdIterator;
-import org.apache.pinot.core.common.BlockSingleValIterator;
 import org.apache.pinot.core.common.Constants;
 import org.apache.pinot.core.common.DataSource;
 import org.apache.pinot.core.data.aggregator.ValueAggregator;
@@ -46,6 +45,7 @@ import 
org.apache.pinot.core.data.readers.GenericRowRecordReader;
 import org.apache.pinot.core.indexsegment.IndexSegment;
 import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
 import org.apache.pinot.core.indexsegment.immutable.ImmutableSegmentLoader;
+import org.apache.pinot.core.operator.docvalsets.SingleValueSet;
 import org.apache.pinot.core.plan.FilterPlanNode;
 import org.apache.pinot.core.plan.PlanNode;
 import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
@@ -214,45 +214,43 @@ abstract class BaseStarTreeV2Test<R, A> {
     } else {
       starTreeFilterPlanNode = new StarTreeFilterPlanNode(_starTreeV2, 
rootFilterNode, groupByColumnSet, null);
     }
-    List<BlockSingleValIterator> starTreeAggregationColumnValueIterators = new 
ArrayList<>(numAggregations);
+    List<SingleValueSet> starTreeAggregationColumnValueSets = new 
ArrayList<>(numAggregations);
     for (AggregationFunctionColumnPair aggregationFunctionColumnPair : 
functionColumnPairs) {
-      starTreeAggregationColumnValueIterators.add(
-          (BlockSingleValIterator) 
_starTreeV2.getDataSource(aggregationFunctionColumnPair.toColumnName()).nextBlock()
-              .getBlockValueSet().iterator());
+      starTreeAggregationColumnValueSets.add(
+          (SingleValueSet) 
_starTreeV2.getDataSource(aggregationFunctionColumnPair.toColumnName()).nextBlock()
+              .getBlockValueSet());
     }
-    List<BlockSingleValIterator> starTreeGroupByColumnValueIterators = new 
ArrayList<>(numGroupByColumns);
+    List<SingleValueSet> starTreeGroupByColumnValueSets = new 
ArrayList<>(numGroupByColumns);
     for (String groupByColumn : groupByColumns) {
-      starTreeGroupByColumnValueIterators.add(
-          (BlockSingleValIterator) 
_starTreeV2.getDataSource(groupByColumn).nextBlock().getBlockValueSet().iterator());
+      starTreeGroupByColumnValueSets
+          .add((SingleValueSet) 
_starTreeV2.getDataSource(groupByColumn).nextBlock().getBlockValueSet());
     }
     Map<List<Integer>, List<Object>> starTreeResult =
-        computeStarTreeResult(starTreeFilterPlanNode, 
starTreeAggregationColumnValueIterators,
-            starTreeGroupByColumnValueIterators);
+        computeStarTreeResult(starTreeFilterPlanNode, 
starTreeAggregationColumnValueSets,
+            starTreeGroupByColumnValueSets);
 
     // Extract values without star-tree
     PlanNode nonStarTreeFilterPlanNode = new FilterPlanNode(_indexSegment, 
brokerRequest);
-    List<BlockSingleValIterator> nonStarTreeAggregationColumnValueIterators = 
new ArrayList<>(numAggregations);
+    List<SingleValueSet> nonStarTreeAggregationColumnValueSets = new 
ArrayList<>(numAggregations);
     List<Dictionary> nonStarTreeAggregationColumnDictionaries = new 
ArrayList<>(numAggregations);
     for (AggregationFunctionColumnPair aggregationFunctionColumnPair : 
functionColumnPairs) {
       if (aggregationFunctionColumnPair.getFunctionType() == 
AggregationFunctionType.COUNT) {
-        nonStarTreeAggregationColumnValueIterators.add(null);
+        nonStarTreeAggregationColumnValueSets.add(null);
         nonStarTreeAggregationColumnDictionaries.add(null);
       } else {
         DataSource dataSource = 
_indexSegment.getDataSource(aggregationFunctionColumnPair.getColumn());
-        nonStarTreeAggregationColumnValueIterators
-            .add((BlockSingleValIterator) 
dataSource.nextBlock().getBlockValueSet().iterator());
+        nonStarTreeAggregationColumnValueSets.add((SingleValueSet) 
dataSource.nextBlock().getBlockValueSet());
         
nonStarTreeAggregationColumnDictionaries.add(dataSource.getDictionary());
       }
     }
-    List<BlockSingleValIterator> nonStarTreeGroupByColumnValueIterators = new 
ArrayList<>(numGroupByColumns);
+    List<SingleValueSet> nonStarTreeGroupByColumnValueSets = new 
ArrayList<>(numGroupByColumns);
     for (String groupByColumn : groupByColumns) {
-      nonStarTreeGroupByColumnValueIterators.add(
-          (BlockSingleValIterator) 
_indexSegment.getDataSource(groupByColumn).nextBlock().getBlockValueSet()
-              .iterator());
+      nonStarTreeGroupByColumnValueSets
+          .add((SingleValueSet) 
_indexSegment.getDataSource(groupByColumn).nextBlock().getBlockValueSet());
     }
     Map<List<Integer>, List<Object>> nonStarTreeResult =
-        computeNonStarTreeResult(nonStarTreeFilterPlanNode, 
nonStarTreeAggregationColumnValueIterators,
-            nonStarTreeAggregationColumnDictionaries, 
nonStarTreeGroupByColumnValueIterators);
+        computeNonStarTreeResult(nonStarTreeFilterPlanNode, 
nonStarTreeAggregationColumnValueSets,
+            nonStarTreeAggregationColumnDictionaries, 
nonStarTreeGroupByColumnValueSets);
 
     // Assert results
     assertEquals(starTreeResult.size(), nonStarTreeResult.size());
@@ -269,46 +267,42 @@ abstract class BaseStarTreeV2Test<R, A> {
 
   @SuppressWarnings("unchecked")
   private Map<List<Integer>, List<Object>> computeStarTreeResult(PlanNode 
starTreeFilterPlanNode,
-      List<BlockSingleValIterator> aggregationColumnValueIterators,
-      List<BlockSingleValIterator> groupByColumnValueIterators) {
+      List<SingleValueSet> aggregationColumnValueSets, List<SingleValueSet> 
groupByColumnValueSets) {
     Map<List<Integer>, List<Object>> result = new HashMap<>();
-    int numAggregations = aggregationColumnValueIterators.size();
-    int numGroupByColumns = groupByColumnValueIterators.size();
+    int numAggregations = aggregationColumnValueSets.size();
+    int numGroupByColumns = groupByColumnValueSets.size();
     BlockDocIdIterator docIdIterator = 
starTreeFilterPlanNode.run().nextBlock().getBlockDocIdSet().iterator();
     int docId;
     while ((docId = docIdIterator.next()) != Constants.EOF) {
       // Array of dictionary Ids (zero-length array for non-group-by queries)
       List<Integer> group = new ArrayList<>(numGroupByColumns);
-      for (BlockSingleValIterator valueIterator : groupByColumnValueIterators) 
{
-        valueIterator.skipTo(docId);
-        group.add(valueIterator.nextIntVal());
+      for (SingleValueSet valueSet : groupByColumnValueSets) {
+        group.add(valueSet.getIntValue(docId));
       }
       List<Object> values = result.computeIfAbsent(group, k -> new 
ArrayList<>(numAggregations));
       if (values.isEmpty()) {
-        for (BlockSingleValIterator valueIterator : 
aggregationColumnValueIterators) {
-          valueIterator.skipTo(docId);
-          values.add(getNextAggregatedValue(valueIterator));
+        for (SingleValueSet valueSet : aggregationColumnValueSets) {
+          values.add(getAggregatedValue(valueSet, docId));
         }
       } else {
         for (int i = 0; i < numAggregations; i++) {
           Object value = values.get(i);
-          BlockSingleValIterator valueIterator = 
aggregationColumnValueIterators.get(i);
-          valueIterator.skipTo(docId);
-          values.set(i, _valueAggregator.applyAggregatedValue(value, 
getNextAggregatedValue(valueIterator)));
+          SingleValueSet valueSet = aggregationColumnValueSets.get(i);
+          values.set(i, _valueAggregator.applyAggregatedValue(value, 
getAggregatedValue(valueSet, docId)));
         }
       }
     }
     return result;
   }
 
-  private Object getNextAggregatedValue(BlockSingleValIterator valueIterator) {
+  private Object getAggregatedValue(SingleValueSet valueSet, int docId) {
     switch (_aggregatedValueType) {
       case LONG:
-        return valueIterator.nextLongVal();
+        return valueSet.getLongValue(docId);
       case DOUBLE:
-        return valueIterator.nextDoubleVal();
+        return valueSet.getDoubleValue(docId);
       case BYTES:
-        return 
_valueAggregator.deserializeAggregatedValue(valueIterator.nextBytesVal());
+        return 
_valueAggregator.deserializeAggregatedValue(valueSet.getBytesValue(docId));
       default:
         throw new IllegalStateException();
     }
@@ -316,43 +310,40 @@ abstract class BaseStarTreeV2Test<R, A> {
 
   @SuppressWarnings("unchecked")
   private Map<List<Integer>, List<Object>> computeNonStarTreeResult(PlanNode 
nonStarTreeFilterPlanNode,
-      List<BlockSingleValIterator> aggregationColumnValueIterators, 
List<Dictionary> aggregationColumnDictionaries,
-      List<BlockSingleValIterator> groupByColumnValueIterators) {
+      List<SingleValueSet> aggregationColumnValueSets, List<Dictionary> 
aggregationColumnDictionaries,
+      List<SingleValueSet> groupByColumnValueSets) {
     Map<List<Integer>, List<Object>> result = new HashMap<>();
-    int numAggregations = aggregationColumnValueIterators.size();
-    int numGroupByColumns = groupByColumnValueIterators.size();
+    int numAggregations = aggregationColumnValueSets.size();
+    int numGroupByColumns = groupByColumnValueSets.size();
     BlockDocIdIterator docIdIterator = 
nonStarTreeFilterPlanNode.run().nextBlock().getBlockDocIdSet().iterator();
     int docId;
     while ((docId = docIdIterator.next()) != Constants.EOF) {
       // Array of dictionary Ids (zero-length array for non-group-by queries)
       List<Integer> group = new ArrayList<>(numGroupByColumns);
-      for (BlockSingleValIterator valueIterator : groupByColumnValueIterators) 
{
-        valueIterator.skipTo(docId);
-        group.add(valueIterator.nextIntVal());
+      for (SingleValueSet valueSet : groupByColumnValueSets) {
+        group.add(valueSet.getIntValue(docId));
       }
       List<Object> values = result.computeIfAbsent(group, k -> new 
ArrayList<>(numAggregations));
       if (values.isEmpty()) {
         for (int i = 0; i < numAggregations; i++) {
-          BlockSingleValIterator valueIterator = 
aggregationColumnValueIterators.get(i);
-          if (valueIterator == null) {
+          SingleValueSet valueSet = aggregationColumnValueSets.get(i);
+          if (valueSet == null) {
             // COUNT aggregation function
             values.add(1L);
           } else {
-            valueIterator.skipTo(docId);
-            Object rawValue = getNextRawValue(valueIterator, 
aggregationColumnDictionaries.get(i));
+            Object rawValue = getNextRawValue(valueSet, docId, 
aggregationColumnDictionaries.get(i));
             values.add(_valueAggregator.getInitialAggregatedValue(rawValue));
           }
         }
       } else {
         for (int i = 0; i < numAggregations; i++) {
           Object value = values.get(i);
-          BlockSingleValIterator valueIterator = 
aggregationColumnValueIterators.get(i);
-          if (valueIterator == null) {
+          SingleValueSet valueSet = aggregationColumnValueSets.get(i);
+          if (valueSet == null) {
             // COUNT aggregation function
             value = (Long) value + 1;
           } else {
-            valueIterator.skipTo(docId);
-            Object rawValue = getNextRawValue(valueIterator, 
aggregationColumnDictionaries.get(i));
+            Object rawValue = getNextRawValue(valueSet, docId, 
aggregationColumnDictionaries.get(i));
             value = _valueAggregator.applyRawValue(value, rawValue);
           }
           values.set(i, value);
@@ -362,9 +353,8 @@ abstract class BaseStarTreeV2Test<R, A> {
     return result;
   }
 
-  private Object getNextRawValue(BlockSingleValIterator valueIterator, 
Dictionary dictionary) {
-    int dictId = valueIterator.nextIntVal();
-    return dictionary.get(dictId);
+  private Object getNextRawValue(SingleValueSet valueSet, int docId, 
Dictionary dictionary) {
+    return dictionary.get(valueSet.getIntValue(docId));
   }
 
   abstract ValueAggregator<R, A> getValueAggregator();
diff --git 
a/pinot-tools/src/main/java/org/apache/pinot/tools/StarTreeIndexViewer.java 
b/pinot-tools/src/main/java/org/apache/pinot/tools/StarTreeIndexViewer.java
index 05625cf..2afd8db 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/StarTreeIndexViewer.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/StarTreeIndexViewer.java
@@ -39,9 +39,6 @@ import javax.ws.rs.core.MediaType;
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pinot.common.segment.ReadMode;
-import org.apache.pinot.core.common.Block;
-import org.apache.pinot.core.common.BlockSingleValIterator;
-import org.apache.pinot.core.common.BlockValSet;
 import org.apache.pinot.core.common.DataSource;
 import org.apache.pinot.core.indexsegment.IndexSegment;
 import org.apache.pinot.core.indexsegment.immutable.ImmutableSegmentLoader;
@@ -91,22 +88,16 @@ public class StarTreeIndexViewer {
   static int MAX_CHILDREN = 100;
   private List<String> _dimensionNames;
   private Map<String, Dictionary> dictionaries;
-  private Map<String, BlockSingleValIterator> valueIterators;
 
   public StarTreeIndexViewer(File segmentDir)
       throws Exception {
     IndexSegment indexSegment = ImmutableSegmentLoader.load(segmentDir, 
ReadMode.heap);
 
     dictionaries = new HashMap<>();
-    valueIterators = new HashMap<>();
     SegmentMetadataImpl metadata = new SegmentMetadataImpl(segmentDir);
 
     for (String columnName : metadata.getAllColumns()) {
       DataSource dataSource = indexSegment.getDataSource(columnName);
-      Block block = dataSource.nextBlock();
-      BlockValSet blockValSet = block.getBlockValueSet();
-      BlockSingleValIterator itr = (BlockSingleValIterator) 
blockValSet.iterator();
-      valueIterators.put(columnName, itr);
       dictionaries.put(columnName, dataSource.getDictionary());
     }
     StarTree tree = indexSegment.getStarTrees().get(0).getStarTree();
diff --git 
a/pinot-tools/src/main/java/org/apache/pinot/tools/scan/query/Projection.java 
b/pinot-tools/src/main/java/org/apache/pinot/tools/scan/query/Projection.java
index a4fcdfa..e14aaf3 100644
--- 
a/pinot-tools/src/main/java/org/apache/pinot/tools/scan/query/Projection.java
+++ 
b/pinot-tools/src/main/java/org/apache/pinot/tools/scan/query/Projection.java
@@ -26,9 +26,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import org.apache.commons.lang.ArrayUtils;
-import org.apache.pinot.core.common.BlockMultiValIterator;
-import org.apache.pinot.core.common.BlockSingleValIterator;
 import org.apache.pinot.core.indexsegment.immutable.ImmutableSegment;
+import org.apache.pinot.core.operator.docvalsets.MultiValueSet;
+import org.apache.pinot.core.operator.docvalsets.SingleValueSet;
 import org.apache.pinot.core.query.utils.Pair;
 import org.apache.pinot.core.segment.index.metadata.ColumnMetadata;
 import org.apache.pinot.core.segment.index.metadata.SegmentMetadataImpl;
@@ -79,23 +79,20 @@ public class Projection {
     for (Pair pair : _columnList) {
       String column = (String) pair.getFirst();
       if (!_mvColumns.contains(column)) {
-        BlockSingleValIterator bvIter =
-            (BlockSingleValIterator) 
_immutableSegment.getDataSource(column).nextBlock().getBlockValueSet().iterator();
+        SingleValueSet valueSet =
+            (SingleValueSet) 
_immutableSegment.getDataSource(column).nextBlock().getBlockValueSet();
 
         int rowId = 0;
         for (Integer docId : _filteredDocIds) {
-          bvIter.skipTo(docId);
-          resultTable.add(rowId++, bvIter.nextIntVal());
+          resultTable.add(rowId++, valueSet.getIntValue(docId));
         }
       } else {
-        BlockMultiValIterator bvIter =
-            (BlockMultiValIterator) 
_immutableSegment.getDataSource(column).nextBlock().getBlockValueSet().iterator();
+        MultiValueSet valueSet = (MultiValueSet) 
_immutableSegment.getDataSource(column).nextBlock().getBlockValueSet();
 
         int rowId = 0;
-        for (Integer docId : _filteredDocIds) {
-          bvIter.skipTo(docId);
+        for (int docId : _filteredDocIds) {
           int[] dictIds = _mvColumnArrayMap.get(column);
-          int numMVValues = bvIter.nextIntVal(dictIds);
+          int numMVValues = valueSet.getIntValues(docId, dictIds);
 
           dictIds = Arrays.copyOf(dictIds, numMVValues);
           resultTable.add(rowId++, ArrayUtils.toObject(dictIds));
diff --git 
a/pinot-tools/src/main/java/org/apache/pinot/tools/scan/query/SegmentQueryProcessor.java
 
b/pinot-tools/src/main/java/org/apache/pinot/tools/scan/query/SegmentQueryProcessor.java
index 2d03d91..0f260e9 100644
--- 
a/pinot-tools/src/main/java/org/apache/pinot/tools/scan/query/SegmentQueryProcessor.java
+++ 
b/pinot-tools/src/main/java/org/apache/pinot/tools/scan/query/SegmentQueryProcessor.java
@@ -32,10 +32,11 @@ import org.apache.pinot.common.request.GroupBy;
 import org.apache.pinot.common.segment.ReadMode;
 import org.apache.pinot.common.utils.request.FilterQueryTree;
 import org.apache.pinot.common.utils.request.RequestUtils;
-import org.apache.pinot.core.common.BlockMultiValIterator;
-import org.apache.pinot.core.common.BlockSingleValIterator;
+import org.apache.pinot.core.common.DataSource;
 import org.apache.pinot.core.indexsegment.immutable.ImmutableSegment;
 import org.apache.pinot.core.indexsegment.immutable.ImmutableSegmentLoader;
+import org.apache.pinot.core.operator.docvalsets.MultiValueSet;
+import org.apache.pinot.core.operator.docvalsets.SingleValueSet;
 import org.apache.pinot.core.query.utils.Pair;
 import org.apache.pinot.core.segment.index.metadata.ColumnMetadata;
 import org.apache.pinot.core.segment.index.metadata.SegmentMetadataImpl;
@@ -265,32 +266,42 @@ class SegmentQueryProcessor {
 
   private List<Integer> evaluatePredicate(List<Integer> inputDocIds, String 
column, PredicateFilter predicateFilter) {
     List<Integer> result = new ArrayList<>();
+    DataSource dataSource = _immutableSegment.getDataSource(column);
     if (!_mvColumns.contains(column)) {
-      BlockSingleValIterator bvIter =
-          (BlockSingleValIterator) 
_immutableSegment.getDataSource(column).nextBlock().getBlockValueSet().iterator();
-
-      int i = 0;
-      while (bvIter.hasNext() && (inputDocIds == null || i < 
inputDocIds.size())) {
-        int docId = (inputDocIds != null) ? inputDocIds.get(i++) : i++;
-        bvIter.skipTo(docId);
-        if (predicateFilter.apply(bvIter.nextIntVal())) {
-          result.add(docId);
+      SingleValueSet valueSet = (SingleValueSet) 
dataSource.nextBlock().getBlockValueSet();
+
+      if (inputDocIds != null) {
+        for (int docId : inputDocIds) {
+          if (predicateFilter.apply(valueSet.getIntValue(docId))) {
+            result.add(docId);
+          }
+        }
+      } else {
+        int numDocs = dataSource.getDataSourceMetadata().getNumDocs();
+        for (int docId = 0; docId < numDocs; docId++) {
+          if (predicateFilter.apply(valueSet.getIntValue(docId))) {
+            result.add(docId);
+          }
         }
       }
     } else {
-      BlockMultiValIterator bvIter =
-          (BlockMultiValIterator) 
_immutableSegment.getDataSource(column).nextBlock().getBlockValueSet().iterator();
-
-      int i = 0;
-      while (bvIter.hasNext() && (inputDocIds == null || i < 
inputDocIds.size())) {
-        int docId = (inputDocIds != null) ? inputDocIds.get(i++) : i++;
-        bvIter.skipTo(docId);
+      MultiValueSet valueSet = (MultiValueSet) 
dataSource.nextBlock().getBlockValueSet();
+      int[] dictIds = _mvColumnArrayMap.get(column);
 
-        int[] dictIds = _mvColumnArrayMap.get(column);
-        int numMVValues = bvIter.nextIntVal(dictIds);
-
-        if (predicateFilter.apply(dictIds, numMVValues)) {
-          result.add(docId);
+      if (inputDocIds != null) {
+        for (int docId : inputDocIds) {
+          int length = valueSet.getIntValues(docId, dictIds);
+          if (predicateFilter.apply(dictIds, length)) {
+            result.add(docId);
+          }
+        }
+      } else {
+        int numDocs = dataSource.getDataSourceMetadata().getNumDocs();
+        for (int docId = 0; docId < numDocs; docId++) {
+          int length = valueSet.getIntValues(docId, dictIds);
+          if (predicateFilter.apply(dictIds, length)) {
+            result.add(docId);
+          }
         }
       }
     }
diff --git 
a/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/DictionaryToRawIndexConverter.java
 
b/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/DictionaryToRawIndexConverter.java
index 6627ab5..d80a8b4 100644
--- 
a/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/DictionaryToRawIndexConverter.java
+++ 
b/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/DictionaryToRawIndexConverter.java
@@ -30,13 +30,13 @@ import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.segment.ReadMode;
 import org.apache.pinot.common.utils.StringUtil;
 import org.apache.pinot.common.utils.TarGzCompressionUtils;
-import org.apache.pinot.core.common.BlockSingleValIterator;
 import org.apache.pinot.core.common.DataSource;
 import org.apache.pinot.core.common.DataSourceMetadata;
 import org.apache.pinot.core.indexsegment.IndexSegment;
 import org.apache.pinot.core.indexsegment.immutable.ImmutableSegmentLoader;
 import org.apache.pinot.core.io.compression.ChunkCompressorFactory;
 import org.apache.pinot.core.io.writer.impl.v1.BaseChunkSingleValueWriter;
+import org.apache.pinot.core.operator.docvalsets.SingleValueSet;
 import org.apache.pinot.core.segment.creator.SingleValueRawIndexCreator;
 import org.apache.pinot.core.segment.creator.impl.SegmentColumnarIndexCreator;
 import org.apache.pinot.core.segment.creator.impl.V1Constants;
@@ -298,31 +298,27 @@ public class DictionaryToRawIndexConverter {
       return;
     }
 
-    int totalDocs = segment.getSegmentMetadata().getTotalDocs();
-    BlockSingleValIterator bvIter = (BlockSingleValIterator) 
dataSource.nextBlock().getBlockValueSet().iterator();
-
-    FieldSpec.DataType dataType = dataSourceMetadata.getDataType();
-    int lengthOfLongestEntry =
-        (dataType == FieldSpec.DataType.STRING) ? 
getLengthOfLongestEntry(bvIter, dictionary) : -1;
-
     ChunkCompressorFactory.CompressionType compressionType =
         ChunkCompressorFactory.CompressionType.valueOf(_compressionType);
-    SingleValueRawIndexCreator rawIndexCreator = SegmentColumnarIndexCreator
-        .getRawIndexCreatorForColumn(newSegment, compressionType, column, 
dataType, totalDocs, lengthOfLongestEntry, false,
-            BaseChunkSingleValueWriter.DEFAULT_VERSION);
-
-    int docId = 0;
-    bvIter.reset();
-    while (bvIter.hasNext()) {
-      int dictId = bvIter.nextIntVal();
-      Object value = dictionary.get(dictId);
-      rawIndexCreator.index(docId++, value);
-
-      if (docId % 1000000 == 0) {
-        LOGGER.info("Converted {} records.", docId);
+    FieldSpec.DataType dataType = dataSourceMetadata.getDataType();
+    int totalDocs = segment.getSegmentMetadata().getTotalDocs();
+    int lengthOfLongestEntry = (dataType == FieldSpec.DataType.STRING) ? 
getLengthOfLongestEntry(dictionary) : -1;
+    SingleValueSet valueSet = (SingleValueSet) 
dataSource.nextBlock().getBlockValueSet();
+
+    try (SingleValueRawIndexCreator rawIndexCreator = 
SegmentColumnarIndexCreator
+        .getRawIndexCreatorForColumn(newSegment, compressionType, column, 
dataType, totalDocs, lengthOfLongestEntry,
+            false, BaseChunkSingleValueWriter.DEFAULT_VERSION)) {
+      for (int docId = 0; docId < totalDocs; docId++) {
+        int dictId = valueSet.getIntValue(docId);
+        Object value = dictionary.get(dictId);
+        rawIndexCreator.index(docId, value);
+        // The loop header advances docId; (docId + 1) is the number of records written so far.
+        if ((docId + 1) % 1000000 == 0) {
+          LOGGER.info("Converted {} records.", docId + 1);
+        }
       }
     }
-    rawIndexCreator.close();
+
     deleteForwardIndex(newSegment.getParentFile(), column, 
dataSourceMetadata.isSorted());
   }
 
@@ -345,16 +341,14 @@ public class DictionaryToRawIndexConverter {
 
   /**
    * Helper method to get the length
-   * @param bvIter Data source blockvalset iterator
    * @param dictionary Column dictionary
    * @return Length of longest entry
    */
-  private int getLengthOfLongestEntry(BlockSingleValIterator bvIter, 
Dictionary dictionary) {
+  private int getLengthOfLongestEntry(Dictionary dictionary) {
     int lengthOfLongestEntry = 0;
 
-    bvIter.reset();
-    while (bvIter.hasNext()) {
-      int dictId = bvIter.nextIntVal();
+    int length = dictionary.length();
+    for (int dictId = 0; dictId < length; dictId++) {
       String value = (String) dictionary.get(dictId);
       lengthOfLongestEntry = Math.max(lengthOfLongestEntry, 
StringUtil.encodeUtf8(value).length);
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to