This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch bytes_on_heap
in repository https://gitbox.apache.org/repos/asf/pinot.git

commit e89d27d1ecb2729f1412a46239b94f85b44fe676
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Tue Jun 14 18:04:56 2022 -0700

    Optimize dictionary lookup for IN clause
---
 .../request/context/predicate/BaseInPredicate.java | 158 +++++++++++++++++++++
 .../request/context/predicate/InPredicate.java     |  10 +-
 .../request/context/predicate/NotInPredicate.java  |  10 +-
 .../predicate/InPredicateEvaluatorFactory.java     |  84 +++++------
 .../predicate/NotInPredicateEvaluatorFactory.java  |  84 +++++------
 .../operator/filter/predicate/PredicateUtils.java  |  88 ++++++++++++
 .../BigDecimalOffHeapMutableDictionary.java        |   5 +
 .../BigDecimalOnHeapMutableDictionary.java         |   5 +
 .../dictionary/BytesOffHeapMutableDictionary.java  |   5 +
 .../dictionary/BytesOnHeapMutableDictionary.java   |   5 +
 .../dictionary/DoubleOffHeapMutableDictionary.java |   5 +
 .../dictionary/DoubleOnHeapMutableDictionary.java  |   5 +
 .../dictionary/FloatOffHeapMutableDictionary.java  |   5 +
 .../dictionary/FloatOnHeapMutableDictionary.java   |   5 +
 .../dictionary/IntOffHeapMutableDictionary.java    |   5 +
 .../dictionary/IntOnHeapMutableDictionary.java     |   5 +
 .../dictionary/LongOffHeapMutableDictionary.java   |   5 +
 .../dictionary/LongOnHeapMutableDictionary.java    |   5 +
 .../DefaultNullValueVirtualColumnProvider.java     |   4 +
 .../index/column/PhysicalColumnIndexContainer.java |  25 ++--
 .../index/readers/BaseImmutableDictionary.java     |   2 +-
 .../index/readers/BigDecimalDictionary.java        |  15 +-
 .../segment/index/readers/BytesDictionary.java     |   5 +
 ...java => ConstantValueBigDecimalDictionary.java} |  55 +++----
 .../readers/ConstantValueBytesDictionary.java      |  11 ++
 .../readers/ConstantValueDoubleDictionary.java     |  10 ++
 .../readers/ConstantValueFloatDictionary.java      |  10 ++
 .../index/readers/ConstantValueIntDictionary.java  |  10 ++
 .../index/readers/ConstantValueLongDictionary.java |  10 ++
 .../readers/ConstantValueStringDictionary.java     |   5 +
 .../segment/index/readers/DoubleDictionary.java    |   8 +-
 .../segment/index/readers/FloatDictionary.java     |   8 +-
 .../local/segment/index/readers/IntDictionary.java |   8 +-
 .../segment/index/readers/LongDictionary.java      |   8 +-
 ...ionary.java => OnHeapBigDecimalDictionary.java} |  50 ++++---
 ...sDictionary.java => OnHeapBytesDictionary.java} |  50 +++++--
 .../segment/index/readers/OnHeapDictionary.java    |  32 -----
 .../index/readers/OnHeapDoubleDictionary.java      |  21 +--
 .../index/readers/OnHeapFloatDictionary.java       |  21 +--
 .../segment/index/readers/OnHeapIntDictionary.java |  21 +--
 .../index/readers/OnHeapLongDictionary.java        |  21 +--
 .../index/readers/OnHeapStringDictionary.java      |  22 +--
 .../segment/index/readers/StringDictionary.java    |   8 +-
 .../index/readers/ImmutableDictionaryTest.java     |  95 +++++++------
 .../ImmutableDictionaryTypeConversionTest.java     | 100 +++++++------
 .../pinot/segment/spi/index/reader/Dictionary.java |  49 +++++--
 .../java/org/apache/pinot/spi/utils/ByteArray.java |  45 +++---
 47 files changed, 833 insertions(+), 390 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/request/context/predicate/BaseInPredicate.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/request/context/predicate/BaseInPredicate.java
new file mode 100644
index 0000000000..c9ce11b5de
--- /dev/null
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/request/context/predicate/BaseInPredicate.java
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.request.context.predicate;
+
+import java.math.BigDecimal;
+import java.util.List;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.spi.utils.BooleanUtils;
+import org.apache.pinot.spi.utils.ByteArray;
+import org.apache.pinot.spi.utils.BytesUtils;
+import org.apache.pinot.spi.utils.TimestampUtils;
+
+
+/**
+ * Base predicate for IN and NOT_IN.
+ */
+public abstract class BaseInPredicate extends BasePredicate {
+  protected final List<String> _values;
+
+  // Cache the parsed values
+  private int[] _intValues;
+  private long[] _longValues;
+  private float[] _floatValues;
+  private double[] _doubleValues;
+  private BigDecimal[] _bigDecimalValues;
+  private int[] _booleanValues;
+  private long[] _timestampValues;
+  private ByteArray[] _bytesValues;
+
+  public BaseInPredicate(ExpressionContext lhs, List<String> values) {
+    super(lhs);
+    _values = values;
+  }
+
+  public List<String> getValues() {
+    return _values;
+  }
+
+  public int[] getIntValues() {
+    int[] intValues = _intValues;
+    if (intValues == null) {
+      int numValues = _values.size();
+      intValues = new int[numValues];
+      for (int i = 0; i < numValues; i++) {
+        intValues[i] = Integer.parseInt(_values.get(i));
+      }
+      _intValues = intValues;
+    }
+    return intValues;
+  }
+
+  public long[] getLongValues() {
+    long[] longValues = _longValues;
+    if (longValues == null) {
+      int numValues = _values.size();
+      longValues = new long[numValues];
+      for (int i = 0; i < numValues; i++) {
+        longValues[i] = Long.parseLong(_values.get(i));
+      }
+      _longValues = longValues;
+    }
+    return longValues;
+  }
+
+  public float[] getFloatValues() {
+    float[] floatValues = _floatValues;
+    if (floatValues == null) {
+      int numValues = _values.size();
+      floatValues = new float[numValues];
+      for (int i = 0; i < numValues; i++) {
+        floatValues[i] = Float.parseFloat(_values.get(i));
+      }
+      _floatValues = floatValues;
+    }
+    return floatValues;
+  }
+
+  public double[] getDoubleValues() {
+    double[] doubleValues = _doubleValues;
+    if (doubleValues == null) {
+      int numValues = _values.size();
+      doubleValues = new double[numValues];
+      for (int i = 0; i < numValues; i++) {
+        doubleValues[i] = Double.parseDouble(_values.get(i));
+      }
+      _doubleValues = doubleValues;
+    }
+    return doubleValues;
+  }
+
+  public BigDecimal[] getBigDecimalValues() {
+    BigDecimal[] bigDecimalValues = _bigDecimalValues;
+    if (bigDecimalValues == null) {
+      int numValues = _values.size();
+      bigDecimalValues = new BigDecimal[numValues];
+      for (int i = 0; i < numValues; i++) {
+        bigDecimalValues[i] = new BigDecimal(_values.get(i));
+      }
+      _bigDecimalValues = bigDecimalValues;
+    }
+    return bigDecimalValues;
+  }
+
+  public int[] getBooleanValues() {
+    int[] booleanValues = _booleanValues;
+    if (booleanValues == null) {
+      int numValues = _values.size();
+      booleanValues = new int[numValues];
+      for (int i = 0; i < numValues; i++) {
+        booleanValues[i] = BooleanUtils.toInt(_values.get(i));
+      }
+      _booleanValues = booleanValues;
+    }
+    return booleanValues;
+  }
+
+  public long[] getTimestampValues() {
+    long[] timestampValues = _timestampValues;
+    if (timestampValues == null) {
+      int numValues = _values.size();
+      timestampValues = new long[numValues];
+      for (int i = 0; i < numValues; i++) {
+        timestampValues[i] = TimestampUtils.toMillisSinceEpoch(_values.get(i));
+      }
+      _timestampValues = timestampValues;
+    }
+    return timestampValues;
+  }
+
+  public ByteArray[] getBytesValues() {
+    ByteArray[] bigDecimalValues = _bytesValues;
+    if (bigDecimalValues == null) {
+      int numValues = _values.size();
+      bigDecimalValues = new ByteArray[numValues];
+      for (int i = 0; i < numValues; i++) {
+        bigDecimalValues[i] = BytesUtils.toByteArray(_values.get(i));
+      }
+      _bytesValues = bigDecimalValues;
+    }
+    return bigDecimalValues;
+  }
+}
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/request/context/predicate/InPredicate.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/request/context/predicate/InPredicate.java
index 18abc7eacf..793b525bab 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/request/context/predicate/InPredicate.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/request/context/predicate/InPredicate.java
@@ -26,12 +26,10 @@ import 
org.apache.pinot.common.request.context.ExpressionContext;
 /**
  * Predicate for IN.
  */
-public class InPredicate extends BasePredicate {
-  private final List<String> _values;
+public class InPredicate extends BaseInPredicate {
 
   public InPredicate(ExpressionContext lhs, List<String> values) {
-    super(lhs);
-    _values = values;
+    super(lhs, values);
   }
 
   @Override
@@ -39,10 +37,6 @@ public class InPredicate extends BasePredicate {
     return Type.IN;
   }
 
-  public List<String> getValues() {
-    return _values;
-  }
-
   @Override
   public boolean equals(Object o) {
     if (this == o) {
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/request/context/predicate/NotInPredicate.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/request/context/predicate/NotInPredicate.java
index 07398a9fcf..2fb1e016ff 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/request/context/predicate/NotInPredicate.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/request/context/predicate/NotInPredicate.java
@@ -26,12 +26,10 @@ import 
org.apache.pinot.common.request.context.ExpressionContext;
 /**
  * Predicate for NOT_IN.
  */
-public class NotInPredicate extends BasePredicate {
-  private final List<String> _values;
+public class NotInPredicate extends BaseInPredicate {
 
   public NotInPredicate(ExpressionContext lhs, List<String> values) {
-    super(lhs);
-    _values = values;
+    super(lhs, values);
   }
 
   @Override
@@ -39,10 +37,6 @@ public class NotInPredicate extends BasePredicate {
     return Type.NOT_IN;
   }
 
-  public List<String> getValues() {
-    return _values;
-  }
-
   @Override
   public boolean equals(Object o) {
     if (this == o) {
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/predicate/InPredicateEvaluatorFactory.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/predicate/InPredicateEvaluatorFactory.java
index 4724fefbaf..28bc32656b 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/predicate/InPredicateEvaluatorFactory.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/predicate/InPredicateEvaluatorFactory.java
@@ -28,6 +28,7 @@ import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
 import it.unimi.dsi.fastutil.longs.LongSet;
 import it.unimi.dsi.fastutil.objects.ObjectOpenHashSet;
 import java.math.BigDecimal;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Set;
 import java.util.TreeSet;
@@ -35,10 +36,7 @@ import 
org.apache.pinot.common.request.context.predicate.InPredicate;
 import org.apache.pinot.common.utils.HashUtil;
 import org.apache.pinot.segment.spi.index.reader.Dictionary;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
-import org.apache.pinot.spi.utils.BooleanUtils;
 import org.apache.pinot.spi.utils.ByteArray;
-import org.apache.pinot.spi.utils.BytesUtils;
-import org.apache.pinot.spi.utils.TimestampUtils;
 
 
 /**
@@ -70,67 +68,80 @@ public class InPredicateEvaluatorFactory {
    */
   public static BaseRawValueBasedPredicateEvaluator 
newRawValueBasedEvaluator(InPredicate inPredicate,
       DataType dataType) {
-    List<String> values = inPredicate.getValues();
-    int hashSetSize = HashUtil.getMinHashSetSize(values.size());
     switch (dataType) {
       case INT: {
-        IntSet matchingValues = new IntOpenHashSet(hashSetSize);
-        for (String value : values) {
-          matchingValues.add(Integer.parseInt(value));
+        int[] intValues = inPredicate.getIntValues();
+        IntSet matchingValues = new 
IntOpenHashSet(HashUtil.getMinHashSetSize(intValues.length));
+        for (int value : intValues) {
+          matchingValues.add(value);
         }
         return new IntRawValueBasedInPredicateEvaluator(inPredicate, 
matchingValues);
       }
       case LONG: {
-        LongSet matchingValues = new LongOpenHashSet(hashSetSize);
-        for (String value : values) {
-          matchingValues.add(Long.parseLong(value));
+        long[] longValues = inPredicate.getLongValues();
+        LongSet matchingValues = new 
LongOpenHashSet(HashUtil.getMinHashSetSize(longValues.length));
+        for (long value : longValues) {
+          matchingValues.add(value);
         }
         return new LongRawValueBasedInPredicateEvaluator(inPredicate, 
matchingValues);
       }
       case FLOAT: {
-        FloatSet matchingValues = new FloatOpenHashSet(hashSetSize);
-        for (String value : values) {
-          matchingValues.add(Float.parseFloat(value));
+        float[] floatValues = inPredicate.getFloatValues();
+        FloatSet matchingValues = new 
FloatOpenHashSet(HashUtil.getMinHashSetSize(floatValues.length));
+        for (float value : floatValues) {
+          matchingValues.add(value);
         }
         return new FloatRawValueBasedInPredicateEvaluator(inPredicate, 
matchingValues);
       }
       case DOUBLE: {
-        DoubleSet matchingValues = new DoubleOpenHashSet(hashSetSize);
-        for (String value : values) {
-          matchingValues.add(Double.parseDouble(value));
+        double[] doubleValues = inPredicate.getDoubleValues();
+        DoubleSet matchingValues = new 
DoubleOpenHashSet(HashUtil.getMinHashSetSize(doubleValues.length));
+        for (double value : doubleValues) {
+          matchingValues.add(value);
         }
         return new DoubleRawValueBasedInPredicateEvaluator(inPredicate, 
matchingValues);
       }
       case BIG_DECIMAL: {
-        TreeSet<BigDecimal> matchingValues = new TreeSet<>();
-        for (String value : values) {
-          matchingValues.add(new BigDecimal(value));
-        }
+        BigDecimal[] bigDecimalValues = inPredicate.getBigDecimalValues();
+        // NOTE: Use TreeSet because BigDecimal's compareTo() is not 
consistent with equals()
+        //       E.g. compareTo(3.0, 3) returns 0 but equals(3.0, 3) returns 
false
+        TreeSet<BigDecimal> matchingValues = new 
TreeSet<>(Arrays.asList(bigDecimalValues));
         return new BigDecimalRawValueBasedInPredicateEvaluator(inPredicate, 
matchingValues);
       }
       case BOOLEAN: {
-        IntSet matchingValues = new IntOpenHashSet(hashSetSize);
-        for (String value : values) {
-          matchingValues.add(BooleanUtils.toInt(value));
+        int[] booleanValues = inPredicate.getBooleanValues();
+        IntSet matchingValues = new 
IntOpenHashSet(HashUtil.getMinHashSetSize(booleanValues.length));
+        for (int value : booleanValues) {
+          matchingValues.add(value);
         }
         return new IntRawValueBasedInPredicateEvaluator(inPredicate, 
matchingValues);
       }
       case TIMESTAMP: {
-        LongSet matchingValues = new LongOpenHashSet(hashSetSize);
-        for (String value : values) {
-          matchingValues.add(TimestampUtils.toMillisSinceEpoch(value));
+        long[] timestampValues = inPredicate.getTimestampValues();
+        LongSet matchingValues = new 
LongOpenHashSet(HashUtil.getMinHashSetSize(timestampValues.length));
+        for (long value : timestampValues) {
+          matchingValues.add(value);
         }
         return new LongRawValueBasedInPredicateEvaluator(inPredicate, 
matchingValues);
       }
       case STRING: {
-        Set<String> matchingValues = new ObjectOpenHashSet<>(hashSetSize);
-        matchingValues.addAll(values);
+        List<String> stringValues = inPredicate.getValues();
+        Set<String> matchingValues = new 
ObjectOpenHashSet<>(HashUtil.getMinHashSetSize(stringValues.size()));
+        // NOTE: Add value-by-value to avoid overhead
+        for (String value : stringValues) {
+          //noinspection UseBulkOperation
+          matchingValues.add(value);
+        }
         return new StringRawValueBasedInPredicateEvaluator(inPredicate, 
matchingValues);
       }
       case BYTES: {
-        Set<ByteArray> matchingValues = new ObjectOpenHashSet<>(hashSetSize);
-        for (String value : values) {
-          matchingValues.add(BytesUtils.toByteArray(value));
+        ByteArray[] bytesValues = inPredicate.getBytesValues();
+        Set<ByteArray> matchingValues = new 
ObjectOpenHashSet<>(HashUtil.getMinHashSetSize(bytesValues.length));
+        // NOTE: Add value-by-value to avoid overhead
+        //noinspection ManualArrayToCollectionCopy
+        for (ByteArray value : bytesValues) {
+          //noinspection UseBulkOperation
+          matchingValues.add(value);
         }
         return new BytesRawValueBasedInPredicateEvaluator(inPredicate, 
matchingValues);
       }
@@ -146,14 +157,7 @@ public class InPredicateEvaluatorFactory {
 
     DictionaryBasedInPredicateEvaluator(InPredicate inPredicate, Dictionary 
dictionary, DataType dataType) {
       super(inPredicate);
-      List<String> values = inPredicate.getValues();
-      _matchingDictIdSet = new 
IntOpenHashSet(HashUtil.getMinHashSetSize(values.size()));
-      for (String value : values) {
-        int dictId = dictionary.indexOf(PredicateUtils.getStoredValue(value, 
dataType));
-        if (dictId >= 0) {
-          _matchingDictIdSet.add(dictId);
-        }
-      }
+      _matchingDictIdSet = PredicateUtils.getDictIdSet(inPredicate, 
dictionary, dataType);
       _numMatchingDictIds = _matchingDictIdSet.size();
       if (_numMatchingDictIds == 0) {
         _alwaysFalse = true;
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/predicate/NotInPredicateEvaluatorFactory.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/predicate/NotInPredicateEvaluatorFactory.java
index 3407754ee9..74bd681521 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/predicate/NotInPredicateEvaluatorFactory.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/predicate/NotInPredicateEvaluatorFactory.java
@@ -28,6 +28,7 @@ import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
 import it.unimi.dsi.fastutil.longs.LongSet;
 import it.unimi.dsi.fastutil.objects.ObjectOpenHashSet;
 import java.math.BigDecimal;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Set;
 import java.util.TreeSet;
@@ -35,10 +36,7 @@ import 
org.apache.pinot.common.request.context.predicate.NotInPredicate;
 import org.apache.pinot.common.utils.HashUtil;
 import org.apache.pinot.segment.spi.index.reader.Dictionary;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
-import org.apache.pinot.spi.utils.BooleanUtils;
 import org.apache.pinot.spi.utils.ByteArray;
-import org.apache.pinot.spi.utils.BytesUtils;
-import org.apache.pinot.spi.utils.TimestampUtils;
 
 
 /**
@@ -70,67 +68,80 @@ public class NotInPredicateEvaluatorFactory {
    */
   public static BaseRawValueBasedPredicateEvaluator 
newRawValueBasedEvaluator(NotInPredicate notInPredicate,
       DataType dataType) {
-    List<String> values = notInPredicate.getValues();
-    int hashSetSize = HashUtil.getMinHashSetSize(values.size());
     switch (dataType) {
       case INT: {
-        IntSet nonMatchingValues = new IntOpenHashSet(hashSetSize);
-        for (String value : values) {
-          nonMatchingValues.add(Integer.parseInt(value));
+        int[] intValues = notInPredicate.getIntValues();
+        IntSet nonMatchingValues = new 
IntOpenHashSet(HashUtil.getMinHashSetSize(intValues.length));
+        for (int value : intValues) {
+          nonMatchingValues.add(value);
         }
         return new IntRawValueBasedNotInPredicateEvaluator(notInPredicate, 
nonMatchingValues);
       }
       case LONG: {
-        LongSet nonMatchingValues = new LongOpenHashSet(hashSetSize);
-        for (String value : values) {
-          nonMatchingValues.add(Long.parseLong(value));
+        long[] longValues = notInPredicate.getLongValues();
+        LongSet nonMatchingValues = new 
LongOpenHashSet(HashUtil.getMinHashSetSize(longValues.length));
+        for (long value : longValues) {
+          nonMatchingValues.add(value);
         }
         return new LongRawValueBasedNotInPredicateEvaluator(notInPredicate, 
nonMatchingValues);
       }
       case FLOAT: {
-        FloatSet nonMatchingValues = new FloatOpenHashSet(hashSetSize);
-        for (String value : values) {
-          nonMatchingValues.add(Float.parseFloat(value));
+        float[] floatValues = notInPredicate.getFloatValues();
+        FloatSet nonMatchingValues = new 
FloatOpenHashSet(HashUtil.getMinHashSetSize(floatValues.length));
+        for (float value : floatValues) {
+          nonMatchingValues.add(value);
         }
         return new FloatRawValueBasedNotInPredicateEvaluator(notInPredicate, 
nonMatchingValues);
       }
       case DOUBLE: {
-        DoubleSet nonMatchingValues = new DoubleOpenHashSet(hashSetSize);
-        for (String value : values) {
-          nonMatchingValues.add(Double.parseDouble(value));
+        double[] doubleValues = notInPredicate.getDoubleValues();
+        DoubleSet nonMatchingValues = new 
DoubleOpenHashSet(HashUtil.getMinHashSetSize(doubleValues.length));
+        for (double value : doubleValues) {
+          nonMatchingValues.add(value);
         }
         return new DoubleRawValueBasedNotInPredicateEvaluator(notInPredicate, 
nonMatchingValues);
       }
       case BIG_DECIMAL: {
-        TreeSet<BigDecimal> nonMatchingValues = new TreeSet<>();
-        for (String value : values) {
-          nonMatchingValues.add(new BigDecimal(value));
-        }
+        BigDecimal[] bigDecimalValues = notInPredicate.getBigDecimalValues();
+        // NOTE: Use TreeSet because BigDecimal's compareTo() is not 
consistent with equals()
+        //       E.g. compareTo(3.0, 3) returns 0 but equals(3.0, 3) returns 
false
+        TreeSet<BigDecimal> nonMatchingValues = new 
TreeSet<>(Arrays.asList(bigDecimalValues));
         return new 
BigDecimalRawValueBasedNotInPredicateEvaluator(notInPredicate, 
nonMatchingValues);
       }
       case BOOLEAN: {
-        IntSet nonMatchingValues = new IntOpenHashSet(hashSetSize);
-        for (String value : values) {
-          nonMatchingValues.add(BooleanUtils.toInt(value));
+        int[] booleanValues = notInPredicate.getBooleanValues();
+        IntSet nonMatchingValues = new 
IntOpenHashSet(HashUtil.getMinHashSetSize(booleanValues.length));
+        for (int value : booleanValues) {
+          nonMatchingValues.add(value);
         }
         return new IntRawValueBasedNotInPredicateEvaluator(notInPredicate, 
nonMatchingValues);
       }
       case TIMESTAMP: {
-        LongSet nonMatchingValues = new LongOpenHashSet(hashSetSize);
-        for (String value : values) {
-          nonMatchingValues.add(TimestampUtils.toMillisSinceEpoch(value));
+        long[] timestampValues = notInPredicate.getTimestampValues();
+        LongSet nonMatchingValues = new 
LongOpenHashSet(HashUtil.getMinHashSetSize(timestampValues.length));
+        for (long value : timestampValues) {
+          nonMatchingValues.add(value);
         }
         return new LongRawValueBasedNotInPredicateEvaluator(notInPredicate, 
nonMatchingValues);
       }
       case STRING: {
-        Set<String> nonMatchingValues = new ObjectOpenHashSet<>(hashSetSize);
-        nonMatchingValues.addAll(values);
+        List<String> stringValues = notInPredicate.getValues();
+        Set<String> nonMatchingValues = new 
ObjectOpenHashSet<>(HashUtil.getMinHashSetSize(stringValues.size()));
+        // NOTE: Add value-by-value to avoid overhead
+        for (String value : stringValues) {
+          //noinspection UseBulkOperation
+          nonMatchingValues.add(value);
+        }
         return new StringRawValueBasedNotInPredicateEvaluator(notInPredicate, 
nonMatchingValues);
       }
       case BYTES: {
-        Set<ByteArray> nonMatchingValues = new 
ObjectOpenHashSet<>(hashSetSize);
-        for (String value : values) {
-          nonMatchingValues.add(BytesUtils.toByteArray(value));
+        ByteArray[] bytesValues = notInPredicate.getBytesValues();
+        Set<ByteArray> nonMatchingValues = new 
ObjectOpenHashSet<>(HashUtil.getMinHashSetSize(bytesValues.length));
+        // NOTE: Add value-by-value to avoid overhead
+        //noinspection ManualArrayToCollectionCopy
+        for (ByteArray value : bytesValues) {
+          //noinspection UseBulkOperation
+          nonMatchingValues.add(value);
         }
         return new BytesRawValueBasedNotInPredicateEvaluator(notInPredicate, 
nonMatchingValues);
       }
@@ -148,14 +159,7 @@ public class NotInPredicateEvaluatorFactory {
 
     DictionaryBasedNotInPredicateEvaluator(NotInPredicate notInPredicate, 
Dictionary dictionary, DataType dataType) {
       super(notInPredicate);
-      List<String> values = notInPredicate.getValues();
-      _nonMatchingDictIdSet = new 
IntOpenHashSet(HashUtil.getMinHashSetSize(values.size()));
-      for (String value : values) {
-        int dictId = dictionary.indexOf(PredicateUtils.getStoredValue(value, 
dataType));
-        if (dictId >= 0) {
-          _nonMatchingDictIdSet.add(dictId);
-        }
-      }
+      _nonMatchingDictIdSet = PredicateUtils.getDictIdSet(notInPredicate, 
dictionary, dataType);
       _numNonMatchingDictIds = _nonMatchingDictIdSet.size();
       if (_numNonMatchingDictIds == 0) {
         _alwaysTrue = true;
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/predicate/PredicateUtils.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/predicate/PredicateUtils.java
index e7bb700b01..19ca4c1176 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/predicate/PredicateUtils.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/predicate/PredicateUtils.java
@@ -18,8 +18,16 @@
  */
 package org.apache.pinot.core.operator.filter.predicate;
 
+import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
+import it.unimi.dsi.fastutil.ints.IntSet;
+import java.math.BigDecimal;
+import java.util.List;
+import org.apache.pinot.common.request.context.predicate.BaseInPredicate;
+import org.apache.pinot.common.utils.HashUtil;
+import org.apache.pinot.segment.spi.index.reader.Dictionary;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.apache.pinot.spi.utils.BooleanUtils;
+import org.apache.pinot.spi.utils.ByteArray;
 import org.apache.pinot.spi.utils.TimestampUtils;
 
 
@@ -27,6 +35,10 @@ public class PredicateUtils {
   private PredicateUtils() {
   }
 
+  // Bound the initial dictionary id set size to prevent over-allocating when 
a lot of values do not exist in the
+  // dictionary
+  private static final int MAX_INITIAL_DICT_ID_SET_SIZE = 1000;
+
   /**
    * Converts the given predicate value to the stored value based on the data 
type.
    */
@@ -54,4 +66,80 @@ public class PredicateUtils {
   public static String getStoredTimestampValue(String timestampValue) {
     return Long.toString(TimestampUtils.toMillisSinceEpoch(timestampValue));
   }
+
+  /**
+   * Returns a dictionary id set of the values in the given IN/NOT_IN 
predicate.
+   */
+  public static IntSet getDictIdSet(BaseInPredicate inPredicate, Dictionary 
dictionary, DataType dataType) {
+    List<String> values = inPredicate.getValues();
+    int hashSetSize = Integer.min(HashUtil.getMinHashSetSize(values.size()), 
MAX_INITIAL_DICT_ID_SET_SIZE);
+    IntSet dictIdSet = new IntOpenHashSet(hashSetSize);
+    switch (dataType.getStoredType()) {
+      case INT:
+        int[] intValues = inPredicate.getIntValues();
+        for (int value : intValues) {
+          int dictId = dictionary.indexOf(value);
+          if (dictId >= 0) {
+            dictIdSet.add(dictId);
+          }
+        }
+        break;
+      case LONG:
+        long[] longValues = inPredicate.getLongValues();
+        for (long value : longValues) {
+          int dictId = dictionary.indexOf(value);
+          if (dictId >= 0) {
+            dictIdSet.add(dictId);
+          }
+        }
+        break;
+      case FLOAT:
+        float[] floatValues = inPredicate.getFloatValues();
+        for (float value : floatValues) {
+          int dictId = dictionary.indexOf(value);
+          if (dictId >= 0) {
+            dictIdSet.add(dictId);
+          }
+        }
+        break;
+      case DOUBLE:
+        double[] doubleValues = inPredicate.getDoubleValues();
+        for (double value : doubleValues) {
+          int dictId = dictionary.indexOf(value);
+          if (dictId >= 0) {
+            dictIdSet.add(dictId);
+          }
+        }
+        break;
+      case BIG_DECIMAL:
+        BigDecimal[] bigDecimalValues = inPredicate.getBigDecimalValues();
+        for (BigDecimal value : bigDecimalValues) {
+          int dictId = dictionary.indexOf(value);
+          if (dictId >= 0) {
+            dictIdSet.add(dictId);
+          }
+        }
+        break;
+      case STRING:
+        for (String value : values) {
+          int dictId = dictionary.indexOf(value);
+          if (dictId >= 0) {
+            dictIdSet.add(dictId);
+          }
+        }
+        break;
+      case BYTES:
+        ByteArray[] bytesValues = inPredicate.getBytesValues();
+        for (ByteArray value : bytesValues) {
+          int dictId = dictionary.indexOf(value);
+          if (dictId >= 0) {
+            dictIdSet.add(dictId);
+          }
+        }
+        break;
+      default:
+        throw new IllegalStateException("Unsupported data type: " + dataType);
+    }
+    return dictIdSet;
+  }
 }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/dictionary/BigDecimalOffHeapMutableDictionary.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/dictionary/BigDecimalOffHeapMutableDictionary.java
index 9350de77d4..6e39aff860 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/dictionary/BigDecimalOffHeapMutableDictionary.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/dictionary/BigDecimalOffHeapMutableDictionary.java
@@ -181,6 +181,11 @@ public class BigDecimalOffHeapMutableDictionary extends 
BaseOffHeapMutableDictio
     return getDictId(bigDecimalValue, 
BigDecimalUtils.serialize(bigDecimalValue));
   }
 
+  @Override
+  public int indexOf(BigDecimal bigDecimalValue) {
+    return getDictId(bigDecimalValue, 
BigDecimalUtils.serialize(bigDecimalValue));
+  }
+
   public BigDecimal get(int dictId) {
     return getBigDecimalValue(dictId);
   }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/dictionary/BigDecimalOnHeapMutableDictionary.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/dictionary/BigDecimalOnHeapMutableDictionary.java
index 0631bdf000..8e608485fd 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/dictionary/BigDecimalOnHeapMutableDictionary.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/dictionary/BigDecimalOnHeapMutableDictionary.java
@@ -168,6 +168,11 @@ public class BigDecimalOnHeapMutableDictionary extends 
BaseOnHeapMutableDictiona
     return getDictId(new BigDecimal(stringValue));
   }
 
+  @Override
+  public int indexOf(BigDecimal bigDecimalValue) {
+    return getDictId(bigDecimalValue);
+  }
+
   @Override
   public int getIntValue(int dictId) {
     return getBigDecimalValue(dictId).intValue();
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/dictionary/BytesOffHeapMutableDictionary.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/dictionary/BytesOffHeapMutableDictionary.java
index 1ad89ef13e..b465860c47 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/dictionary/BytesOffHeapMutableDictionary.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/dictionary/BytesOffHeapMutableDictionary.java
@@ -146,6 +146,11 @@ public class BytesOffHeapMutableDictionary extends 
BaseOffHeapMutableDictionary
     return getDictId(new ByteArray(bytesValue), bytesValue);
   }
 
+  @Override
+  public int indexOf(ByteArray bytesValue) {
+    return getDictId(bytesValue, bytesValue.getBytes());
+  }
+
   @Override
   public byte[] get(int dictId) {
     return getBytesValue(dictId);
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/dictionary/BytesOnHeapMutableDictionary.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/dictionary/BytesOnHeapMutableDictionary.java
index ade4e664eb..a2bd871827 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/dictionary/BytesOnHeapMutableDictionary.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/dictionary/BytesOnHeapMutableDictionary.java
@@ -125,6 +125,11 @@ public class BytesOnHeapMutableDictionary extends 
BaseOnHeapMutableDictionary {
     return getDictId(BytesUtils.toByteArray(stringValue));
   }
 
+  @Override
+  public int indexOf(ByteArray bytesValue) {
+    return getDictId(bytesValue);
+  }
+
   @Override
   public byte[] get(int dictId) {
     return getBytesValue(dictId);
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/dictionary/DoubleOffHeapMutableDictionary.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/dictionary/DoubleOffHeapMutableDictionary.java
index f16fcadb16..0b1035c455 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/dictionary/DoubleOffHeapMutableDictionary.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/dictionary/DoubleOffHeapMutableDictionary.java
@@ -180,6 +180,11 @@ public class DoubleOffHeapMutableDictionary extends 
BaseOffHeapMutableDictionary
     return getDictId(Double.valueOf(stringValue), null);
   }
 
+  @Override
+  public int indexOf(double doubleValue) {
+    return getDictId(doubleValue, null);
+  }
+
   public Double get(int dictId) {
     return getDoubleValue(dictId);
   }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/dictionary/DoubleOnHeapMutableDictionary.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/dictionary/DoubleOnHeapMutableDictionary.java
index fd5f2ecce7..c46700b63e 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/dictionary/DoubleOnHeapMutableDictionary.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/dictionary/DoubleOnHeapMutableDictionary.java
@@ -167,6 +167,11 @@ public class DoubleOnHeapMutableDictionary extends 
BaseOnHeapMutableDictionary {
     return getDictId(Double.valueOf(stringValue));
   }
 
+  @Override
+  public int indexOf(double doubleValue) {
+    return getDictId(doubleValue);
+  }
+
   @Override
   public int getIntValue(int dictId) {
     return (int) getDoubleValue(dictId);
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/dictionary/FloatOffHeapMutableDictionary.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/dictionary/FloatOffHeapMutableDictionary.java
index e94866287f..cefb8c9f63 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/dictionary/FloatOffHeapMutableDictionary.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/dictionary/FloatOffHeapMutableDictionary.java
@@ -180,6 +180,11 @@ public class FloatOffHeapMutableDictionary extends 
BaseOffHeapMutableDictionary
     return getDictId(Float.valueOf(stringValue), null);
   }
 
+  @Override
+  public int indexOf(float floatValue) {
+    return getDictId(floatValue, null);
+  }
+
   public Float get(int dictId) {
     return getFloatValue(dictId);
   }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/dictionary/FloatOnHeapMutableDictionary.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/dictionary/FloatOnHeapMutableDictionary.java
index 7f6efa41f9..0cba5ad605 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/dictionary/FloatOnHeapMutableDictionary.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/dictionary/FloatOnHeapMutableDictionary.java
@@ -167,6 +167,11 @@ public class FloatOnHeapMutableDictionary extends 
BaseOnHeapMutableDictionary {
     return getDictId(Float.valueOf(stringValue));
   }
 
+  @Override
+  public int indexOf(float floatValue) {
+    return getDictId(floatValue);
+  }
+
   @Override
   public int getIntValue(int dictId) {
     return (int) getFloatValue(dictId);
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/dictionary/IntOffHeapMutableDictionary.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/dictionary/IntOffHeapMutableDictionary.java
index 3adc427886..9c6a7ad310 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/dictionary/IntOffHeapMutableDictionary.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/dictionary/IntOffHeapMutableDictionary.java
@@ -180,6 +180,11 @@ public class IntOffHeapMutableDictionary extends 
BaseOffHeapMutableDictionary {
     return getDictId(Integer.valueOf(stringValue), null);
   }
 
+  @Override
+  public int indexOf(int intValue) {
+    return getDictId(intValue, null);
+  }
+
   public Integer get(int dictId) {
     return getIntValue(dictId);
   }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/dictionary/IntOnHeapMutableDictionary.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/dictionary/IntOnHeapMutableDictionary.java
index 5302c588c1..f374385187 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/dictionary/IntOnHeapMutableDictionary.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/dictionary/IntOnHeapMutableDictionary.java
@@ -167,6 +167,11 @@ public class IntOnHeapMutableDictionary extends 
BaseOnHeapMutableDictionary {
     return getDictId(Integer.valueOf(stringValue));
   }
 
+  @Override
+  public int indexOf(int intValue) {
+    return getDictId(intValue);
+  }
+
   @Override
   public int getIntValue(int dictId) {
     return (Integer) get(dictId);
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/dictionary/LongOffHeapMutableDictionary.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/dictionary/LongOffHeapMutableDictionary.java
index 640be25344..e6824c0734 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/dictionary/LongOffHeapMutableDictionary.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/dictionary/LongOffHeapMutableDictionary.java
@@ -180,6 +180,11 @@ public class LongOffHeapMutableDictionary extends 
BaseOffHeapMutableDictionary {
     return getDictId(Long.valueOf(stringValue), null);
   }
 
+  @Override
+  public int indexOf(long longValue) {
+    return getDictId(longValue, null);
+  }
+
   @Override
   public Long get(int dictId) {
     return getLongValue(dictId);
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/dictionary/LongOnHeapMutableDictionary.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/dictionary/LongOnHeapMutableDictionary.java
index 49b791decb..b74997dbec 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/dictionary/LongOnHeapMutableDictionary.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/dictionary/LongOnHeapMutableDictionary.java
@@ -167,6 +167,11 @@ public class LongOnHeapMutableDictionary extends 
BaseOnHeapMutableDictionary {
     return getDictId(Long.valueOf(stringValue));
   }
 
+  @Override
+  public int indexOf(long longValue) {
+    return getDictId(longValue);
+  }
+
   @Override
   public int getIntValue(int dictId) {
     return (int) getLongValue(dictId);
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/column/DefaultNullValueVirtualColumnProvider.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/column/DefaultNullValueVirtualColumnProvider.java
index 23ed46d2c9..5909ed27a0 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/column/DefaultNullValueVirtualColumnProvider.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/column/DefaultNullValueVirtualColumnProvider.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pinot.segment.local.segment.index.column;
 
+import java.math.BigDecimal;
+import 
org.apache.pinot.segment.local.segment.index.readers.ConstantValueBigDecimalDictionary;
 import 
org.apache.pinot.segment.local.segment.index.readers.ConstantValueBytesDictionary;
 import 
org.apache.pinot.segment.local.segment.index.readers.ConstantValueDoubleDictionary;
 import 
org.apache.pinot.segment.local.segment.index.readers.ConstantValueFloatDictionary;
@@ -61,6 +63,8 @@ public class DefaultNullValueVirtualColumnProvider extends 
BaseVirtualColumnProv
         return new ConstantValueFloatDictionary((float) 
fieldSpec.getDefaultNullValue());
       case DOUBLE:
         return new ConstantValueDoubleDictionary((double) 
fieldSpec.getDefaultNullValue());
+      case BIG_DECIMAL:
+        return new ConstantValueBigDecimalDictionary((BigDecimal) 
fieldSpec.getDefaultNullValue());
       case STRING:
         return new ConstantValueStringDictionary((String) 
fieldSpec.getDefaultNullValue());
       case BYTES:
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/column/PhysicalColumnIndexContainer.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/column/PhysicalColumnIndexContainer.java
index 16a1ee2a5c..209f2040e8 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/column/PhysicalColumnIndexContainer.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/column/PhysicalColumnIndexContainer.java
@@ -31,6 +31,8 @@ import 
org.apache.pinot.segment.local.segment.index.readers.FloatDictionary;
 import org.apache.pinot.segment.local.segment.index.readers.IntDictionary;
 import org.apache.pinot.segment.local.segment.index.readers.LongDictionary;
 import 
org.apache.pinot.segment.local.segment.index.readers.NullValueVectorReaderImpl;
+import 
org.apache.pinot.segment.local.segment.index.readers.OnHeapBigDecimalDictionary;
+import 
org.apache.pinot.segment.local.segment.index.readers.OnHeapBytesDictionary;
 import 
org.apache.pinot.segment.local.segment.index.readers.OnHeapDoubleDictionary;
 import 
org.apache.pinot.segment.local.segment.index.readers.OnHeapFloatDictionary;
 import 
org.apache.pinot.segment.local.segment.index.readers.OnHeapIntDictionary;
@@ -232,37 +234,32 @@ public final class PhysicalColumnIndexContainer 
implements ColumnIndexContainer
     int length = metadata.getCardinality();
     switch (dataType.getStoredType()) {
       case INT:
-        return (loadOnHeap) ? new OnHeapIntDictionary(dictionaryBuffer, length)
+        return loadOnHeap ? new OnHeapIntDictionary(dictionaryBuffer, length)
             : new IntDictionary(dictionaryBuffer, length);
-
       case LONG:
-        return (loadOnHeap) ? new OnHeapLongDictionary(dictionaryBuffer, 
length)
+        return loadOnHeap ? new OnHeapLongDictionary(dictionaryBuffer, length)
             : new LongDictionary(dictionaryBuffer, length);
-
       case FLOAT:
-        return (loadOnHeap) ? new OnHeapFloatDictionary(dictionaryBuffer, 
length)
+        return loadOnHeap ? new OnHeapFloatDictionary(dictionaryBuffer, length)
             : new FloatDictionary(dictionaryBuffer, length);
-
       case DOUBLE:
-        return (loadOnHeap) ? new OnHeapDoubleDictionary(dictionaryBuffer, 
length)
+        return loadOnHeap ? new OnHeapDoubleDictionary(dictionaryBuffer, 
length)
             : new DoubleDictionary(dictionaryBuffer, length);
-
       case BIG_DECIMAL:
         int numBytesPerValue = metadata.getColumnMaxLength();
-        return new BigDecimalDictionary(dictionaryBuffer, length, 
numBytesPerValue);
-
+        return loadOnHeap ? new OnHeapBigDecimalDictionary(dictionaryBuffer, 
length, numBytesPerValue)
+            : new BigDecimalDictionary(dictionaryBuffer, length, 
numBytesPerValue);
       case STRING:
         numBytesPerValue = metadata.getColumnMaxLength();
         byte paddingByte = (byte) metadata.getPaddingCharacter();
         return loadOnHeap ? new OnHeapStringDictionary(dictionaryBuffer, 
length, numBytesPerValue, paddingByte)
             : new StringDictionary(dictionaryBuffer, length, numBytesPerValue, 
paddingByte);
-
       case BYTES:
         numBytesPerValue = metadata.getColumnMaxLength();
-        return new BytesDictionary(dictionaryBuffer, length, numBytesPerValue);
-
+        return loadOnHeap ? new OnHeapBytesDictionary(dictionaryBuffer, 
length, numBytesPerValue)
+            : new BytesDictionary(dictionaryBuffer, length, numBytesPerValue);
       default:
-        throw new IllegalStateException("Illegal data type for dictionary: " + 
dataType);
+        throw new IllegalStateException("Unsupported data type for dictionary: 
" + dataType);
     }
   }
 
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/BaseImmutableDictionary.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/BaseImmutableDictionary.java
index f1702179b4..835cdc105b 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/BaseImmutableDictionary.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/BaseImmutableDictionary.java
@@ -86,7 +86,7 @@ public abstract class BaseImmutableDictionary implements 
Dictionary {
   }
 
   protected final int normalizeIndex(int index) {
-    return (index >= 0) ? index : Dictionary.NULL_VALUE_INDEX;
+    return index >= 0 ? index : NULL_VALUE_INDEX;
   }
 
   @Override
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/BigDecimalDictionary.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/BigDecimalDictionary.java
index 1aba0dde3a..8fcd622f7d 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/BigDecimalDictionary.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/BigDecimalDictionary.java
@@ -39,6 +39,11 @@ public class BigDecimalDictionary extends 
BaseImmutableDictionary {
     return DataType.BIG_DECIMAL;
   }
 
+  @Override
+  public int indexOf(BigDecimal bigDecimalValue) {
+    return binarySearch(bigDecimalValue);
+  }
+
   @Override
   public int insertionIndexOf(String stringValue) {
     return binarySearch(new BigDecimal(stringValue));
@@ -61,22 +66,22 @@ public class BigDecimalDictionary extends 
BaseImmutableDictionary {
 
   @Override
   public int getIntValue(int dictId) {
-    return get(dictId).intValue();
+    return getBigDecimal(dictId).intValue();
   }
 
   @Override
   public long getLongValue(int dictId) {
-    return get(dictId).longValue();
+    return getBigDecimal(dictId).longValue();
   }
 
   @Override
   public float getFloatValue(int dictId) {
-    return get(dictId).floatValue();
+    return getBigDecimal(dictId).floatValue();
   }
 
   @Override
   public double getDoubleValue(int dictId) {
-    return get(dictId).doubleValue();
+    return getBigDecimal(dictId).doubleValue();
   }
 
   @Override
@@ -86,7 +91,7 @@ public class BigDecimalDictionary extends 
BaseImmutableDictionary {
 
   @Override
   public String getStringValue(int dictId) {
-    return get(dictId).toPlainString();
+    return getBigDecimal(dictId).toPlainString();
   }
 
   @Override
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/BytesDictionary.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/BytesDictionary.java
index ad78a724ae..0fc8049d7d 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/BytesDictionary.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/BytesDictionary.java
@@ -40,6 +40,11 @@ public class BytesDictionary extends BaseImmutableDictionary 
{
     return DataType.BYTES;
   }
 
+  @Override
+  public int indexOf(ByteArray bytesValue) {
+    return normalizeIndex(binarySearch(bytesValue.getBytes()));
+  }
+
   @Override
   public int insertionIndexOf(String stringValue) {
     return binarySearch(BytesUtils.toBytes(stringValue));
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/ConstantValueStringDictionary.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/ConstantValueBigDecimalDictionary.java
similarity index 62%
copy from 
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/ConstantValueStringDictionary.java
copy to 
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/ConstantValueBigDecimalDictionary.java
index 6e0e854766..2e74bb5422 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/ConstantValueStringDictionary.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/ConstantValueBigDecimalDictionary.java
@@ -20,89 +20,94 @@ package 
org.apache.pinot.segment.local.segment.index.readers;
 
 import java.math.BigDecimal;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
-import org.apache.pinot.spi.utils.BytesUtils;
 
 
 /**
- * Dictionary of a single string value.
+ * Dictionary of a single BIG_DECIMAL value.
  */
-public class ConstantValueStringDictionary extends BaseImmutableDictionary {
-  private final String _value;
+public class ConstantValueBigDecimalDictionary extends BaseImmutableDictionary 
{
+  private final BigDecimal _value;
 
-  public ConstantValueStringDictionary(String value) {
+  public ConstantValueBigDecimalDictionary(BigDecimal value) {
     super(1);
     _value = value;
   }
 
   @Override
   public DataType getValueType() {
-    return DataType.STRING;
+    return DataType.BIG_DECIMAL;
+  }
+
+  @Override
+  public int indexOf(String stringValue) {
+    // Use compareTo instead of equals because
+    return new BigDecimal(stringValue).compareTo(_value) == 0 ? 0 : 
NULL_VALUE_INDEX;
+  }
+
+  @Override
+  public int indexOf(BigDecimal bigDecimalValue) {
+    return bigDecimalValue.compareTo(_value) == 0 ? 0 : NULL_VALUE_INDEX;
   }
 
   @Override
   public int insertionIndexOf(String stringValue) {
-    int result = stringValue.compareTo(_value);
-    if (result < 0) {
+    int compareResult = new BigDecimal(stringValue).compareTo(_value);
+    if (compareResult < 0) {
       return -1;
     }
-    if (result > 0) {
+    if (compareResult > 0) {
       return -2;
     }
     return 0;
   }
 
   @Override
-  public String getMinVal() {
+  public BigDecimal getMinVal() {
     return _value;
   }
 
   @Override
-  public String getMaxVal() {
+  public BigDecimal getMaxVal() {
     return _value;
   }
 
   @Override
-  public String[] getSortedValues() {
-    return new String[]{_value};
+  public BigDecimal[] getSortedValues() {
+    return new BigDecimal[]{_value};
   }
 
   @Override
-  public String get(int dictId) {
+  public BigDecimal get(int dictId) {
     return _value;
   }
 
   @Override
   public int getIntValue(int dictId) {
-    return Integer.parseInt(_value);
+    return _value.intValue();
   }
 
   @Override
   public long getLongValue(int dictId) {
-    return Long.parseLong(_value);
+    return _value.longValue();
   }
 
   @Override
   public float getFloatValue(int dictId) {
-    return Float.parseFloat(_value);
+    return _value.floatValue();
   }
 
   @Override
   public double getDoubleValue(int dictId) {
-    return Double.parseDouble(_value);
+    return _value.doubleValue();
   }
 
   @Override
   public BigDecimal getBigDecimalValue(int dictId) {
-    return new BigDecimal(_value);
-  }
-
-  @Override
-  public String getStringValue(int dictId) {
     return _value;
   }
 
   @Override
-  public byte[] getBytesValue(int dictId) {
-    return BytesUtils.toBytes(_value);
+  public String getStringValue(int dictId) {
+    return _value.toPlainString();
   }
 }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/ConstantValueBytesDictionary.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/ConstantValueBytesDictionary.java
index 1011c854f8..f6fa9e261d 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/ConstantValueBytesDictionary.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/ConstantValueBytesDictionary.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.segment.local.segment.index.readers;
 
 import java.math.BigDecimal;
+import java.util.Arrays;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.apache.pinot.spi.utils.BigDecimalUtils;
 import org.apache.pinot.spi.utils.ByteArray;
@@ -41,6 +42,16 @@ public class ConstantValueBytesDictionary extends 
BaseImmutableDictionary {
     return DataType.BYTES;
   }
 
+  @Override
+  public int indexOf(String stringValue) {
+    return Arrays.equals(BytesUtils.toBytes(stringValue), _value) ? 0 : 
NULL_VALUE_INDEX;
+  }
+
+  @Override
+  public int indexOf(ByteArray bytesValue) {
+    return Arrays.equals(bytesValue.getBytes(), _value) ? 0 : NULL_VALUE_INDEX;
+  }
+
   @Override
   public int insertionIndexOf(String stringValue) {
     int result = ByteArray.compare(BytesUtils.toBytes(stringValue), _value);
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/ConstantValueDoubleDictionary.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/ConstantValueDoubleDictionary.java
index d484bec2f2..5630903b17 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/ConstantValueDoubleDictionary.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/ConstantValueDoubleDictionary.java
@@ -38,6 +38,16 @@ public class ConstantValueDoubleDictionary extends 
BaseImmutableDictionary {
     return DataType.DOUBLE;
   }
 
+  @Override
+  public int indexOf(String stringValue) {
+    return Double.parseDouble(stringValue) == _value ? 0 : NULL_VALUE_INDEX;
+  }
+
+  @Override
+  public int indexOf(double doubleValue) {
+    return doubleValue == _value ? 0 : NULL_VALUE_INDEX;
+  }
+
   @Override
   public int insertionIndexOf(String stringValue) {
     double doubleValue = Double.parseDouble(stringValue);
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/ConstantValueFloatDictionary.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/ConstantValueFloatDictionary.java
index 7db856074f..6e92d59dcc 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/ConstantValueFloatDictionary.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/ConstantValueFloatDictionary.java
@@ -38,6 +38,16 @@ public class ConstantValueFloatDictionary extends 
BaseImmutableDictionary {
     return DataType.FLOAT;
   }
 
+  @Override
+  public int indexOf(String stringValue) {
+    return Float.parseFloat(stringValue) == _value ? 0 : NULL_VALUE_INDEX;
+  }
+
+  @Override
+  public int indexOf(float floatValue) {
+    return floatValue == _value ? 0 : NULL_VALUE_INDEX;
+  }
+
   @Override
   public int insertionIndexOf(String stringValue) {
     float floatValue = Float.parseFloat(stringValue);
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/ConstantValueIntDictionary.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/ConstantValueIntDictionary.java
index e7366422f6..2cbc96d460 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/ConstantValueIntDictionary.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/ConstantValueIntDictionary.java
@@ -38,6 +38,16 @@ public class ConstantValueIntDictionary extends 
BaseImmutableDictionary {
     return DataType.INT;
   }
 
+  @Override
+  public int indexOf(String stringValue) {
+    return Integer.parseInt(stringValue) == _value ? 0 : NULL_VALUE_INDEX;
+  }
+
+  @Override
+  public int indexOf(int intValue) {
+    return intValue == _value ? 0 : NULL_VALUE_INDEX;
+  }
+
   @Override
   public int insertionIndexOf(String stringValue) {
     int intValue = Integer.parseInt(stringValue);
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/ConstantValueLongDictionary.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/ConstantValueLongDictionary.java
index 94c50eb14f..a972dc154b 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/ConstantValueLongDictionary.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/ConstantValueLongDictionary.java
@@ -38,6 +38,16 @@ public class ConstantValueLongDictionary extends 
BaseImmutableDictionary {
     return DataType.LONG;
   }
 
+  @Override
+  public int indexOf(String stringValue) {
+    return Long.parseLong(stringValue) == _value ? 0 : NULL_VALUE_INDEX;
+  }
+
+  @Override
+  public int indexOf(long longValue) {
+    return longValue == _value ? 0 : NULL_VALUE_INDEX;
+  }
+
   @Override
   public int insertionIndexOf(String stringValue) {
     long longValue = Long.parseLong(stringValue);
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/ConstantValueStringDictionary.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/ConstantValueStringDictionary.java
index 6e0e854766..5139aa17ea 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/ConstantValueStringDictionary.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/ConstantValueStringDictionary.java
@@ -39,6 +39,11 @@ public class ConstantValueStringDictionary extends 
BaseImmutableDictionary {
     return DataType.STRING;
   }
 
+  @Override
+  public int indexOf(String stringValue) {
+    return stringValue.equals(_value) ? 0 : NULL_VALUE_INDEX;
+  }
+
   @Override
   public int insertionIndexOf(String stringValue) {
     int result = stringValue.compareTo(_value);
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/DoubleDictionary.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/DoubleDictionary.java
index 62777a29ee..005ab557f7 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/DoubleDictionary.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/DoubleDictionary.java
@@ -30,8 +30,8 @@ public class DoubleDictionary extends BaseImmutableDictionary 
{
   }
 
   @Override
-  public int insertionIndexOf(String stringValue) {
-    return binarySearch(Double.parseDouble(stringValue));
+  public DataType getValueType() {
+    return DataType.DOUBLE;
   }
 
   @Override
@@ -40,8 +40,8 @@ public class DoubleDictionary extends BaseImmutableDictionary 
{
   }
 
   @Override
-  public DataType getValueType() {
-    return DataType.DOUBLE;
+  public int insertionIndexOf(String stringValue) {
+    return binarySearch(Double.parseDouble(stringValue));
   }
 
   @Override
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/FloatDictionary.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/FloatDictionary.java
index 774ff5b932..df97837a12 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/FloatDictionary.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/FloatDictionary.java
@@ -30,8 +30,8 @@ public class FloatDictionary extends BaseImmutableDictionary {
   }
 
   @Override
-  public int insertionIndexOf(String stringValue) {
-    return binarySearch(Float.parseFloat(stringValue));
+  public DataType getValueType() {
+    return DataType.FLOAT;
   }
 
   @Override
@@ -40,8 +40,8 @@ public class FloatDictionary extends BaseImmutableDictionary {
   }
 
   @Override
-  public DataType getValueType() {
-    return DataType.FLOAT;
+  public int insertionIndexOf(String stringValue) {
+    return binarySearch(Float.parseFloat(stringValue));
   }
 
   @Override
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/IntDictionary.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/IntDictionary.java
index 57173f6e04..aea1ff4c1a 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/IntDictionary.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/IntDictionary.java
@@ -30,8 +30,8 @@ public class IntDictionary extends BaseImmutableDictionary {
   }
 
   @Override
-  public int insertionIndexOf(String stringValue) {
-    return binarySearch(Integer.parseInt(stringValue));
+  public DataType getValueType() {
+    return DataType.INT;
   }
 
   @Override
@@ -40,8 +40,8 @@ public class IntDictionary extends BaseImmutableDictionary {
   }
 
   @Override
-  public DataType getValueType() {
-    return DataType.INT;
+  public int insertionIndexOf(String stringValue) {
+    return binarySearch(Integer.parseInt(stringValue));
   }
 
   @Override
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/LongDictionary.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/LongDictionary.java
index f5f2b13b81..d57504c042 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/LongDictionary.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/LongDictionary.java
@@ -30,8 +30,8 @@ public class LongDictionary extends BaseImmutableDictionary {
   }
 
   @Override
-  public int insertionIndexOf(String stringValue) {
-    return binarySearch(Long.parseLong(stringValue));
+  public DataType getValueType() {
+    return DataType.LONG;
   }
 
   @Override
@@ -40,8 +40,8 @@ public class LongDictionary extends BaseImmutableDictionary {
   }
 
   @Override
-  public DataType getValueType() {
-    return DataType.LONG;
+  public int insertionIndexOf(String stringValue) {
+    return binarySearch(Long.parseLong(stringValue));
   }
 
   @Override
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/BigDecimalDictionary.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/OnHeapBigDecimalDictionary.java
similarity index 55%
copy from 
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/BigDecimalDictionary.java
copy to 
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/OnHeapBigDecimalDictionary.java
index 1aba0dde3a..00d071e117 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/BigDecimalDictionary.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/OnHeapBigDecimalDictionary.java
@@ -19,19 +19,32 @@
 package org.apache.pinot.segment.local.segment.index.readers;
 
 import java.math.BigDecimal;
+import java.util.Arrays;
 import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.apache.pinot.spi.utils.BigDecimalUtils;
 
 
 /**
- * Extension of {@link BaseImmutableDictionary} that implements immutable 
dictionary for BigDecimal type.
+ * Implementation of BIG_DECIMAL dictionary that cache all values on-heap.
+ * <p>This is useful for BIG_DECIMAL columns that:
+ * <ul>
+ *   <li>Has low cardinality BIG_DECIMAL dictionary where memory footprint 
on-heap is acceptably small</li>
+ *   <li>Is heavily queried</li>
+ * </ul>
  */
-public class BigDecimalDictionary extends BaseImmutableDictionary {
+public class OnHeapBigDecimalDictionary extends BaseImmutableDictionary {
+  // NOTE: Always do binary search because BigDecimal's compareTo() is not 
consistent with equals()
+  //       E.g. compareTo(3.0, 3) returns 0 but equals(3.0, 3) returns false
+  private final BigDecimal[] _dictIdToVal;
 
-  public BigDecimalDictionary(PinotDataBuffer dataBuffer, int length, int 
numBytesPerValue) {
-    // Works with VarLengthValueBuffer only.
+  public OnHeapBigDecimalDictionary(PinotDataBuffer dataBuffer, int length, 
int numBytesPerValue) {
     super(dataBuffer, length, numBytesPerValue, (byte) 0);
+
+    _dictIdToVal = new BigDecimal[length];
+    for (int dictId = 0; dictId < length; dictId++) {
+      _dictIdToVal[dictId] = getBigDecimal(dictId);
+    }
   }
 
   @Override
@@ -40,57 +53,52 @@ public class BigDecimalDictionary extends 
BaseImmutableDictionary {
   }
 
   @Override
-  public int insertionIndexOf(String stringValue) {
-    return binarySearch(new BigDecimal(stringValue));
-  }
-
-  @Override
-  public BigDecimal getMinVal() {
-    return BigDecimalUtils.deserialize(getBytes(0));
+  public int indexOf(BigDecimal bigDecimalValue) {
+    return normalizeIndex(Arrays.binarySearch(_dictIdToVal, bigDecimalValue));
   }
 
   @Override
-  public BigDecimal getMaxVal() {
-    return BigDecimalUtils.deserialize(getBytes(length() - 1));
+  public int insertionIndexOf(String stringValue) {
+    return Arrays.binarySearch(_dictIdToVal, new BigDecimal(stringValue));
   }
 
   @Override
   public BigDecimal get(int dictId) {
-    return getBigDecimal(dictId);
+    return _dictIdToVal[dictId];
   }
 
   @Override
   public int getIntValue(int dictId) {
-    return get(dictId).intValue();
+    return _dictIdToVal[dictId].intValue();
   }
 
   @Override
   public long getLongValue(int dictId) {
-    return get(dictId).longValue();
+    return _dictIdToVal[dictId].longValue();
   }
 
   @Override
   public float getFloatValue(int dictId) {
-    return get(dictId).floatValue();
+    return _dictIdToVal[dictId].floatValue();
   }
 
   @Override
   public double getDoubleValue(int dictId) {
-    return get(dictId).doubleValue();
+    return _dictIdToVal[dictId].doubleValue();
   }
 
   @Override
   public BigDecimal getBigDecimalValue(int dictId) {
-    return getBigDecimal(dictId);
+    return _dictIdToVal[dictId];
   }
 
   @Override
   public String getStringValue(int dictId) {
-    return get(dictId).toPlainString();
+    return _dictIdToVal[dictId].toPlainString();
   }
 
   @Override
   public byte[] getBytesValue(int dictId) {
-    return getBytes(dictId);
+    return BigDecimalUtils.serialize(_dictIdToVal[dictId]);
   }
 }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/BytesDictionary.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/OnHeapBytesDictionary.java
similarity index 55%
copy from 
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/BytesDictionary.java
copy to 
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/OnHeapBytesDictionary.java
index ad78a724ae..7ba5ef51e6 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/BytesDictionary.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/OnHeapBytesDictionary.java
@@ -18,7 +18,10 @@
  */
 package org.apache.pinot.segment.local.segment.index.readers;
 
+import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
 import java.math.BigDecimal;
+import java.util.Arrays;
+import org.apache.pinot.segment.spi.index.reader.Dictionary;
 import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.apache.pinot.spi.utils.BigDecimalUtils;
@@ -27,12 +30,29 @@ import org.apache.pinot.spi.utils.BytesUtils;
 
 
 /**
- * Extension of {@link BaseImmutableDictionary} that implements immutable 
dictionary for byte[] type.
+ * Implementation of BYTES dictionary that cache all values on-heap.
+ * <p>This is useful for BYTES columns that:
+ * <ul>
+ *   <li>Has low cardinality BYTES dictionary where memory footprint on-heap 
is acceptably small</li>
+ *   <li>Is heavily queried</li>
+ * </ul>
  */
-public class BytesDictionary extends BaseImmutableDictionary {
+public class OnHeapBytesDictionary extends BaseImmutableDictionary {
+  private final Object2IntOpenHashMap<ByteArray> _valToDictId;
+  private final ByteArray[] _dictIdToVal;
 
-  public BytesDictionary(PinotDataBuffer dataBuffer, int length, int 
numBytesPerValue) {
+  public OnHeapBytesDictionary(PinotDataBuffer dataBuffer, int length, int 
numBytesPerValue) {
     super(dataBuffer, length, numBytesPerValue, (byte) 0);
+
+    _valToDictId = new Object2IntOpenHashMap<>(length);
+    _valToDictId.defaultReturnValue(Dictionary.NULL_VALUE_INDEX);
+    _dictIdToVal = new ByteArray[length];
+
+    for (int dictId = 0; dictId < length; dictId++) {
+      ByteArray value = new ByteArray(getBytes(dictId));
+      _dictIdToVal[dictId] = value;
+      _valToDictId.put(value, dictId);
+    }
   }
 
   @Override
@@ -41,28 +61,30 @@ public class BytesDictionary extends 
BaseImmutableDictionary {
   }
 
   @Override
-  public int insertionIndexOf(String stringValue) {
-    return binarySearch(BytesUtils.toBytes(stringValue));
+  public int indexOf(String stringValue) {
+    return _valToDictId.getInt(BytesUtils.toByteArray(stringValue));
   }
 
   @Override
-  public ByteArray getMinVal() {
-    return new ByteArray(getBytes(0));
+  public int indexOf(ByteArray bytesValue) {
+    return _valToDictId.getInt(bytesValue);
   }
 
   @Override
-  public ByteArray getMaxVal() {
-    return new ByteArray(getBytes(length() - 1));
+  public int insertionIndexOf(String stringValue) {
+    ByteArray byteArray = BytesUtils.toByteArray(stringValue);
+    int index = _valToDictId.getInt(byteArray);
+    return (index != Dictionary.NULL_VALUE_INDEX) ? index : 
Arrays.binarySearch(_dictIdToVal, byteArray);
   }
 
   @Override
   public byte[] get(int dictId) {
-    return getBytes(dictId);
+    return _dictIdToVal[dictId].getBytes();
   }
 
   @Override
   public Object getInternal(int dictId) {
-    return new ByteArray(getBytes(dictId));
+    return _dictIdToVal[dictId];
   }
 
   @Override
@@ -87,16 +109,16 @@ public class BytesDictionary extends 
BaseImmutableDictionary {
 
   @Override
   public BigDecimal getBigDecimalValue(int dictId) {
-    return BigDecimalUtils.deserialize(getBytes(dictId));
+    return BigDecimalUtils.deserialize(_dictIdToVal[dictId].getBytes());
   }
 
   @Override
   public String getStringValue(int dictId) {
-    return BytesUtils.toHexString(getBytes(dictId));
+    return BytesUtils.toHexString(_dictIdToVal[dictId].getBytes());
   }
 
   @Override
   public byte[] getBytesValue(int dictId) {
-    return getBytes(dictId);
+    return _dictIdToVal[dictId].getBytes();
   }
 }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/OnHeapDictionary.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/OnHeapDictionary.java
deleted file mode 100644
index 0390397a9c..0000000000
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/OnHeapDictionary.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.segment.local.segment.index.readers;
-
-import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
-
-
-/**
- * Base class for on-heap immutable dictionary.
- */
-public abstract class OnHeapDictionary extends BaseImmutableDictionary {
-
-  protected OnHeapDictionary(PinotDataBuffer dataBuffer, int length, int 
numBytesPerValue, byte paddingByte) {
-    super(dataBuffer, length, numBytesPerValue, paddingByte);
-  }
-}
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/OnHeapDoubleDictionary.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/OnHeapDoubleDictionary.java
index 4b93443c4b..61e06499aa 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/OnHeapDoubleDictionary.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/OnHeapDoubleDictionary.java
@@ -35,7 +35,7 @@ import org.apache.pinot.spi.data.FieldSpec.DataType;
  * </ul>
  * <p>This helps avoid creation of double from byte[].
  */
-public class OnHeapDoubleDictionary extends OnHeapDictionary {
+public class OnHeapDoubleDictionary extends BaseImmutableDictionary {
   private final Double2IntOpenHashMap _valToDictId;
   private final double[] _dictIdToVal;
 
@@ -60,13 +60,6 @@ public class OnHeapDoubleDictionary extends OnHeapDictionary 
{
     }
   }
 
-  @Override
-  public int insertionIndexOf(String stringValue) {
-    double doubleValue = Double.parseDouble(stringValue);
-    int index = _valToDictId.get(doubleValue);
-    return (index != Dictionary.NULL_VALUE_INDEX) ? index : 
Arrays.binarySearch(_dictIdToVal, doubleValue);
-  }
-
   @Override
   public DataType getValueType() {
     return DataType.DOUBLE;
@@ -77,6 +70,18 @@ public class OnHeapDoubleDictionary extends OnHeapDictionary 
{
     return _valToDictId.get(Double.parseDouble(stringValue));
   }
 
+  @Override
+  public int indexOf(double doubleValue) {
+    return _valToDictId.get(doubleValue);
+  }
+
+  @Override
+  public int insertionIndexOf(String stringValue) {
+    double doubleValue = Double.parseDouble(stringValue);
+    int index = _valToDictId.get(doubleValue);
+    return (index != Dictionary.NULL_VALUE_INDEX) ? index : 
Arrays.binarySearch(_dictIdToVal, doubleValue);
+  }
+
   @Override
   public Double get(int dictId) {
     return _dictIdToVal[dictId];
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/OnHeapFloatDictionary.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/OnHeapFloatDictionary.java
index 5a6e0ff380..581e61018c 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/OnHeapFloatDictionary.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/OnHeapFloatDictionary.java
@@ -35,7 +35,7 @@ import org.apache.pinot.spi.data.FieldSpec.DataType;
  * </ul>
  * <p>This helps avoid creation of float from byte[].
  */
-public class OnHeapFloatDictionary extends OnHeapDictionary {
+public class OnHeapFloatDictionary extends BaseImmutableDictionary {
   private final Float2IntOpenHashMap _valToDictId;
   private final float[] _dictIdToVal;
 
@@ -60,13 +60,6 @@ public class OnHeapFloatDictionary extends OnHeapDictionary {
     }
   }
 
-  @Override
-  public int insertionIndexOf(String stringValue) {
-    float floatValue = Float.parseFloat(stringValue);
-    int index = _valToDictId.get(floatValue);
-    return (index != Dictionary.NULL_VALUE_INDEX) ? index : 
Arrays.binarySearch(_dictIdToVal, floatValue);
-  }
-
   @Override
   public DataType getValueType() {
     return DataType.FLOAT;
@@ -77,6 +70,18 @@ public class OnHeapFloatDictionary extends OnHeapDictionary {
     return _valToDictId.get(Float.parseFloat(stringValue));
   }
 
+  @Override
+  public int indexOf(float floatValue) {
+    return _valToDictId.get(floatValue);
+  }
+
+  @Override
+  public int insertionIndexOf(String stringValue) {
+    float floatValue = Float.parseFloat(stringValue);
+    int index = _valToDictId.get(floatValue);
+    return (index != Dictionary.NULL_VALUE_INDEX) ? index : 
Arrays.binarySearch(_dictIdToVal, floatValue);
+  }
+
   @Override
   public Float get(int dictId) {
     return _dictIdToVal[dictId];
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/OnHeapIntDictionary.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/OnHeapIntDictionary.java
index a839622114..19cc196ade 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/OnHeapIntDictionary.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/OnHeapIntDictionary.java
@@ -35,7 +35,7 @@ import org.apache.pinot.spi.data.FieldSpec.DataType;
  * </ul>
  * <p>This helps avoid creation of int from byte[].
  */
-public class OnHeapIntDictionary extends OnHeapDictionary {
+public class OnHeapIntDictionary extends BaseImmutableDictionary {
   private final Int2IntOpenHashMap _valToDictId;
   private final int[] _dictIdToVal;
 
@@ -60,13 +60,6 @@ public class OnHeapIntDictionary extends OnHeapDictionary {
     }
   }
 
-  @Override
-  public int insertionIndexOf(String stringValue) {
-    int intValue = Integer.parseInt(stringValue);
-    int index = _valToDictId.get(intValue);
-    return (index != Dictionary.NULL_VALUE_INDEX) ? index : 
Arrays.binarySearch(_dictIdToVal, intValue);
-  }
-
   @Override
   public DataType getValueType() {
     return DataType.INT;
@@ -77,6 +70,18 @@ public class OnHeapIntDictionary extends OnHeapDictionary {
     return _valToDictId.get(Integer.parseInt(stringValue));
   }
 
+  @Override
+  public int indexOf(int intValue) {
+    return _valToDictId.get(intValue);
+  }
+
+  @Override
+  public int insertionIndexOf(String stringValue) {
+    int intValue = Integer.parseInt(stringValue);
+    int index = _valToDictId.get(intValue);
+    return (index != Dictionary.NULL_VALUE_INDEX) ? index : 
Arrays.binarySearch(_dictIdToVal, intValue);
+  }
+
   @Override
   public Integer get(int dictId) {
     return _dictIdToVal[dictId];
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/OnHeapLongDictionary.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/OnHeapLongDictionary.java
index 7f4097a39d..f122f3ed96 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/OnHeapLongDictionary.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/OnHeapLongDictionary.java
@@ -35,7 +35,7 @@ import org.apache.pinot.spi.data.FieldSpec.DataType;
  * </ul>
  * <p>This helps avoid creation of Long from byte[].
  */
-public class OnHeapLongDictionary extends OnHeapDictionary {
+public class OnHeapLongDictionary extends BaseImmutableDictionary {
   private final Long2IntOpenHashMap _valToDictId;
   private final long[] _dictIdToVal;
 
@@ -60,13 +60,6 @@ public class OnHeapLongDictionary extends OnHeapDictionary {
     }
   }
 
-  @Override
-  public int insertionIndexOf(String stringValue) {
-    long longValue = Long.parseLong(stringValue);
-    int index = _valToDictId.get(longValue);
-    return (index != Dictionary.NULL_VALUE_INDEX) ? index : 
Arrays.binarySearch(_dictIdToVal, longValue);
-  }
-
   @Override
   public DataType getValueType() {
     return DataType.LONG;
@@ -77,6 +70,18 @@ public class OnHeapLongDictionary extends OnHeapDictionary {
     return _valToDictId.get(Long.parseLong(stringValue));
   }
 
+  @Override
+  public int indexOf(long longValue) {
+    return _valToDictId.get(longValue);
+  }
+
+  @Override
+  public int insertionIndexOf(String stringValue) {
+    long longValue = Long.parseLong(stringValue);
+    int index = _valToDictId.get(longValue);
+    return (index != Dictionary.NULL_VALUE_INDEX) ? index : 
Arrays.binarySearch(_dictIdToVal, longValue);
+  }
+
   @Override
   public Long get(int dictId) {
     return _dictIdToVal[dictId];
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/OnHeapStringDictionary.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/OnHeapStringDictionary.java
index d0d07629d9..68ef6da2f5 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/OnHeapStringDictionary.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/OnHeapStringDictionary.java
@@ -36,7 +36,7 @@ import org.apache.pinot.spi.utils.BytesUtils;
  * </ul>
  * <p>This helps avoid creation of String from byte[], which is expensive as 
well as creates garbage.
  */
-public class OnHeapStringDictionary extends OnHeapDictionary {
+public class OnHeapStringDictionary extends BaseImmutableDictionary {
   private final byte _paddingByte;
   private final String[] _unpaddedStrings;
   private final Object2IntOpenHashMap<String> _unPaddedStringToIdMap;
@@ -67,6 +67,16 @@ public class OnHeapStringDictionary extends OnHeapDictionary 
{
     }
   }
 
+  @Override
+  public DataType getValueType() {
+    return DataType.STRING;
+  }
+
+  @Override
+  public int indexOf(String stringValue) {
+    return _unPaddedStringToIdMap.getInt(stringValue);
+  }
+
   /**
    * WARNING: With non-zero padding byte, binary search result might not 
reflect the real insertion index for the value.
    * E.g. with padding byte 'b', if unpadded value "aa" is in the dictionary, 
and stored as "aab", then unpadded value
@@ -85,16 +95,6 @@ public class OnHeapStringDictionary extends OnHeapDictionary 
{
     }
   }
 
-  @Override
-  public DataType getValueType() {
-    return DataType.STRING;
-  }
-
-  @Override
-  public int indexOf(String stringValue) {
-    return _unPaddedStringToIdMap.getInt(stringValue);
-  }
-
   @Override
   public String get(int dictId) {
     return _unpaddedStrings[dictId];
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/StringDictionary.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/StringDictionary.java
index 1df28c7549..7e167a1b69 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/StringDictionary.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/StringDictionary.java
@@ -31,13 +31,13 @@ public class StringDictionary extends 
BaseImmutableDictionary {
   }
 
   @Override
-  public int insertionIndexOf(String stringValue) {
-    return binarySearch(stringValue);
+  public DataType getValueType() {
+    return DataType.STRING;
   }
 
   @Override
-  public DataType getValueType() {
-    return DataType.STRING;
+  public int insertionIndexOf(String stringValue) {
+    return binarySearch(stringValue);
   }
 
   @Override
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/readers/ImmutableDictionaryTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/readers/ImmutableDictionaryTest.java
index 182f07da34..943c841ef8 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/readers/ImmutableDictionaryTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/readers/ImmutableDictionaryTest.java
@@ -35,7 +35,7 @@ import 
org.apache.pinot.segment.local.segment.creator.impl.SegmentDictionaryCrea
 import org.apache.pinot.segment.spi.V1Constants;
 import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
 import org.apache.pinot.spi.data.DimensionFieldSpec;
-import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.apache.pinot.spi.data.MetricFieldSpec;
 import org.apache.pinot.spi.utils.BigDecimalUtils;
 import org.apache.pinot.spi.utils.ByteArray;
@@ -132,44 +132,40 @@ public class ImmutableDictionaryTest {
     Arrays.sort(_bytesValues);
 
     try (SegmentDictionaryCreator dictionaryCreator = new 
SegmentDictionaryCreator(_intValues,
-        new DimensionFieldSpec(INT_COLUMN_NAME, FieldSpec.DataType.INT, true), 
TEMP_DIR)) {
+        new DimensionFieldSpec(INT_COLUMN_NAME, DataType.INT, true), 
TEMP_DIR)) {
       dictionaryCreator.build();
     }
 
     try (SegmentDictionaryCreator dictionaryCreator = new 
SegmentDictionaryCreator(_longValues,
-        new DimensionFieldSpec(LONG_COLUMN_NAME, FieldSpec.DataType.LONG, 
true), TEMP_DIR)) {
+        new DimensionFieldSpec(LONG_COLUMN_NAME, DataType.LONG, true), 
TEMP_DIR)) {
       dictionaryCreator.build();
     }
 
     try (SegmentDictionaryCreator dictionaryCreator = new 
SegmentDictionaryCreator(_floatValues,
-        new DimensionFieldSpec(FLOAT_COLUMN_NAME, FieldSpec.DataType.FLOAT, 
true), TEMP_DIR)) {
+        new DimensionFieldSpec(FLOAT_COLUMN_NAME, DataType.FLOAT, true), 
TEMP_DIR)) {
       dictionaryCreator.build();
     }
 
     try (SegmentDictionaryCreator dictionaryCreator = new 
SegmentDictionaryCreator(_doubleValues,
-        new DimensionFieldSpec(DOUBLE_COLUMN_NAME, FieldSpec.DataType.DOUBLE, 
true), TEMP_DIR)) {
+        new DimensionFieldSpec(DOUBLE_COLUMN_NAME, DataType.DOUBLE, true), 
TEMP_DIR)) {
       dictionaryCreator.build();
     }
 
     // Note: BigDecimalDictionary requires setting useVarLengthDictionary to 
true.
     boolean useVarLengthDictionary = true;
-    MetricFieldSpec bigDecimalMetricField =
-        new MetricFieldSpec(BIG_DECIMAL_COLUMN_NAME, 
FieldSpec.DataType.BIG_DECIMAL);
-    bigDecimalMetricField.setSingleValueField(true);
     try (SegmentDictionaryCreator dictionaryCreator = new 
SegmentDictionaryCreator(_bigDecimalValues,
-        bigDecimalMetricField, TEMP_DIR,
-        useVarLengthDictionary)) {
+        new MetricFieldSpec(BIG_DECIMAL_COLUMN_NAME, DataType.BIG_DECIMAL), 
TEMP_DIR, useVarLengthDictionary)) {
       dictionaryCreator.build();
     }
 
     try (SegmentDictionaryCreator dictionaryCreator = new 
SegmentDictionaryCreator(_stringValues,
-        new DimensionFieldSpec(STRING_COLUMN_NAME, FieldSpec.DataType.STRING, 
true), TEMP_DIR)) {
+        new DimensionFieldSpec(STRING_COLUMN_NAME, DataType.STRING, true), 
TEMP_DIR)) {
       dictionaryCreator.build();
       _numBytesPerStringValue = dictionaryCreator.getNumBytesPerEntry();
     }
 
     try (SegmentDictionaryCreator dictionaryCreator = new 
SegmentDictionaryCreator(_bytesValues,
-        new DimensionFieldSpec(BYTES_COLUMN_NAME, FieldSpec.DataType.BYTES, 
true), TEMP_DIR)) {
+        new DimensionFieldSpec(BYTES_COLUMN_NAME, DataType.BYTES, true), 
TEMP_DIR)) {
       dictionaryCreator.build();
       assertEquals(dictionaryCreator.getNumBytesPerEntry(), BYTES_LENGTH);
     }
@@ -215,9 +211,8 @@ public class ImmutableDictionaryTest {
   @Test
   public void testLongDictionary()
       throws Exception {
-    try (LongDictionary longDictionary = new LongDictionary(PinotDataBuffer
-        .mapReadOnlyBigEndianFile(new File(TEMP_DIR, LONG_COLUMN_NAME + 
V1Constants.Dict.FILE_EXTENSION)),
-        NUM_VALUES)) {
+    try (LongDictionary longDictionary = new 
LongDictionary(PinotDataBuffer.mapReadOnlyBigEndianFile(
+        new File(TEMP_DIR, LONG_COLUMN_NAME + 
V1Constants.Dict.FILE_EXTENSION)), NUM_VALUES)) {
       testLongDictionary(longDictionary);
     }
   }
@@ -225,9 +220,8 @@ public class ImmutableDictionaryTest {
   @Test
   public void testOnHeapLongDictionary()
       throws Exception {
-    try (OnHeapLongDictionary onHeapLongDictionary = new 
OnHeapLongDictionary(PinotDataBuffer
-        .mapReadOnlyBigEndianFile(new File(TEMP_DIR, LONG_COLUMN_NAME + 
V1Constants.Dict.FILE_EXTENSION)),
-        NUM_VALUES)) {
+    try (OnHeapLongDictionary onHeapLongDictionary = new 
OnHeapLongDictionary(PinotDataBuffer.mapReadOnlyBigEndianFile(
+        new File(TEMP_DIR, LONG_COLUMN_NAME + 
V1Constants.Dict.FILE_EXTENSION)), NUM_VALUES)) {
       testLongDictionary(onHeapLongDictionary);
     }
   }
@@ -252,9 +246,8 @@ public class ImmutableDictionaryTest {
   @Test
   public void testFloatDictionary()
       throws Exception {
-    try (FloatDictionary floatDictionary = new FloatDictionary(PinotDataBuffer
-        .mapReadOnlyBigEndianFile(new File(TEMP_DIR, FLOAT_COLUMN_NAME + 
V1Constants.Dict.FILE_EXTENSION)),
-        NUM_VALUES)) {
+    try (FloatDictionary floatDictionary = new 
FloatDictionary(PinotDataBuffer.mapReadOnlyBigEndianFile(
+        new File(TEMP_DIR, FLOAT_COLUMN_NAME + 
V1Constants.Dict.FILE_EXTENSION)), NUM_VALUES)) {
       testFloatDictionary(floatDictionary);
     }
   }
@@ -262,9 +255,9 @@ public class ImmutableDictionaryTest {
   @Test
   public void testOnHeapFloatDictionary()
       throws Exception {
-    try (OnHeapFloatDictionary onHeapFloatDictionary = new 
OnHeapFloatDictionary(PinotDataBuffer
-        .mapReadOnlyBigEndianFile(new File(TEMP_DIR, FLOAT_COLUMN_NAME + 
V1Constants.Dict.FILE_EXTENSION)),
-        NUM_VALUES)) {
+    try (OnHeapFloatDictionary onHeapFloatDictionary = new 
OnHeapFloatDictionary(
+        PinotDataBuffer.mapReadOnlyBigEndianFile(
+            new File(TEMP_DIR, FLOAT_COLUMN_NAME + 
V1Constants.Dict.FILE_EXTENSION)), NUM_VALUES)) {
       testFloatDictionary(onHeapFloatDictionary);
     }
   }
@@ -289,9 +282,8 @@ public class ImmutableDictionaryTest {
   @Test
   public void testDoubleDictionary()
       throws Exception {
-    try (DoubleDictionary doubleDictionary = new 
DoubleDictionary(PinotDataBuffer
-        .mapReadOnlyBigEndianFile(new File(TEMP_DIR, DOUBLE_COLUMN_NAME + 
V1Constants.Dict.FILE_EXTENSION)),
-        NUM_VALUES)) {
+    try (DoubleDictionary doubleDictionary = new 
DoubleDictionary(PinotDataBuffer.mapReadOnlyBigEndianFile(
+        new File(TEMP_DIR, DOUBLE_COLUMN_NAME + 
V1Constants.Dict.FILE_EXTENSION)), NUM_VALUES)) {
       testDoubleDictionary(doubleDictionary);
     }
   }
@@ -299,9 +291,9 @@ public class ImmutableDictionaryTest {
   @Test
   public void testOnHeapDoubleDictionary()
       throws Exception {
-    try (OnHeapDoubleDictionary onHeapDoubleDictionary = new 
OnHeapDoubleDictionary(PinotDataBuffer
-        .mapReadOnlyBigEndianFile(new File(TEMP_DIR, DOUBLE_COLUMN_NAME + 
V1Constants.Dict.FILE_EXTENSION)),
-        NUM_VALUES)) {
+    try (OnHeapDoubleDictionary onHeapDoubleDictionary = new 
OnHeapDoubleDictionary(
+        PinotDataBuffer.mapReadOnlyBigEndianFile(
+            new File(TEMP_DIR, DOUBLE_COLUMN_NAME + 
V1Constants.Dict.FILE_EXTENSION)), NUM_VALUES)) {
       testDoubleDictionary(onHeapDoubleDictionary);
     }
   }
@@ -326,13 +318,24 @@ public class ImmutableDictionaryTest {
   @Test
   public void testBigDecimalDictionary()
       throws Exception {
-    try (BigDecimalDictionary bigDecimalDictionary = new 
BigDecimalDictionary(PinotDataBuffer
-        .mapReadOnlyBigEndianFile(new File(TEMP_DIR, BIG_DECIMAL_COLUMN_NAME + 
V1Constants.Dict.FILE_EXTENSION)),
-        NUM_VALUES, _bigDecimalByteLength)) {
+    try (BigDecimalDictionary bigDecimalDictionary = new 
BigDecimalDictionary(PinotDataBuffer.mapReadOnlyBigEndianFile(
+        new File(TEMP_DIR, BIG_DECIMAL_COLUMN_NAME + 
V1Constants.Dict.FILE_EXTENSION)), NUM_VALUES,
+        _bigDecimalByteLength)) {
       testBigDecimalDictionary(bigDecimalDictionary);
     }
   }
 
+  @Test
+  public void testOnHeapBigDecimalDictionary()
+      throws Exception {
+    try (OnHeapBigDecimalDictionary onHeapBigDecimalDictionary = new 
OnHeapBigDecimalDictionary(
+        PinotDataBuffer.mapReadOnlyBigEndianFile(
+            new File(TEMP_DIR, BIG_DECIMAL_COLUMN_NAME + 
V1Constants.Dict.FILE_EXTENSION)), NUM_VALUES,
+        _bigDecimalByteLength)) {
+      testBigDecimalDictionary(onHeapBigDecimalDictionary);
+    }
+  }
+
   private void testBigDecimalDictionary(BaseImmutableDictionary 
bigDecimalDictionary) {
     for (int i = 0; i < NUM_VALUES; i++) {
       assertEquals(bigDecimalDictionary.get(i), _bigDecimalValues[i]);
@@ -354,9 +357,9 @@ public class ImmutableDictionaryTest {
   @Test
   public void testStringDictionary()
       throws Exception {
-    try (StringDictionary stringDictionary = new 
StringDictionary(PinotDataBuffer
-        .mapReadOnlyBigEndianFile(new File(TEMP_DIR, STRING_COLUMN_NAME + 
V1Constants.Dict.FILE_EXTENSION)), NUM_VALUES,
-        _numBytesPerStringValue, (byte) 0)) {
+    try (StringDictionary stringDictionary = new 
StringDictionary(PinotDataBuffer.mapReadOnlyBigEndianFile(
+        new File(TEMP_DIR, STRING_COLUMN_NAME + 
V1Constants.Dict.FILE_EXTENSION)), NUM_VALUES, _numBytesPerStringValue,
+        (byte) 0)) {
       testStringDictionary(stringDictionary);
     }
   }
@@ -364,8 +367,9 @@ public class ImmutableDictionaryTest {
   @Test
   public void testOnHeapStringDictionary()
       throws Exception {
-    try (OnHeapStringDictionary onHeapStringDictionary = new 
OnHeapStringDictionary(PinotDataBuffer
-        .mapReadOnlyBigEndianFile(new File(TEMP_DIR, STRING_COLUMN_NAME + 
V1Constants.Dict.FILE_EXTENSION)), NUM_VALUES,
+    try (OnHeapStringDictionary onHeapStringDictionary = new 
OnHeapStringDictionary(
+        PinotDataBuffer.mapReadOnlyBigEndianFile(
+            new File(TEMP_DIR, STRING_COLUMN_NAME + 
V1Constants.Dict.FILE_EXTENSION)), NUM_VALUES,
         _numBytesPerStringValue, (byte) 0)) {
       testStringDictionary(onHeapStringDictionary);
     }
@@ -387,13 +391,22 @@ public class ImmutableDictionaryTest {
   @Test
   public void testBytesDictionary()
       throws Exception {
-    try (BytesDictionary bytesDictionary = new BytesDictionary(PinotDataBuffer
-        .mapReadOnlyBigEndianFile(new File(TEMP_DIR, BYTES_COLUMN_NAME + 
V1Constants.Dict.FILE_EXTENSION)), NUM_VALUES,
-        BYTES_LENGTH)) {
+    try (BytesDictionary bytesDictionary = new 
BytesDictionary(PinotDataBuffer.mapReadOnlyBigEndianFile(
+        new File(TEMP_DIR, BYTES_COLUMN_NAME + 
V1Constants.Dict.FILE_EXTENSION)), NUM_VALUES, BYTES_LENGTH)) {
       testBytesDictionary(bytesDictionary);
     }
   }
 
+  @Test
+  public void testOnHeapBytesDictionary()
+      throws Exception {
+    try (OnHeapBytesDictionary onHeapBytesDictionary = new 
OnHeapBytesDictionary(
+        PinotDataBuffer.mapReadOnlyBigEndianFile(
+            new File(TEMP_DIR, BYTES_COLUMN_NAME + 
V1Constants.Dict.FILE_EXTENSION)), NUM_VALUES, BYTES_LENGTH)) {
+      testBytesDictionary(onHeapBytesDictionary);
+    }
+  }
+
   private void testBytesDictionary(BaseImmutableDictionary bytesDictionary) {
     for (int i = 0; i < NUM_VALUES; i++) {
       assertEquals(bytesDictionary.get(i), _bytesValues[i].getBytes());
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/readers/ImmutableDictionaryTypeConversionTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/readers/ImmutableDictionaryTypeConversionTest.java
index 31b17677b3..b89c0cdd6a 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/readers/ImmutableDictionaryTypeConversionTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/readers/ImmutableDictionaryTypeConversionTest.java
@@ -28,7 +28,7 @@ import 
org.apache.pinot.segment.local.segment.creator.impl.SegmentDictionaryCrea
 import org.apache.pinot.segment.spi.V1Constants;
 import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
 import org.apache.pinot.spi.data.DimensionFieldSpec;
-import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.apache.pinot.spi.data.MetricFieldSpec;
 import org.apache.pinot.spi.utils.ArrayCopyUtils;
 import org.apache.pinot.spi.utils.BigDecimalUtils;
@@ -113,41 +113,38 @@ public class ImmutableDictionaryTypeConversionTest {
     }
 
     try (SegmentDictionaryCreator dictionaryCreator = new 
SegmentDictionaryCreator(_intValues,
-        new DimensionFieldSpec(INT_COLUMN_NAME, FieldSpec.DataType.INT, true), 
TEMP_DIR)) {
+        new DimensionFieldSpec(INT_COLUMN_NAME, DataType.INT, true), 
TEMP_DIR)) {
       dictionaryCreator.build();
     }
 
     try (SegmentDictionaryCreator dictionaryCreator = new 
SegmentDictionaryCreator(_longValues,
-        new DimensionFieldSpec(LONG_COLUMN_NAME, FieldSpec.DataType.LONG, 
true), TEMP_DIR)) {
+        new DimensionFieldSpec(LONG_COLUMN_NAME, DataType.LONG, true), 
TEMP_DIR)) {
       dictionaryCreator.build();
     }
 
     try (SegmentDictionaryCreator dictionaryCreator = new 
SegmentDictionaryCreator(_floatValues,
-        new DimensionFieldSpec(FLOAT_COLUMN_NAME, FieldSpec.DataType.FLOAT, 
true), TEMP_DIR)) {
+        new DimensionFieldSpec(FLOAT_COLUMN_NAME, DataType.FLOAT, true), 
TEMP_DIR)) {
       dictionaryCreator.build();
     }
 
     try (SegmentDictionaryCreator dictionaryCreator = new 
SegmentDictionaryCreator(_doubleValues,
-        new DimensionFieldSpec(DOUBLE_COLUMN_NAME, FieldSpec.DataType.DOUBLE, 
true), TEMP_DIR)) {
+        new DimensionFieldSpec(DOUBLE_COLUMN_NAME, DataType.DOUBLE, true), 
TEMP_DIR)) {
       dictionaryCreator.build();
     }
 
-    MetricFieldSpec bigDecimalMetricField = new 
MetricFieldSpec(BIG_DECIMAL_COLUMN_NAME,
-        FieldSpec.DataType.BIG_DECIMAL);
-    bigDecimalMetricField.setSingleValueField(true);
     try (SegmentDictionaryCreator dictionaryCreator = new 
SegmentDictionaryCreator(_bigDecimalValues,
-        bigDecimalMetricField, TEMP_DIR)) {
+        new MetricFieldSpec(BIG_DECIMAL_COLUMN_NAME, DataType.BIG_DECIMAL), 
TEMP_DIR)) {
       dictionaryCreator.build();
     }
 
     try (SegmentDictionaryCreator dictionaryCreator = new 
SegmentDictionaryCreator(_stringValues,
-        new DimensionFieldSpec(STRING_COLUMN_NAME, FieldSpec.DataType.STRING, 
true), TEMP_DIR)) {
+        new DimensionFieldSpec(STRING_COLUMN_NAME, DataType.STRING, true), 
TEMP_DIR)) {
       dictionaryCreator.build();
       assertEquals(dictionaryCreator.getNumBytesPerEntry(), STRING_LENGTH);
     }
 
     try (SegmentDictionaryCreator dictionaryCreator = new 
SegmentDictionaryCreator(_bytesValues,
-        new DimensionFieldSpec(BYTES_COLUMN_NAME, FieldSpec.DataType.BYTES, 
true), TEMP_DIR)) {
+        new DimensionFieldSpec(BYTES_COLUMN_NAME, DataType.BYTES, true), 
TEMP_DIR)) {
       dictionaryCreator.build();
       assertEquals(dictionaryCreator.getNumBytesPerEntry(), BYTES_LENGTH);
     }
@@ -188,9 +185,8 @@ public class ImmutableDictionaryTypeConversionTest {
   @Test
   public void testLongDictionary()
       throws Exception {
-    try (LongDictionary longDictionary = new LongDictionary(PinotDataBuffer
-        .mapReadOnlyBigEndianFile(new File(TEMP_DIR, LONG_COLUMN_NAME + 
V1Constants.Dict.FILE_EXTENSION)),
-        NUM_VALUES)) {
+    try (LongDictionary longDictionary = new 
LongDictionary(PinotDataBuffer.mapReadOnlyBigEndianFile(
+        new File(TEMP_DIR, LONG_COLUMN_NAME + 
V1Constants.Dict.FILE_EXTENSION)), NUM_VALUES)) {
       testNumericDictionary(longDictionary);
     }
   }
@@ -198,9 +194,8 @@ public class ImmutableDictionaryTypeConversionTest {
   @Test
   public void testOnHeapLongDictionary()
       throws Exception {
-    try (OnHeapLongDictionary onHeapLongDictionary = new 
OnHeapLongDictionary(PinotDataBuffer
-        .mapReadOnlyBigEndianFile(new File(TEMP_DIR, LONG_COLUMN_NAME + 
V1Constants.Dict.FILE_EXTENSION)),
-        NUM_VALUES)) {
+    try (OnHeapLongDictionary onHeapLongDictionary = new 
OnHeapLongDictionary(PinotDataBuffer.mapReadOnlyBigEndianFile(
+        new File(TEMP_DIR, LONG_COLUMN_NAME + 
V1Constants.Dict.FILE_EXTENSION)), NUM_VALUES)) {
       testNumericDictionary(onHeapLongDictionary);
     }
   }
@@ -208,9 +203,8 @@ public class ImmutableDictionaryTypeConversionTest {
   @Test
   public void testFloatDictionary()
       throws Exception {
-    try (FloatDictionary floatDictionary = new FloatDictionary(PinotDataBuffer
-        .mapReadOnlyBigEndianFile(new File(TEMP_DIR, FLOAT_COLUMN_NAME + 
V1Constants.Dict.FILE_EXTENSION)),
-        NUM_VALUES)) {
+    try (FloatDictionary floatDictionary = new 
FloatDictionary(PinotDataBuffer.mapReadOnlyBigEndianFile(
+        new File(TEMP_DIR, FLOAT_COLUMN_NAME + 
V1Constants.Dict.FILE_EXTENSION)), NUM_VALUES)) {
       testNumericDictionary(floatDictionary);
     }
   }
@@ -218,9 +212,9 @@ public class ImmutableDictionaryTypeConversionTest {
   @Test
   public void testOnHeapFloatDictionary()
       throws Exception {
-    try (OnHeapFloatDictionary onHeapFloatDictionary = new 
OnHeapFloatDictionary(PinotDataBuffer
-        .mapReadOnlyBigEndianFile(new File(TEMP_DIR, FLOAT_COLUMN_NAME + 
V1Constants.Dict.FILE_EXTENSION)),
-        NUM_VALUES)) {
+    try (OnHeapFloatDictionary onHeapFloatDictionary = new 
OnHeapFloatDictionary(
+        PinotDataBuffer.mapReadOnlyBigEndianFile(
+            new File(TEMP_DIR, FLOAT_COLUMN_NAME + 
V1Constants.Dict.FILE_EXTENSION)), NUM_VALUES)) {
       testNumericDictionary(onHeapFloatDictionary);
     }
   }
@@ -228,9 +222,8 @@ public class ImmutableDictionaryTypeConversionTest {
   @Test
   public void testDoubleDictionary()
       throws Exception {
-    try (DoubleDictionary doubleDictionary = new 
DoubleDictionary(PinotDataBuffer
-        .mapReadOnlyBigEndianFile(new File(TEMP_DIR, DOUBLE_COLUMN_NAME + 
V1Constants.Dict.FILE_EXTENSION)),
-        NUM_VALUES)) {
+    try (DoubleDictionary doubleDictionary = new 
DoubleDictionary(PinotDataBuffer.mapReadOnlyBigEndianFile(
+        new File(TEMP_DIR, DOUBLE_COLUMN_NAME + 
V1Constants.Dict.FILE_EXTENSION)), NUM_VALUES)) {
       testNumericDictionary(doubleDictionary);
     }
   }
@@ -238,9 +231,9 @@ public class ImmutableDictionaryTypeConversionTest {
   @Test
   public void testOnHeapDoubleDictionary()
       throws Exception {
-    try (OnHeapDoubleDictionary onHeapDoubleDictionary = new 
OnHeapDoubleDictionary(PinotDataBuffer
-        .mapReadOnlyBigEndianFile(new File(TEMP_DIR, DOUBLE_COLUMN_NAME + 
V1Constants.Dict.FILE_EXTENSION)),
-        NUM_VALUES)) {
+    try (OnHeapDoubleDictionary onHeapDoubleDictionary = new 
OnHeapDoubleDictionary(
+        PinotDataBuffer.mapReadOnlyBigEndianFile(
+            new File(TEMP_DIR, DOUBLE_COLUMN_NAME + 
V1Constants.Dict.FILE_EXTENSION)), NUM_VALUES)) {
       testNumericDictionary(onHeapDoubleDictionary);
     }
   }
@@ -248,13 +241,24 @@ public class ImmutableDictionaryTypeConversionTest {
   @Test
   public void testBigDecimalDictionary()
       throws Exception {
-    try (BigDecimalDictionary bigDecimalDictionary = new 
BigDecimalDictionary(PinotDataBuffer
-        .mapReadOnlyBigEndianFile(new File(TEMP_DIR, BIG_DECIMAL_COLUMN_NAME + 
V1Constants.Dict.FILE_EXTENSION)),
-        NUM_VALUES, _bigDecimalByteLength)) {
+    try (BigDecimalDictionary bigDecimalDictionary = new 
BigDecimalDictionary(PinotDataBuffer.mapReadOnlyBigEndianFile(
+        new File(TEMP_DIR, BIG_DECIMAL_COLUMN_NAME + 
V1Constants.Dict.FILE_EXTENSION)), NUM_VALUES,
+        _bigDecimalByteLength)) {
       testNumericDictionary(bigDecimalDictionary);
     }
   }
 
+  @Test
+  public void testOnHeapBigDecimalDictionary()
+      throws Exception {
+    try (OnHeapBigDecimalDictionary onHeapBigDecimalDictionary = new 
OnHeapBigDecimalDictionary(
+        PinotDataBuffer.mapReadOnlyBigEndianFile(
+            new File(TEMP_DIR, BIG_DECIMAL_COLUMN_NAME + 
V1Constants.Dict.FILE_EXTENSION)), NUM_VALUES,
+        _bigDecimalByteLength)) {
+      testNumericDictionary(onHeapBigDecimalDictionary);
+    }
+  }
+
   private void testNumericDictionary(BaseImmutableDictionary dictionary) {
     for (int i = 0; i < NUM_VALUES; i++) {
       Assert.assertEquals(((Number) dictionary.get(i)).intValue(), 
_intValues[i]);
@@ -283,7 +287,7 @@ public class ImmutableDictionaryTypeConversionTest {
 
     try {
       dictionary.getBytesValue(0);
-      if (dictionary.getClass() != BigDecimalDictionary.class) {
+      if (dictionary.getValueType() != DataType.BIG_DECIMAL) {
         Assert.fail();
       }
     } catch (UnsupportedOperationException e) {
@@ -291,7 +295,7 @@ public class ImmutableDictionaryTypeConversionTest {
     }
     try {
       dictionary.readBytesValues(_dictIds, NUM_VALUES, _bytesValuesBuffer);
-      if (dictionary.getClass() != BigDecimalDictionary.class) {
+      if (dictionary.getValueType() != DataType.BIG_DECIMAL) {
         Assert.fail();
       }
     } catch (UnsupportedOperationException e) {
@@ -302,9 +306,9 @@ public class ImmutableDictionaryTypeConversionTest {
   @Test
   public void testStringDictionary()
       throws Exception {
-    try (StringDictionary stringDictionary = new 
StringDictionary(PinotDataBuffer
-        .mapReadOnlyBigEndianFile(new File(TEMP_DIR, STRING_COLUMN_NAME + 
V1Constants.Dict.FILE_EXTENSION)), NUM_VALUES,
-        STRING_LENGTH, (byte) 0)) {
+    try (StringDictionary stringDictionary = new 
StringDictionary(PinotDataBuffer.mapReadOnlyBigEndianFile(
+        new File(TEMP_DIR, STRING_COLUMN_NAME + 
V1Constants.Dict.FILE_EXTENSION)), NUM_VALUES, STRING_LENGTH,
+        (byte) 0)) {
       testStringDictionary(stringDictionary);
     }
   }
@@ -312,9 +316,10 @@ public class ImmutableDictionaryTypeConversionTest {
   @Test
   public void testOnHeapStringDictionary()
       throws Exception {
-    try (OnHeapStringDictionary onHeapStringDictionary = new 
OnHeapStringDictionary(PinotDataBuffer
-        .mapReadOnlyBigEndianFile(new File(TEMP_DIR, STRING_COLUMN_NAME + 
V1Constants.Dict.FILE_EXTENSION)), NUM_VALUES,
-        STRING_LENGTH, (byte) 0)) {
+    try (OnHeapStringDictionary onHeapStringDictionary = new 
OnHeapStringDictionary(
+        PinotDataBuffer.mapReadOnlyBigEndianFile(
+            new File(TEMP_DIR, STRING_COLUMN_NAME + 
V1Constants.Dict.FILE_EXTENSION)), NUM_VALUES, STRING_LENGTH,
+        (byte) 0)) {
       testStringDictionary(onHeapStringDictionary);
     }
   }
@@ -348,13 +353,22 @@ public class ImmutableDictionaryTypeConversionTest {
   @Test
   public void testBytesDictionary()
       throws Exception {
-    try (BytesDictionary bytesDictionary = new BytesDictionary(PinotDataBuffer
-        .mapReadOnlyBigEndianFile(new File(TEMP_DIR, BYTES_COLUMN_NAME + 
V1Constants.Dict.FILE_EXTENSION)), NUM_VALUES,
-        BYTES_LENGTH)) {
+    try (BytesDictionary bytesDictionary = new 
BytesDictionary(PinotDataBuffer.mapReadOnlyBigEndianFile(
+        new File(TEMP_DIR, BYTES_COLUMN_NAME + 
V1Constants.Dict.FILE_EXTENSION)), NUM_VALUES, BYTES_LENGTH)) {
       testBytesDictionary(bytesDictionary);
     }
   }
 
+  @Test
+  public void testOnHeapBytesDictionary()
+      throws Exception {
+    try (OnHeapBytesDictionary onHeapBytesDictionary = new 
OnHeapBytesDictionary(
+        PinotDataBuffer.mapReadOnlyBigEndianFile(
+            new File(TEMP_DIR, BYTES_COLUMN_NAME + 
V1Constants.Dict.FILE_EXTENSION)), NUM_VALUES, BYTES_LENGTH)) {
+      testBytesDictionary(onHeapBytesDictionary);
+    }
+  }
+
   private void testBytesDictionary(BaseImmutableDictionary dictionary) {
     for (int i = 0; i < NUM_VALUES; i++) {
       assertEquals(dictionary.get(i), _bytesValues[i].getBytes());
diff --git 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/Dictionary.java
 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/Dictionary.java
index 5e96ad4ae7..8e9c22de23 100644
--- 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/Dictionary.java
+++ 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/Dictionary.java
@@ -56,35 +56,51 @@ public interface Dictionary extends Closeable {
   int indexOf(String stringValue);
 
   /**
-   * Returns the index of the value in the dictionary, or {@link 
#NULL_VALUE_INDEX} (-1) if
-   * the value does not exist.
+   * Returns the index of the value in the dictionary, or {@link 
#NULL_VALUE_INDEX} (-1) if the value does not exist.
+   * Must be implemented for INT dictionaries.
    */
   default int indexOf(int intValue) {
-    return indexOf(String.valueOf(intValue));
+    throw new UnsupportedOperationException();
   }
 
   /**
-   * Returns the index of the value in the dictionary, or {@link 
#NULL_VALUE_INDEX} (-1) if
-   * the value does not exist.
+   * Returns the index of the value in the dictionary, or {@link 
#NULL_VALUE_INDEX} (-1) if the value does not exist.
+   * Must be implemented for LONG dictionaries.
    */
   default int indexOf(long longValue) {
-    return indexOf(String.valueOf(longValue));
+    throw new UnsupportedOperationException();
   }
 
   /**
-   * Returns the index of the value in the dictionary, or {@link 
#NULL_VALUE_INDEX} (-1) if
-   * the value does not exist.
+   * Returns the index of the value in the dictionary, or {@link 
#NULL_VALUE_INDEX} (-1) if the value does not exist.
+   * Must be implemented for FLOAT dictionaries.
    */
   default int indexOf(float floatValue) {
-    return indexOf(String.valueOf(floatValue));
+    throw new UnsupportedOperationException();
   }
 
   /**
-   * Returns the index of the value in the dictionary, or {@link 
#NULL_VALUE_INDEX} (-1) if
-   * the value does not exist.
+   * Returns the index of the value in the dictionary, or {@link 
#NULL_VALUE_INDEX} (-1) if the value does not exist.
+   * Must be implemented for DOUBLE dictionaries.
    */
   default int indexOf(double doubleValue) {
-    return indexOf(String.valueOf(doubleValue));
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Returns the index of the value in the dictionary, or {@link 
#NULL_VALUE_INDEX} (-1) if the value does not exist.
+   * Must be implemented for BIG_DECIMAL dictionaries.
+   */
+  default int indexOf(BigDecimal bigDecimalValue) {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Returns the index of the value in the dictionary, or {@link 
#NULL_VALUE_INDEX} (-1) if the value does not exist.
+   * Must be implemented for BYTE_ARRAY dictionaries.
+   */
+  default int indexOf(ByteArray bytesValue) {
+    throw new UnsupportedOperationException();
   }
 
   /**
@@ -119,9 +135,10 @@ public interface Dictionary extends Closeable {
   Comparable getMaxVal();
 
   /**
-   * Returns an sorted array of all values in the dictionary. For type 
INT/LONG/FLOAT/DOUBLE, primitive type array will
-   * be returned; for type STRING, {@code String[]} will be returned; for type 
BYTES, {@code ByteArray[]} will be
-   * returned. This method is for the stats collection phase when sealing the 
consuming segment.
+   * Returns a sorted array of all values in the dictionary. For type 
INT/LONG/FLOAT/DOUBLE, primitive type array will
+   * be returned; for type BIG_DECIMAL, {@code BigDecimal[]} will be returned; 
for type STRING, {@code String[]} will be
+   * returned; for type BYTES, {@code ByteArray[]} will be returned.
+   * This method is for the stats collection phase when sealing the consuming 
segment.
    */
   Object getSortedValues();
 
@@ -135,6 +152,7 @@ public interface Dictionary extends Closeable {
    *   <li>LONG -> Long</li>
    *   <li>FLOAT -> Float</li>
    *   <li>DOUBLE -> Double</li>
+   *   <li>BIG_DECIMAL -> BigDecimal</li>
    *   <li>STRING -> String</li>
    *   <li>BYTES -> byte[]</li>
    * </ul>
@@ -149,6 +167,7 @@ public interface Dictionary extends Closeable {
    *   <li>LONG -> Long</li>
    *   <li>FLOAT -> Float</li>
    *   <li>DOUBLE -> Double</li>
+   *   <li>BIG_DECIMAL -> BigDecimal</li>
    *   <li>STRING -> String</li>
    *   <li>BYTES -> ByteArray</li>
    * </ul>
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/ByteArray.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/ByteArray.java
index cc031c7754..6eefcadeae 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/ByteArray.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/ByteArray.java
@@ -23,7 +23,6 @@ import java.lang.invoke.MethodHandle;
 import java.lang.invoke.MethodHandles;
 import java.lang.invoke.MethodType;
 import java.util.Arrays;
-import javax.annotation.Nonnull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -31,12 +30,12 @@ import org.slf4j.LoggerFactory;
 /**
  * Wrapper around byte[] that provides additional features such as:
  * <ul>
- *   <li> Implements comparable interface, so comparison and sorting can be 
performed. </li>
- *   <li> Implements equals() and hashCode(), so it can be used as key for 
HashMap/Set. </li>
+ *   <li>Implements comparable interface, so comparison and sorting can be 
performed</li>
+ *   <li>Implements equals() and hashCode(), so it can be used as key for 
HashMap/Set</li>
+ *   <li>Caches the hash code of the byte[]</li>
  * </ul>
  */
 public class ByteArray implements Comparable<ByteArray>, Serializable {
-
   private static final Logger LOGGER = 
LoggerFactory.getLogger(ByteArray.class);
 
   private static final MethodHandle COMPARE_UNSIGNED;
@@ -54,6 +53,9 @@ public class ByteArray implements Comparable<ByteArray>, 
Serializable {
 
   private final byte[] _bytes;
 
+  // Hash for empty ByteArray is 1
+  private int _hash = 1;
+
   public ByteArray(byte[] bytes) {
     _bytes = bytes;
   }
@@ -91,27 +93,30 @@ public class ByteArray implements Comparable<ByteArray>, 
Serializable {
 
   @Override
   public int hashCode() {
-    int hash = 1;
-    int i = 0;
-    for (; i + 7 < _bytes.length; i += 8) {
-      hash = -1807454463 * hash
-          + 1742810335 * _bytes[i]
-          + 887503681 * _bytes[i + 1]
-          + 28629151 * _bytes[i + 2]
-          + 923521 * _bytes[i + 3]
-          + 29791 * _bytes[i + 4]
-          + 961 * _bytes[i + 5]
-          + 31 * _bytes[i + 6]
-          + _bytes[i + 7];
-    }
-    for (; i < _bytes.length; i++) {
-      hash = 31 * hash + _bytes[i];
+    int hash = _hash;
+    if (hash == 1 && _bytes.length > 0) {
+      int i = 0;
+      for (; i + 7 < _bytes.length; i += 8) {
+        hash = -1807454463 * hash
+            + 1742810335 * _bytes[i]
+            + 887503681 * _bytes[i + 1]
+            + 28629151 * _bytes[i + 2]
+            + 923521 * _bytes[i + 3]
+            + 29791 * _bytes[i + 4]
+            + 961 * _bytes[i + 5]
+            + 31 * _bytes[i + 6]
+            + _bytes[i + 7];
+      }
+      for (; i < _bytes.length; i++) {
+        hash = 31 * hash + _bytes[i];
+      }
+      _hash = hash;
     }
     return hash;
   }
 
   @Override
-  public int compareTo(@Nonnull ByteArray that) {
+  public int compareTo(ByteArray that) {
     if (this == that) {
       return 0;
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to