This is an automated email from the ASF dual-hosted git repository.

kishoreg pushed a commit to branch range-index
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 7b2eacf65dfb06f411756a11f1e44b98a557e8ad
Author: kishoreg <g.kish...@gmail.com>
AuthorDate: Fri Apr 10 23:02:13 2020 -0700

    Initial code dump for supporting range queries efficiently
---
 .../pinot/core/common/BlockMultiValIterator.java   |   4 +-
 .../org/apache/pinot/core/common/DataSource.java   |   6 +
 .../indexsegment/mutable/MutableSegmentImpl.java   |   4 +-
 .../DictionaryBasedMultiValueIterator.java         | 108 ++++++
 ...ava => DictionaryBasedSingleValueIterator.java} |  43 ++-
 .../docvaliterators/MultiValueIterator.java        |  25 ++
 .../core/operator/filter/FilterOperatorUtils.java  |   4 +
 .../core/operator/filter/RangeFilterOperator.java  | 113 ++++++
 .../org/apache/pinot/core/plan/FilterPlanNode.java |   1 -
 .../core/segment/creator/impl/V1Constants.java     |   1 +
 .../impl/inv/DictionaryBasedRangeIndexCreator.java | 416 +++++++++++++++++++++
 .../segment/index/column/ColumnIndexContainer.java |   5 +
 .../index/column/PhysicalColumnIndexContainer.java |  20 +-
 .../segment/index/datasource/BaseDataSource.java   |  11 +-
 .../index/datasource/ImmutableDataSource.java      |   2 +-
 .../index/datasource/MutableDataSource.java        |   4 +-
 .../segment/index/loader/IndexLoadingConfig.java   |  18 +
 .../segment/index/loader/SegmentPreProcessor.java  |   6 +
 .../loader/invertedindex/RangeIndexHandler.java    | 168 +++++++++
 .../segment/index/metadata/SegmentMetadata.java    |   2 +
 .../index/metadata/SegmentMetadataImpl.java        |   5 +
 .../index/readers/BitmapRangeIndexReader.java      | 134 +++++++
 .../core/segment/store/ColumnIndexDirectory.java   |  19 +
 .../pinot/core/segment/store/ColumnIndexType.java  |   3 +-
 .../core/segment/store/FilePerIndexDirectory.java  |  17 +
 .../segment/store/SegmentLocalFSDirectory.java     |   5 +
 .../segment/store/SingleFileIndexDirectory.java    |  12 +
 .../virtualcolumn/VirtualColumnIndexContainer.java |   5 +
 .../core/startree/v2/store/StarTreeDataSource.java |   3 +-
 .../core/common/RealtimeNoDictionaryTest.java      |  12 +-
 .../apache/pinot/spi/config/IndexingConfig.java    |   9 +
 .../tools/scan/query/RangePredicateFilter.java     |   8 +
 .../baseballStats_offline_table_config.json        |   3 +
 33 files changed, 1172 insertions(+), 24 deletions(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/common/BlockMultiValIterator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/common/BlockMultiValIterator.java
index 69d5da5..68c0fc5 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/common/BlockMultiValIterator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/common/BlockMultiValIterator.java
@@ -24,7 +24,7 @@ package org.apache.pinot.core.common;
  */
 public abstract class BlockMultiValIterator implements BlockValIterator {
 
-  public int nextCharVal(int[] charArray) {
+  public int nextCharVal(char[] charArray) {
     throw new UnsupportedOperationException();
   }
 
@@ -44,7 +44,7 @@ public abstract class BlockMultiValIterator implements 
BlockValIterator {
     throw new UnsupportedOperationException();
   }
 
-  public byte[][] nextBytesArrayVal(byte[][] bytesArrays) {
+  public int nextBytesArrayVal(byte[][] bytesArrays) {
     throw new UnsupportedOperationException();
   }
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/common/DataSource.java 
b/pinot-core/src/main/java/org/apache/pinot/core/common/DataSource.java
index 8f33931..492c344 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/DataSource.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/DataSource.java
@@ -55,6 +55,12 @@ public abstract class DataSource extends BaseOperator {
   public abstract InvertedIndexReader getInvertedIndex();
 
   /**
+   * Returns the inverted index for the column if exists, or {@code null} if 
not.
+   */
+  @Nullable
+  public abstract InvertedIndexReader getRangeIndex();
+
+  /**
    * Returns the bloom filter for the column if exists, or {@code null} if not.
    */
   @Nullable
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
index ba42e8b..53aee51 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
@@ -112,6 +112,7 @@ public class MutableSegmentImpl implements MutableSegment {
   private final Map<String, BaseMutableDictionary> _dictionaryMap = new 
HashMap<>();
   private final Map<String, DataFileReader> _indexReaderWriterMap = new 
HashMap<>();
   private final Map<String, InvertedIndexReader> _invertedIndexMap = new 
HashMap<>();
+  private final Map<String, InvertedIndexReader> _rangeIndexMap = new 
HashMap<>();
   private final Map<String, BloomFilterReader> _bloomFilterMap = new 
HashMap<>();
   private final Map<String, RealtimeNullValueVectorReaderWriter> 
_nullValueVectorMap = new HashMap<>();
   private final IdMap<FixedIntArray> _recordIdMap;
@@ -607,11 +608,12 @@ public class MutableSegmentImpl implements MutableSegment 
{
       DataFileReader forwardIndex = _indexReaderWriterMap.get(column);
       BaseMutableDictionary dictionary = _dictionaryMap.get(column);
       InvertedIndexReader invertedIndex = _invertedIndexMap.get(column);
+      InvertedIndexReader rangeIndex = _rangeIndexMap.get(column);
       BloomFilterReader bloomFilter = _bloomFilterMap.get(column);
       RealtimeNullValueVectorReaderWriter nullValueVector = 
_nullValueVectorMap.get(column);
       return new MutableDataSource(fieldSpec, _numDocsIndexed, 
numValuesInfo.getNumValues(),
           numValuesInfo.getMaxNumValuesPerMVEntry(), partitionFunction, 
partitionId, forwardIndex, dictionary,
-          invertedIndex, bloomFilter, nullValueVector);
+          invertedIndex, rangeIndex, bloomFilter, nullValueVector);
     }
   }
 
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/docvaliterators/DictionaryBasedMultiValueIterator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/docvaliterators/DictionaryBasedMultiValueIterator.java
new file mode 100644
index 0000000..9f3dc2a
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/docvaliterators/DictionaryBasedMultiValueIterator.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.docvaliterators;
+
+import org.apache.pinot.core.common.BlockMultiValIterator;
+import org.apache.pinot.core.common.BlockSingleValIterator;
+import org.apache.pinot.core.io.reader.ReaderContext;
+import org.apache.pinot.core.io.reader.SingleColumnMultiValueReader;
+import org.apache.pinot.core.io.reader.SingleColumnSingleValueReader;
+import org.apache.pinot.core.segment.index.readers.Dictionary;
+
+
+@SuppressWarnings("unchecked")
+public final class DictionaryBasedMultiValueIterator extends 
BlockMultiValIterator {
+
+  private final SingleColumnMultiValueReader _reader;
+  private final int _numDocs;
+  private final ReaderContext _context;
+  private final Dictionary _dictionary;
+  private final int[] _dictIds;
+
+  private int _nextDocId;
+
+  public DictionaryBasedMultiValueIterator(SingleColumnMultiValueReader 
reader, Dictionary dictionary, int numDocs,
+      int maxLength) {
+    _reader = reader;
+    _numDocs = numDocs;
+    _context = _reader.createContext();
+    _dictionary = dictionary;
+    _dictIds = new int[maxLength];
+  }
+
+  @Override
+  public int nextIntVal(int[] intArray) {
+    int length = _reader.getIntArray(_nextDocId++, _dictIds, _context);
+    for (int i = 0; i < length; i++) {
+      intArray[i] = _dictionary.getIntValue(_dictIds[i]);
+    }
+    return length;
+  }
+
+  @Override
+  public int nextDoubleVal(double[] doubleArray) {
+    int length = _reader.getIntArray(_nextDocId++, _dictIds, _context);
+    for (int i = 0; i < length; i++) {
+      doubleArray[i] = _dictionary.getDoubleValue(_dictIds[i]);
+    }
+    return length;
+  }
+
+  @Override
+  public int nextFloatVal(float[] floatArray) {
+    int length = _reader.getIntArray(_nextDocId++, _dictIds, _context);
+    for (int i = 0; i < length; i++) {
+      floatArray[i] = _dictionary.getFloatValue(_dictIds[i]);
+    }
+    return length;
+  }
+
+  @Override
+  public int nextLongVal(long[] longArray) {
+    int length = _reader.getIntArray(_nextDocId++, _dictIds, _context);
+    for (int i = 0; i < length; i++) {
+      longArray[i] = _dictionary.getLongValue(_dictIds[i]);
+    }
+    return length;
+  }
+
+  @Override
+  public int nextBytesArrayVal(byte[][] bytesArrays) {
+    int length = _reader.getIntArray(_nextDocId++, _dictIds, _context);
+    for (int i = 0; i < length; i++) {
+      bytesArrays[i] = _dictionary.getBytesValue(_dictIds[i]);
+    }
+    return length;
+  }
+
+  @Override
+  public boolean hasNext() {
+    return _nextDocId < _numDocs;
+  }
+
+  @Override
+  public void skipTo(int docId) {
+    _nextDocId = docId;
+  }
+
+  @Override
+  public void reset() {
+    _nextDocId = 0;
+  }
+}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/docvaliterators/MultiValueIterator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/docvaliterators/DictionaryBasedSingleValueIterator.java
similarity index 52%
copy from 
pinot-core/src/main/java/org/apache/pinot/core/operator/docvaliterators/MultiValueIterator.java
copy to 
pinot-core/src/main/java/org/apache/pinot/core/operator/docvaliterators/DictionaryBasedSingleValueIterator.java
index 85ae7c7..34855e0 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/docvaliterators/MultiValueIterator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/docvaliterators/DictionaryBasedSingleValueIterator.java
@@ -18,28 +18,57 @@
  */
 package org.apache.pinot.core.operator.docvaliterators;
 
-import org.apache.pinot.core.common.BlockMultiValIterator;
+import org.apache.pinot.core.common.BlockSingleValIterator;
 import org.apache.pinot.core.io.reader.ReaderContext;
-import org.apache.pinot.core.io.reader.SingleColumnMultiValueReader;
+import org.apache.pinot.core.io.reader.SingleColumnSingleValueReader;
+import org.apache.pinot.core.segment.index.readers.Dictionary;
 
 
 @SuppressWarnings("unchecked")
-public final class MultiValueIterator extends BlockMultiValIterator {
-  private final SingleColumnMultiValueReader _reader;
+public final class DictionaryBasedSingleValueIterator extends 
BlockSingleValIterator {
+
+  private final SingleColumnSingleValueReader _reader;
   private final int _numDocs;
   private final ReaderContext _context;
+  private final Dictionary _dictionary;
 
   private int _nextDocId;
 
-  public MultiValueIterator(SingleColumnMultiValueReader reader, int numDocs) {
+  public DictionaryBasedSingleValueIterator(SingleColumnSingleValueReader 
reader, Dictionary dictionary, int numDocs) {
     _reader = reader;
     _numDocs = numDocs;
     _context = _reader.createContext();
+    _dictionary = dictionary;
+  }
+
+  @Override
+  public int nextIntVal() {
+    return _dictionary.getIntValue(_reader.getInt(_nextDocId++, _context));
+  }
+
+  @Override
+  public long nextLongVal() {
+    return _dictionary.getLongValue(_reader.getInt(_nextDocId++, _context));
+  }
+
+  @Override
+  public float nextFloatVal() {
+    return _dictionary.getFloatValue(_reader.getInt(_nextDocId++, _context));
+  }
+
+  @Override
+  public double nextDoubleVal() {
+    return _dictionary.getDoubleValue(_reader.getInt(_nextDocId++, _context));
+  }
+
+  @Override
+  public String nextStringVal() {
+    return _dictionary.getStringValue(_reader.getInt(_nextDocId++, _context));
   }
 
   @Override
-  public int nextIntVal(int[] intArray) {
-    return _reader.getIntArray(_nextDocId++, intArray, _context);
+  public byte[] nextBytesVal() {
+    return _dictionary.getBytesValue(_reader.getInt(_nextDocId++, _context));
   }
 
   @Override
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/docvaliterators/MultiValueIterator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/docvaliterators/MultiValueIterator.java
index 85ae7c7..4529c84 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/docvaliterators/MultiValueIterator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/docvaliterators/MultiValueIterator.java
@@ -43,6 +43,31 @@ public final class MultiValueIterator extends 
BlockMultiValIterator {
   }
 
   @Override
+  public int nextCharVal(char[] charArray) {
+    return _reader.getCharArray(_nextDocId++, charArray);
+  }
+
+  @Override
+  public int nextDoubleVal(double[] doubleArray) {
+    return _reader.getDoubleArray(_nextDocId++, doubleArray);
+  }
+
+  @Override
+  public int nextFloatVal(float[] floatArray) {
+    return _reader.getFloatArray(_nextDocId++, floatArray);
+  }
+
+  @Override
+  public int nextLongVal(long[] longArray) {
+    return _reader.getLongArray(_nextDocId++, longArray);
+  }
+
+  @Override
+  public int nextBytesArrayVal(byte[][] bytesArrays) {
+     return _reader.getBytesArray(_nextDocId++, bytesArrays);
+  }
+
+  @Override
   public boolean hasNext() {
     return _nextDocId < _numDocs;
   }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/FilterOperatorUtils.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/FilterOperatorUtils.java
index cb5bbb3..1de0b1b 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/FilterOperatorUtils.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/FilterOperatorUtils.java
@@ -54,6 +54,10 @@ public class FilterOperatorUtils {
 
     Predicate.Type predicateType = predicateEvaluator.getPredicateType();
 
+    if (predicateType == Predicate.Type.RANGE && dataSource.getRangeIndex() != 
null) {
+      return new RangeFilterOperator(predicateEvaluator, dataSource, 
startDocId, endDocId);
+    }
+
     if (predicateType == Predicate.Type.TEXT_MATCH) {
       return new TextMatchFilterOperator(predicateEvaluator, dataSource, 
startDocId, endDocId);
     }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/RangeFilterOperator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/RangeFilterOperator.java
new file mode 100644
index 0000000..6211fa9
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/RangeFilterOperator.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.filter;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.pinot.core.common.BlockDocIdIterator;
+import org.apache.pinot.core.common.DataSource;
+import org.apache.pinot.core.operator.blocks.FilterBlock;
+import org.apache.pinot.core.operator.dociditerators.MVScanDocIdIterator;
+import org.apache.pinot.core.operator.dociditerators.SVScanDocIdIterator;
+import org.apache.pinot.core.operator.dociditerators.ScanBasedDocIdIterator;
+import org.apache.pinot.core.operator.docidsets.BitmapDocIdSet;
+import org.apache.pinot.core.operator.docidsets.FilterBlockDocIdSet;
+import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator;
+import 
org.apache.pinot.core.operator.filter.predicate.RangePredicateEvaluatorFactory;
+import org.apache.pinot.core.segment.index.readers.BitmapRangeIndexReader;
+import org.apache.pinot.core.segment.index.readers.InvertedIndexReader;
+import org.roaringbitmap.RoaringBitmap;
+import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
+import org.roaringbitmap.buffer.MutableRoaringBitmap;
+
+
+public class RangeFilterOperator extends BaseFilterOperator {
+  private static final String OPERATOR_NAME = "RangeFilterOperator";
+
+  private final PredicateEvaluator _predicateEvaluator;
+  private final DataSource _dataSource;
+  private final int _startDocId;
+  // TODO: change it to exclusive
+  // Inclusive
+  private final int _endDocId;
+  private final boolean _exclusive;
+
+  public RangeFilterOperator(PredicateEvaluator predicateEvaluator, DataSource 
dataSource, int startDocId,
+      int endDocId) {
+    // NOTE:
+    // Predicate that is always evaluated as true or false should not be 
passed into the BitmapBasedFilterOperator for
+    // performance concern.
+    // If predicate is always evaluated as true, use MatchAllFilterOperator; 
if predicate is always evaluated as false,
+    // use EmptyFilterOperator.
+    Preconditions.checkArgument(!predicateEvaluator.isAlwaysTrue() && 
!predicateEvaluator.isAlwaysFalse());
+
+    _predicateEvaluator = predicateEvaluator;
+    _dataSource = dataSource;
+    _startDocId = startDocId;
+    _endDocId = endDocId;
+    _exclusive = predicateEvaluator.isExclusive();
+  }
+
+  @Override
+  protected FilterBlock getNextBlock() {
+
+    
RangePredicateEvaluatorFactory.OfflineDictionaryBasedRangePredicateEvaluator 
evaluator =
+        
(RangePredicateEvaluatorFactory.OfflineDictionaryBasedRangePredicateEvaluator) 
_predicateEvaluator;
+    BitmapRangeIndexReader rangeIndexReader = (BitmapRangeIndexReader) 
_dataSource.getRangeIndex();
+    int startRangeId = 
rangeIndexReader.findRangeId(evaluator.getStartDictId());
+    int endRangeId = rangeIndexReader.findRangeId(evaluator.getEndDictId());
+    //Handle Matching Ranges - some ranges match fully but some partially
+    //below code assumes first and last range always match partially which may 
not be the case always //todo: optimize it
+    MutableRoaringBitmap mutableRoaringBitmap = new MutableRoaringBitmap();
+    mutableRoaringBitmap.or(rangeIndexReader.getDocIds(startRangeId));
+    mutableRoaringBitmap.or(rangeIndexReader.getDocIds(endRangeId));
+    final ScanBasedFilterOperator scanBasedFilterOperator =
+        new ScanBasedFilterOperator(_predicateEvaluator, _dataSource, 
_startDocId, _endDocId);
+    FilterBlockDocIdSet scanBlockDocIdSet = 
scanBasedFilterOperator.getNextBlock().getBlockDocIdSet();
+    BlockDocIdIterator iterator = scanBlockDocIdSet.iterator();
+
+    int id = 0;
+    List<ImmutableRoaringBitmap> bitmapList = new ArrayList<>();
+    if (_dataSource.getDataSourceMetadata().isSingleValue()) {
+      bitmapList.add(((SVScanDocIdIterator) 
iterator).applyAnd(mutableRoaringBitmap));
+    } else {
+      bitmapList.add(((MVScanDocIdIterator) 
iterator).applyAnd(mutableRoaringBitmap));
+    }
+
+    //All the intermediate
+    for (int rangeId = startRangeId + 1; rangeId < endRangeId; rangeId++) {
+      bitmapList.add(rangeIndexReader.getDocIds(rangeId));
+    }
+    ImmutableRoaringBitmap[] bitmaps = new 
ImmutableRoaringBitmap[bitmapList.size()];
+    bitmapList.toArray(bitmaps);
+    return new FilterBlock(new BitmapDocIdSet(bitmaps, _startDocId, _endDocId, 
_exclusive) {
+
+      @Override
+      public long getNumEntriesScannedInFilter() {
+        return scanBlockDocIdSet.getNumEntriesScannedInFilter();
+      }
+    });
+  }
+
+  @Override
+  public String getOperatorName() {
+    return OPERATOR_NAME;
+  }
+}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/plan/FilterPlanNode.java 
b/pinot-core/src/main/java/org/apache/pinot/core/plan/FilterPlanNode.java
index 0ddee87..2b1b29f 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/FilterPlanNode.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/FilterPlanNode.java
@@ -117,7 +117,6 @@ public class FilterPlanNode implements PlanNode {
 
       TransformExpressionTree expression = filterQueryTree.getExpression();
       if (expression.getExpressionType() == 
TransformExpressionTree.ExpressionType.FUNCTION) {
-
         return new ExpressionFilterOperator(segment, expression, predicate);
       } else {
         DataSource dataSource = 
segment.getDataSource(filterQueryTree.getColumn());
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/V1Constants.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/V1Constants.java
index c3316cb..3dc60aa 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/V1Constants.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/V1Constants.java
@@ -36,6 +36,7 @@ public class V1Constants {
     public static final String RAW_SV_FORWARD_INDEX_FILE_EXTENSION = 
".sv.raw.fwd";
     public static final String UNSORTED_MV_FORWARD_INDEX_FILE_EXTENSION = 
".mv.fwd";
     public static final String BITMAP_INVERTED_INDEX_FILE_EXTENSION = 
".bitmap.inv";
+    public static final String BITMAP_RANGE_INDEX_FILE_EXTENSION = 
".bitmap.range";
     public static final String BLOOM_FILTER_FILE_EXTENSION = ".bloom";
     public static final String NULLVALUE_VECTOR_FILE_EXTENSION = 
".bitmap.nullvalue";
   }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/DictionaryBasedRangeIndexCreator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/DictionaryBasedRangeIndexCreator.java
new file mode 100644
index 0000000..7ad2b21
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/DictionaryBasedRangeIndexCreator.java
@@ -0,0 +1,416 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.creator.impl.inv;
+
+import com.google.common.base.Preconditions;
+import java.io.BufferedOutputStream;
+import java.io.Closeable;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Random;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.core.segment.creator.InvertedIndexCreator;
+import org.apache.pinot.core.segment.creator.impl.V1Constants;
+import org.apache.pinot.core.segment.index.readers.BaseImmutableDictionary;
+import org.apache.pinot.core.segment.memory.PinotDataBuffer;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.MetricFieldSpec;
+import org.roaringbitmap.RoaringBitmap;
+import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
+import org.roaringbitmap.buffer.MutableRoaringBitmap;
+
+import static 
org.apache.pinot.core.segment.creator.impl.V1Constants.Indexes.BITMAP_RANGE_INDEX_FILE_EXTENSION;
+
+
+/**
+ * Implementation of {@link InvertedIndexCreator} that uses off-heap memory.
+ * <p>We use 2 passes to create the inverted index.
+ * <ul>
+ *
+ *   <li>
+ *     In the constructor compute the number of ranges needed. We can decide 
this based on the following
+ *     - number of buckets based on dictionary (use the user provided config 
for number for buckets or number of dictIds per bucket or pick something a 
default)
+ *   </li>
+ *   <li>
+ *     In the first pass (adding values phase), when add() method is called, 
store the dictIds into the forward index
+ *     value buffer (for multi-valued column also store number of values for 
each docId into forward index length
+ *     buffer). We also compute the inverted index length for each dictId 
while adding values.
+ *   </li>
+ *   <li>
+ *     In the second pass (processing values phase), when seal() method is 
called, all the dictIds should already been
+ *     added. We first reorder the values into the inverted index buffers by 
going over the dictIds in forward index
+ *     value buffer (for multi-valued column we also need forward index length 
buffer to get the docId for each dictId).
+ *     <p>Once we have the inverted index buffers, we simply go over them and 
create the bitmap for each dictId and
+ *     serialize them into a file.
+ *   </li>
+ * </ul>
+ * <p>Based on the number of values we need to store, we use direct memory or 
MMap file to allocate the buffer.
+ */
+public final class DictionaryBasedRangeIndexCreator implements 
InvertedIndexCreator {
+  // Use MMapBuffer if the value buffer size is larger than 2G
+  private static final int NUM_VALUES_THRESHOLD_FOR_MMAP_BUFFER = 500_000_000;
+  private static final int DEFAULT_NUM_RANGES = 20;
+
+  private static final String FORWARD_INDEX_VALUE_BUFFER_SUFFIX = 
".fwd.idx.val.buf";
+  private static final String FORWARD_INDEX_LENGTH_BUFFER_SUFFIX = 
".fwd.idx.len.buf";
+  private static final String INVERTED_INDEX_VALUE_BUFFER_SUFFIX = 
".inv.idx.val.buf";
+  private static final String INVERTED_INDEX_LENGTH_BUFFER_SUFFIX = 
".inv.idx.len.buf";
+
+  private final File _invertedIndexFile;
+  private final File _forwardIndexValueBufferFile;
+  private final File _forwardIndexLengthBufferFile;
+  private final File _invertedIndexValueBufferFile;
+  private final File _invertedIndexLengthBufferFile;
+  private final boolean _singleValue;
+  private final int _cardinality;
+  private final int _numDocs;
+  private final int _numValues;
+  private final boolean _useMMapBuffer;
+
+  private final int[] _values;
+
+  // Forward index buffers (from docId to dictId)
+  private int _nextDocId;
+  private PinotDataBuffer _forwardIndexValueBuffer;
+  // For multi-valued column only because each docId can have multiple dictIds
+  private int _nextValueId;
+  private PinotDataBuffer _forwardIndexLengthBuffer;
+
+  // Inverted index buffers (from dictId to docId)
+  private PinotDataBuffer _invertedIndexValueBuffer;
+  private PinotDataBuffer _invertedIndexLengthBuffer;
+  private int _numRanges;
+  private int _numDocsPerRange;
+
+  public DictionaryBasedRangeIndexCreator(File indexDir, FieldSpec fieldSpec, 
int cardinality, int numDocs,
+      int numValues)
+      throws IOException {
+    String columnName = fieldSpec.getName();
+    _invertedIndexFile = new File(indexDir, columnName + 
BITMAP_RANGE_INDEX_FILE_EXTENSION);
+    _forwardIndexValueBufferFile = new File(indexDir, columnName + 
FORWARD_INDEX_VALUE_BUFFER_SUFFIX);
+    _forwardIndexLengthBufferFile = new File(indexDir, columnName + 
FORWARD_INDEX_LENGTH_BUFFER_SUFFIX);
+    _invertedIndexValueBufferFile = new File(indexDir, columnName + 
INVERTED_INDEX_VALUE_BUFFER_SUFFIX);
+    _invertedIndexLengthBufferFile = new File(indexDir, columnName + 
INVERTED_INDEX_LENGTH_BUFFER_SUFFIX);
+    _singleValue = fieldSpec.isSingleValueField();
+    _cardinality = cardinality;
+    _numDocs = numDocs;
+    _numValues = _singleValue ? numDocs : numValues;
+    _useMMapBuffer = _numValues > NUM_VALUES_THRESHOLD_FOR_MMAP_BUFFER;
+
+    try {
+      _values = new int[_numValues];
+      _numRanges = DEFAULT_NUM_RANGES;
+      _numDocsPerRange = (int) Math.ceil(_numValues / _numRanges);
+      _forwardIndexValueBuffer = createTempBuffer((long) _numValues * 
Integer.BYTES, _forwardIndexValueBufferFile);
+      if (!_singleValue) {
+        _forwardIndexLengthBuffer = createTempBuffer((long) _numDocs * 
Integer.BYTES, _forwardIndexLengthBufferFile);
+      }
+
+      // We need to clear the inverted index length buffer because we rely on 
the initial value of 0, and keep updating
+      // the value instead of directly setting the value
+      _invertedIndexLengthBuffer =
+          createTempBuffer((long) _cardinality * Integer.BYTES, 
_invertedIndexLengthBufferFile);
+      for (int i = 0; i < _cardinality; i++) {
+        _invertedIndexLengthBuffer.putInt((long) i * Integer.BYTES, 0);
+      }
+    } catch (Exception e) {
+      destroyBuffer(_forwardIndexValueBuffer, _forwardIndexValueBufferFile);
+      destroyBuffer(_forwardIndexLengthBuffer, _forwardIndexLengthBufferFile);
+      destroyBuffer(_invertedIndexLengthBuffer, 
_invertedIndexLengthBufferFile);
+      throw e;
+    }
+  }
+
+  @Override
+  public void add(int dictId) {
+    putInt(_forwardIndexValueBuffer, _nextDocId, dictId);
+    putInt(_invertedIndexLengthBuffer, dictId, 
getInt(_invertedIndexLengthBuffer, dictId) + 1);
+    _values[_nextDocId] = dictId;
+    _nextDocId = _nextDocId + 1;
+  }
+
+  @Override
+  public void add(int[] dictIds, int length) {
+    for (int i = 0; i < length; i++) {
+      int dictId = dictIds[i];
+      putInt(_forwardIndexValueBuffer, _nextValueId, dictId);
+      putInt(_invertedIndexLengthBuffer, dictId, 
getInt(_invertedIndexLengthBuffer, dictId) + 1);
+      _values[_nextValueId] = dictId;
+      _nextValueId = _nextValueId + 1;
+    }
+    putInt(_forwardIndexLengthBuffer, _nextDocId++, length);
+  }
+
+  @Override
+  public void addDoc(Object document, int docIdCounter) {
+    throw new IllegalStateException("Bitmap inverted index creator does not 
support Object type currently");
+  }
+
+  @Override
+  public void seal()
+      throws IOException {
+    //copy forward index, sort the forward index, create a copy
+    //go over the sorted value to compute ranges
+    Integer[] positions = new Integer[_values.length];
+    for (int i = 0; i < _values.length; i++) {
+      positions[i] = i;
+    }
+    Arrays.sort(positions, Comparator.comparingInt(pos -> _values[pos]));
+
+//    List<Integer> ranges = new ArrayList<>();
+//    for (int i = 0; i < _values.length; i++) {
+//      if (i % _numDocsPerRange == 0) {
+//        ranges.add(_values[i]);
+//      }
+//    }
+
+    List<Integer> rangeStartList = new ArrayList<>();
+    int numDocsInCurrentRange = 0;
+    // Calculate value index for each dictId in the inverted index value buffer
+    // Re-use inverted index length buffer to store the value index for each 
dictId, where value index is the index in
+    // the inverted index value buffer where we should put next docId for the 
dictId
+    int invertedValueIndex = 0;
+    rangeStartList.add(0);
+    int prevEndDict = 0;
+    for (int dictId = 0; dictId < _cardinality; dictId++) {
+      int length = getInt(_invertedIndexLengthBuffer, dictId);
+      putInt(_invertedIndexLengthBuffer, dictId, invertedValueIndex);
+      invertedValueIndex += length;
+      if (prevEndDict == dictId || (numDocsInCurrentRange + length <= 
_numDocsPerRange)) {
+        numDocsInCurrentRange += length;
+      } else {
+        rangeStartList.add(dictId);
+        prevEndDict = dictId;
+        numDocsInCurrentRange = length;
+      }
+    }
+
+    // Put values into inverted index value buffer
+    _invertedIndexValueBuffer = createTempBuffer((long) _numValues * 
Integer.BYTES, _invertedIndexValueBufferFile);
+    if (_singleValue) {
+      for (int docId = 0; docId < _numDocs; docId++) {
+        int dictId = getInt(_forwardIndexValueBuffer, docId);
+        int index = getInt(_invertedIndexLengthBuffer, dictId);
+        putInt(_invertedIndexValueBuffer, index, docId);
+        putInt(_invertedIndexLengthBuffer, dictId, index + 1);
+      }
+
+      // Destroy buffer no longer needed
+      destroyBuffer(_forwardIndexValueBuffer, _forwardIndexValueBufferFile);
+      _forwardIndexValueBuffer = null;
+    } else {
+      int valueId = 0;
+      for (int docId = 0; docId < _numDocs; docId++) {
+        int length = getInt(_forwardIndexLengthBuffer, docId);
+        for (int i = 0; i < length; i++) {
+          int dictId = getInt(_forwardIndexValueBuffer, valueId++);
+          int index = getInt(_invertedIndexLengthBuffer, dictId);
+          putInt(_invertedIndexValueBuffer, index, docId);
+          putInt(_invertedIndexLengthBuffer, dictId, index + 1);
+        }
+      }
+
+      // Destroy buffers no longer needed
+      destroyBuffer(_forwardIndexValueBuffer, _forwardIndexValueBufferFile);
+      _forwardIndexValueBuffer = null;
+      destroyBuffer(_forwardIndexLengthBuffer, _forwardIndexLengthBufferFile);
+      _forwardIndexLengthBuffer = null;
+    }
+
+    // Create bitmaps from inverted index buffers and serialize them to file
+    //HEADER
+    //# OF RANGES
+    //   Range Start 0
+    //    .........
+    //   Range Start R - 1
+    //   Bitmap for Range 0 Start Offset
+    //      .....
+    //   Bitmap for Range R Start Offset
+    //BODY
+    //   Bitmap for range 0
+    //   Bitmap for range 2
+    //    ......
+    //   Bitmap for range R - 1
+    long length = 0;
+    try (BufferedOutputStream bos = new BufferedOutputStream(new 
FileOutputStream(_invertedIndexFile));
+        DataOutputStream header = new DataOutputStream(bos);
+        FileOutputStream fos = new FileOutputStream(_invertedIndexFile);
+        DataOutputStream dataOutputStream = new DataOutputStream(new 
BufferedOutputStream(fos))) {
+
+      //Write the Range values
+      header.writeInt(rangeStartList.size());
+//      System.out.println("rangeStartList = " + rangeStartList);
+      for (int rangeStart : rangeStartList) {
+        header.writeInt(rangeStart);
+      }
+      //compute the offset where the bitmap for the first range would be 
written
+      int bitmapOffset =
+          Integer.BYTES + (rangeStartList.size()) * Integer.BYTES + 
(rangeStartList.size() + 1) * Integer.BYTES;
+      length += bitmapOffset;
+      header.writeInt(bitmapOffset);
+//      System.out.println("bitmapOffset = " + bitmapOffset);
+      //set the starting position where the actual bitmaps will be written
+      fos.getChannel().position(bitmapOffset);
+
+      int startIndex = 0;
+      int[] bitmapOffsets = new int[_numRanges];
+      for (int i = 0; i < rangeStartList.size(); i++) {
+        int rangeStartDictId = rangeStartList.get(i);
+        int rangeEndDictId = (i == rangeStartList.size() - 1) ? _cardinality : 
rangeStartList.get(i + 1);
+        MutableRoaringBitmap bitmap = new MutableRoaringBitmap();
+        for (int dictId = rangeStartDictId; dictId < rangeEndDictId; dictId++) 
{
+          int endIndex = getInt(_invertedIndexLengthBuffer, dictId);
+          for (int index = startIndex; index < endIndex; index++) {
+            bitmap.add(getInt(_invertedIndexValueBuffer, index));
+          }
+          startIndex = endIndex;
+        }
+        // Write offset and bitmap into file
+        int sizeInBytes = bitmap.serializedSizeInBytes();
+//        System.out.println("sizeInBytes = " + sizeInBytes);
+        bitmapOffset += sizeInBytes;
+        length += sizeInBytes;
+        // Check for int overflow
+        Preconditions.checkState(bitmapOffset > 0, "Inverted index file: %s 
exceeds 2GB limit", _invertedIndexFile);
+
+        header.writeInt(bitmapOffset);
+        byte[] bytes = new byte[sizeInBytes];
+        bitmap.serialize(ByteBuffer.wrap(bytes));
+//        System.out.println("Arrays.toString(bytes) = " + 
Arrays.toString(bytes));
+        dataOutputStream.write(bytes);
+      }
+//      System.out.println("bitmapOffsets = " + 
Arrays.toString(bitmapOffsets));
+//      System.out.println("length = " + length);
+    } catch (Exception e) {
+      FileUtils.deleteQuietly(_invertedIndexFile);
+      throw e;
+    }
+//    System.out.println("_invertedIndexFile = " + 
_invertedIndexFile.length());
+  }
+
+  @Override
+  public void close()
+      throws IOException {
+    org.apache.pinot.common.utils.FileUtils
+        .close(new DataBufferAndFile(_forwardIndexValueBuffer, 
_forwardIndexValueBufferFile),
+            new DataBufferAndFile(_forwardIndexLengthBuffer, 
_forwardIndexLengthBufferFile),
+            new DataBufferAndFile(_invertedIndexValueBuffer, 
_invertedIndexValueBufferFile),
+            new DataBufferAndFile(_invertedIndexLengthBuffer, 
_invertedIndexLengthBufferFile));
+  }
+
+  private class DataBufferAndFile implements Closeable {
+    private final PinotDataBuffer _dataBuffer;
+    private final File _file;
+
+    DataBufferAndFile(final PinotDataBuffer buffer, final File file) {
+      _dataBuffer = buffer;
+      _file = file;
+    }
+
+    @Override
+    public void close()
+        throws IOException {
+      destroyBuffer(_dataBuffer, _file);
+    }
+  }
+
+  private static void putInt(PinotDataBuffer buffer, long index, int value) {
+    buffer.putInt(index << 2, value);
+  }
+
+  private static int getInt(PinotDataBuffer buffer, long index) {
+    return buffer.getInt(index << 2);
+  }
+
+  private PinotDataBuffer createTempBuffer(long size, File mmapFile)
+      throws IOException {
+    if (_useMMapBuffer) {
+      return PinotDataBuffer.mapFile(mmapFile, false, 0, size, 
PinotDataBuffer.NATIVE_ORDER,
+          "OffHeapBitmapInvertedIndexCreator: temp buffer");
+    } else {
+      return PinotDataBuffer.allocateDirect(size, PinotDataBuffer.NATIVE_ORDER,
+          "OffHeapBitmapInvertedIndexCreator: temp buffer for " + 
mmapFile.getName());
+    }
+  }
+
+  private void destroyBuffer(PinotDataBuffer buffer, File mmapFile)
+      throws IOException {
+    if (buffer != null) {
+      buffer.close();
+      if (mmapFile.exists()) {
+        FileUtils.forceDelete(mmapFile);
+      }
+    }
+  }
+
+  public static void main(String[] args)
+      throws IOException {
+    File indexDir = new File("/tmp/testRangeIndex");
+    indexDir.mkdirs();
+    FieldSpec fieldSpec = new MetricFieldSpec();
+    fieldSpec.setDataType(FieldSpec.DataType.INT);
+    String columnName = "latency";
+    fieldSpec.setName(columnName);
+    int cardinality = 1000;
+    int numDocs = 10000;
+    int numValues = 10000;
+    DictionaryBasedRangeIndexCreator creator =
+        new DictionaryBasedRangeIndexCreator(indexDir, fieldSpec, cardinality, 
numDocs, numValues);
+    Random r = new Random();
+    for (int i = 0; i < numDocs; i++) {
+      creator.add(r.nextInt(cardinality));
+    }
+    creator.seal();
+
+    File file = new File(indexDir, columnName + 
BITMAP_RANGE_INDEX_FILE_EXTENSION);
+    DataInputStream dis = new DataInputStream(new FileInputStream(file));
+    int numRanges = dis.readInt();
+    System.out.println("numRanges = " + numRanges);
+    int[] rangeStart = new int[numRanges];
+    for (int i = 0; i < numRanges; i++) {
+      rangeStart[i] = dis.readInt();
+    }
+    System.out.println("Arrays.toString(rangeStart) = " + 
Arrays.toString(rangeStart));
+    int[] rangeBitmapOffsets = new int[numRanges + 1];
+    for (int i = 0; i <= numRanges; i++) {
+      rangeBitmapOffsets[i] = dis.readInt();
+    }
+    System.out.println("Arrays.toString(rangeBitmapOffsets) = " + 
Arrays.toString(rangeBitmapOffsets));
+    ImmutableRoaringBitmap[] bitmaps = new ImmutableRoaringBitmap[numRanges];
+    for (int i = 0; i < numRanges; i++) {
+      long serializedBitmapLength;
+      serializedBitmapLength = rangeBitmapOffsets[i + 1] - 
rangeBitmapOffsets[i];
+      System.out.println("serializedBitmapLength = " + serializedBitmapLength);
+      byte[] bytes = new byte[(int) serializedBitmapLength];
+      dis.read(bytes, 0, (int) serializedBitmapLength);
+      System.out.println("bytes = " + Arrays.toString(bytes));
+      bitmaps[i] = new ImmutableRoaringBitmap(ByteBuffer.wrap(bytes));
+      System.out.println(bitmaps[i]);
+    }
+  }
+}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/column/ColumnIndexContainer.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/column/ColumnIndexContainer.java
index 531c3fc..ab9fbfc 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/column/ColumnIndexContainer.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/column/ColumnIndexContainer.java
@@ -41,6 +41,11 @@ public interface ColumnIndexContainer {
   InvertedIndexReader getInvertedIndex();
 
   /**
+   * Returns the inverted index for the column, or {@code null} if it does not 
exist.
+   */
+  InvertedIndexReader getRangeIndex();
+
+  /**
    * Returns the dictionary for the column, or {@code null} if it does not 
exist.
    */
   Dictionary getDictionary();
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/column/PhysicalColumnIndexContainer.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/column/PhysicalColumnIndexContainer.java
index 76ff19e..8bf151c 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/column/PhysicalColumnIndexContainer.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/column/PhysicalColumnIndexContainer.java
@@ -32,6 +32,7 @@ import 
org.apache.pinot.core.segment.index.loader.IndexLoadingConfig;
 import org.apache.pinot.core.segment.index.metadata.ColumnMetadata;
 import org.apache.pinot.core.segment.index.readers.BaseImmutableDictionary;
 import org.apache.pinot.core.segment.index.readers.BitmapInvertedIndexReader;
+import org.apache.pinot.core.segment.index.readers.BitmapRangeIndexReader;
 import org.apache.pinot.core.segment.index.readers.BloomFilterReader;
 import org.apache.pinot.core.segment.index.readers.BytesDictionary;
 import org.apache.pinot.core.segment.index.readers.DoubleDictionary;
@@ -60,6 +61,7 @@ public final class PhysicalColumnIndexContainer implements 
ColumnIndexContainer
 
   private final DataFileReader _forwardIndex;
   private final InvertedIndexReader _invertedIndex;
+  private final InvertedIndexReader _rangeIndex;
   private final BaseImmutableDictionary _dictionary;
   private final BloomFilterReader _bloomFilterReader;
   private final NullValueVectorReaderImpl _nullValueVectorReader;
@@ -69,11 +71,13 @@ public final class PhysicalColumnIndexContainer implements 
ColumnIndexContainer
       throws IOException {
     String columnName = metadata.getColumnName();
     boolean loadInvertedIndex = false;
+    boolean loadRangeIndex = false;
     boolean loadTextIndex = false;
     boolean loadOnHeapDictionary = false;
     boolean loadBloomFilter = false;
     if (indexLoadingConfig != null) {
       loadInvertedIndex = 
indexLoadingConfig.getInvertedIndexColumns().contains(columnName);
+      loadRangeIndex = 
indexLoadingConfig.getRangeIndexColumns().contains(columnName);
       loadOnHeapDictionary = 
indexLoadingConfig.getOnHeapDictionaryColumns().contains(columnName);
       loadBloomFilter = 
indexLoadingConfig.getBloomFilterColumns().contains(columnName);
       loadTextIndex = 
indexLoadingConfig.getTextIndexColumns().contains(columnName);
@@ -107,6 +111,7 @@ public final class PhysicalColumnIndexContainer implements 
ColumnIndexContainer
               new SortedIndexReaderImpl(fwdIndexBuffer, 
metadata.getCardinality());
           _forwardIndex = sortedIndexReader;
           _invertedIndex = sortedIndexReader;
+          _rangeIndex = null;
           return;
         } else {
           // Unsorted
@@ -123,14 +128,21 @@ public final class PhysicalColumnIndexContainer 
implements ColumnIndexContainer
         _invertedIndex =
             new 
BitmapInvertedIndexReader(segmentReader.getIndexFor(columnName, 
ColumnIndexType.INVERTED_INDEX),
                 metadata.getCardinality());
+        _rangeIndex = null;
+      } else if (loadRangeIndex) {
+        _invertedIndex = null;
+        _rangeIndex =
+            new BitmapRangeIndexReader(segmentReader.getIndexFor(columnName, 
ColumnIndexType.RANGE_INDEX));
       } else {
         _invertedIndex = null;
+        _rangeIndex = null;
       }
     } else {
       // Raw index
       _forwardIndex = loadRawForwardIndex(fwdIndexBuffer, 
metadata.getDataType());
       _dictionary = null;
       _bloomFilterReader = null;
+      _rangeIndex = null;
       if (loadTextIndex) {
         _invertedIndex = new LuceneTextIndexReader(columnName, 
segmentIndexDir, metadata.getTotalDocs());
       } else {
@@ -150,6 +162,11 @@ public final class PhysicalColumnIndexContainer implements 
ColumnIndexContainer
   }
 
   @Override
+  public InvertedIndexReader getRangeIndex() {
+    return _rangeIndex;
+  }
+
+  @Override
   public BaseImmutableDictionary getDictionary() {
     return _dictionary;
   }
@@ -164,7 +181,8 @@ public final class PhysicalColumnIndexContainer implements 
ColumnIndexContainer
     return _nullValueVectorReader;
   }
 
-  private static BaseImmutableDictionary loadDictionary(PinotDataBuffer 
dictionaryBuffer, ColumnMetadata metadata,
+  //TODO: move this to a DictionaryLoader class
+  public static BaseImmutableDictionary loadDictionary(PinotDataBuffer 
dictionaryBuffer, ColumnMetadata metadata,
       boolean loadOnHeap) {
     FieldSpec.DataType dataType = metadata.getDataType();
     if (loadOnHeap) {
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/datasource/BaseDataSource.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/datasource/BaseDataSource.java
index 4fc99cb..fc32487 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/datasource/BaseDataSource.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/datasource/BaseDataSource.java
@@ -39,17 +39,20 @@ public abstract class BaseDataSource extends DataSource {
   private final DataFileReader _forwardIndex;
   private final Dictionary _dictionary;
   private final InvertedIndexReader _invertedIndex;
+  private final InvertedIndexReader _rangeIndex;
   private final BloomFilterReader _bloomFilter;
   private final NullValueVectorReader _nullValueVector;
   private final String _operatorName;
 
   public BaseDataSource(DataSourceMetadata dataSourceMetadata, DataFileReader 
forwardIndex,
       @Nullable Dictionary dictionary, @Nullable InvertedIndexReader 
invertedIndex,
-      @Nullable BloomFilterReader bloomFilter, @Nullable NullValueVectorReader 
nullValueVector, String operatorName) {
+      @Nullable InvertedIndexReader rangeIndex, @Nullable BloomFilterReader 
bloomFilter,
+      @Nullable NullValueVectorReader nullValueVector, String operatorName) {
     _dataSourceMetadata = dataSourceMetadata;
     _forwardIndex = forwardIndex;
     _dictionary = dictionary;
     _invertedIndex = invertedIndex;
+    _rangeIndex = rangeIndex;
     _bloomFilter = bloomFilter;
     _nullValueVector = nullValueVector;
     _operatorName = operatorName;
@@ -79,6 +82,12 @@ public abstract class BaseDataSource extends DataSource {
 
   @Nullable
   @Override
+  public InvertedIndexReader getRangeIndex() {
+    return _rangeIndex;
+  }
+
+  @Nullable
+  @Override
   public BloomFilterReader getBloomFilter() {
     return _bloomFilter;
   }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/datasource/ImmutableDataSource.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/datasource/ImmutableDataSource.java
index a7009a8..b4b0f5a 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/datasource/ImmutableDataSource.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/datasource/ImmutableDataSource.java
@@ -35,7 +35,7 @@ public class ImmutableDataSource extends BaseDataSource {
 
   public ImmutableDataSource(ColumnMetadata columnMetadata, 
ColumnIndexContainer columnIndexContainer) {
     super(new ImmutableDataSourceMetadata(columnMetadata), 
columnIndexContainer.getForwardIndex(),
-        columnIndexContainer.getDictionary(), 
columnIndexContainer.getInvertedIndex(),
+        columnIndexContainer.getDictionary(), 
columnIndexContainer.getInvertedIndex(), columnIndexContainer.getRangeIndex(),
         columnIndexContainer.getBloomFilter(), 
columnIndexContainer.getNullValueVector(),
         OPERATOR_NAME_PREFIX + columnMetadata.getColumnName());
   }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/datasource/MutableDataSource.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/datasource/MutableDataSource.java
index cf8e8bb..70610da 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/datasource/MutableDataSource.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/datasource/MutableDataSource.java
@@ -40,10 +40,10 @@ public class MutableDataSource extends BaseDataSource {
 
   public MutableDataSource(FieldSpec fieldSpec, int numDocs, int numValues, 
int maxNumValuesPerMVEntry,
       @Nullable PartitionFunction partitionFunction, int partitionId, 
DataFileReader forwardIndex,
-      @Nullable Dictionary dictionary, @Nullable InvertedIndexReader 
invertedIndex,
+      @Nullable Dictionary dictionary, @Nullable InvertedIndexReader 
invertedIndex, @Nullable InvertedIndexReader rangeIndex,
       @Nullable BloomFilterReader bloomFilter, @Nullable NullValueVectorReader 
nullValueVector) {
     super(new MutableDataSourceMetadata(fieldSpec, numDocs, numValues, 
maxNumValuesPerMVEntry, partitionFunction,
-            partitionId), forwardIndex, dictionary, invertedIndex, 
bloomFilter, nullValueVector,
+            partitionId), forwardIndex, dictionary, invertedIndex, rangeIndex, 
bloomFilter, nullValueVector,
         OPERATOR_NAME_PREFIX + fieldSpec.getName());
   }
 
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java
index c3a4076..3db11dc 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java
@@ -46,6 +46,7 @@ public class IndexLoadingConfig {
   private List<String> _sortedColumns = Collections.emptyList();
   private Set<String> _invertedIndexColumns = new HashSet<>();
   private Set<String> _textIndexColumns = new HashSet<>();
+  private Set<String> _rangeIndexColumns = new HashSet<>();
   private Set<String> _noDictionaryColumns = new HashSet<>(); // TODO: replace 
this by _noDictionaryConfig.
   private Map<String, String> _noDictionaryConfig = new HashMap<>();
   private Set<String> _varLengthDictionaryColumns = new HashSet<>();
@@ -83,6 +84,11 @@ public class IndexLoadingConfig {
       _invertedIndexColumns.addAll(invertedIndexColumns);
     }
 
+    List<String> rangeIndexColumns = indexingConfig.getRangeIndexColumns();
+    if (rangeIndexColumns != null) {
+      _rangeIndexColumns.addAll(rangeIndexColumns);
+    }
+
     List<String> bloomFilterColumns = indexingConfig.getBloomFilterColumns();
     if (bloomFilterColumns != null) {
       _bloomFilterColumns.addAll(bloomFilterColumns);
@@ -194,6 +200,10 @@ public class IndexLoadingConfig {
   @Nonnull
   public Set<String> getInvertedIndexColumns() {
     return _invertedIndexColumns;
+  }  @Nonnull
+
+  public Set<String> getRangeIndexColumns() {
+    return _rangeIndexColumns;
   }
 
   /**
@@ -220,6 +230,14 @@ public class IndexLoadingConfig {
   }
 
   /**
+   * For tests only.
+   */
+  @VisibleForTesting
+  public void setRangeIndexColumns(@Nonnull Set<String> rangeIndexColumns) {
+    _rangeIndexColumns = rangeIndexColumns;
+  }
+
+  /**
    * Used directly from text search unit test code since the test code
    * doesn't really have a table config and is directly testing the
    * query execution code of text search using data from generated segments
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/SegmentPreProcessor.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/SegmentPreProcessor.java
index 564321b..89432ab 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/SegmentPreProcessor.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/SegmentPreProcessor.java
@@ -31,6 +31,7 @@ import 
org.apache.pinot.core.segment.index.loader.columnminmaxvalue.ColumnMinMax
 import 
org.apache.pinot.core.segment.index.loader.defaultcolumn.DefaultColumnHandler;
 import 
org.apache.pinot.core.segment.index.loader.defaultcolumn.DefaultColumnHandlerFactory;
 import 
org.apache.pinot.core.segment.index.loader.invertedindex.InvertedIndexHandler;
+import 
org.apache.pinot.core.segment.index.loader.invertedindex.RangeIndexHandler;
 import 
org.apache.pinot.core.segment.index.loader.invertedindex.TextIndexHandler;
 import org.apache.pinot.core.segment.index.metadata.SegmentMetadataImpl;
 import org.apache.pinot.core.segment.store.SegmentDirectory;
@@ -99,6 +100,11 @@ public class SegmentPreProcessor implements AutoCloseable {
           new InvertedIndexHandler(_indexDir, _segmentMetadata, 
_indexLoadingConfig, segmentWriter);
       invertedIndexHandler.createInvertedIndices();
 
+      // Create column inverted indices according to the index config.
+      RangeIndexHandler rangeIndexHandler =
+          new RangeIndexHandler(_indexDir, _segmentMetadata, 
_indexLoadingConfig, segmentWriter);
+      rangeIndexHandler.createRangeIndices();
+
       Set<String> textIndexColumns = _indexLoadingConfig.getTextIndexColumns();
       if (textIndexColumns.size() > 0) {
         TextIndexHandler textIndexHandler =
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/invertedindex/RangeIndexHandler.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/invertedindex/RangeIndexHandler.java
new file mode 100644
index 0000000..7b2cb1c
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/invertedindex/RangeIndexHandler.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.index.loader.invertedindex;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+import javax.annotation.Nonnull;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.core.indexsegment.generator.SegmentVersion;
+import org.apache.pinot.core.io.reader.DataFileReader;
+import org.apache.pinot.core.io.reader.SingleColumnMultiValueReader;
+import org.apache.pinot.core.io.reader.impl.v1.FixedBitMultiValueReader;
+import org.apache.pinot.core.io.reader.impl.v1.FixedBitSingleValueReader;
+import org.apache.pinot.core.segment.creator.impl.V1Constants;
+import 
org.apache.pinot.core.segment.creator.impl.inv.DictionaryBasedRangeIndexCreator;
+import 
org.apache.pinot.core.segment.creator.impl.inv.OffHeapBitmapInvertedIndexCreator;
+import org.apache.pinot.core.segment.index.column.PhysicalColumnIndexContainer;
+import org.apache.pinot.core.segment.index.loader.IndexLoadingConfig;
+import org.apache.pinot.core.segment.index.loader.LoaderUtils;
+import org.apache.pinot.core.segment.index.metadata.ColumnMetadata;
+import org.apache.pinot.core.segment.index.metadata.SegmentMetadataImpl;
+import org.apache.pinot.core.segment.index.readers.BaseImmutableDictionary;
+import org.apache.pinot.core.segment.memory.PinotDataBuffer;
+import org.apache.pinot.core.segment.store.ColumnIndexType;
+import org.apache.pinot.core.segment.store.SegmentDirectory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class RangeIndexHandler {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(RangeIndexHandler.class);
+
+  private final File _indexDir;
+  private final SegmentDirectory.Writer _segmentWriter;
+  private final String _segmentName;
+  private final SegmentVersion _segmentVersion;
+  private final Set<ColumnMetadata> _rangeIndexColumns = new HashSet<>();
+
+  public RangeIndexHandler(@Nonnull File indexDir, @Nonnull 
SegmentMetadataImpl segmentMetadata,
+      @Nonnull IndexLoadingConfig indexLoadingConfig, @Nonnull 
SegmentDirectory.Writer segmentWriter) {
+    _indexDir = indexDir;
+    _segmentWriter = segmentWriter;
+    _segmentName = segmentMetadata.getName();
+    _segmentVersion = SegmentVersion.valueOf(segmentMetadata.getVersion());
+
+    // Do not create inverted index for sorted column
+    for (String column : indexLoadingConfig.getRangeIndexColumns()) {
+      ColumnMetadata columnMetadata = 
segmentMetadata.getColumnMetadataFor(column);
+      if (columnMetadata != null && !columnMetadata.isSorted()) {
+        _rangeIndexColumns.add(columnMetadata);
+      }
+    }
+  }
+
+  public void createRangeIndices()
+      throws IOException {
+    for (ColumnMetadata columnMetadata : _rangeIndexColumns) {
+      createRangeIndexForColumn(columnMetadata);
+    }
+  }
+
+  private void createRangeIndexForColumn(ColumnMetadata columnMetadata)
+      throws IOException {
+    String column = columnMetadata.getColumnName();
+
+    File inProgress = new File(_indexDir, column + ".range.inprogress");
+    File rangeIndexFile = new File(_indexDir, column + 
V1Constants.Indexes.BITMAP_RANGE_INDEX_FILE_EXTENSION);
+
+    if (!inProgress.exists()) {
+      // Marker file does not exist, which means last run ended normally.
+
+      if (_segmentWriter.hasIndexFor(column, ColumnIndexType.RANGE_INDEX)) {
+        // Skip creating range index if already exists.
+
+        LOGGER.info("Found Range index for segment: {}, column: {}", 
_segmentName, column);
+        return;
+      }
+
+      // Create a marker file.
+      FileUtils.touch(inProgress);
+    } else {
+      // Marker file exists, which means last run gets interrupted.
+      // Remove inverted index if exists.
+      // For v1 and v2, it's the actual inverted index. For v3, it's the 
temporary inverted index.
+      FileUtils.deleteQuietly(rangeIndexFile);
+    }
+
+    // Create new inverted index for the column.
+    LOGGER.info("Creating new range index for segment: {}, column: {}", 
_segmentName, column);
+    int numDocs = columnMetadata.getTotalDocs();
+
+    if (columnMetadata.hasDictionary()) {
+      PinotDataBuffer dictBuffer =
+          _segmentWriter.getIndexFor(columnMetadata.getColumnName(), 
ColumnIndexType.DICTIONARY);
+      BaseImmutableDictionary dictionary =
+          PhysicalColumnIndexContainer.loadDictionary(dictBuffer, 
columnMetadata, false);
+      handleDictionaryBasedColumn(dictionary, columnMetadata, numDocs);
+    } else {
+
+    }
+
+    // For v3, write the generated inverted index file into the single file 
and remove it.
+    if (_segmentVersion == SegmentVersion.v3) {
+      LoaderUtils.writeIndexToV3Format(_segmentWriter, column, rangeIndexFile, 
ColumnIndexType.RANGE_INDEX);
+    }
+
+    // Delete the marker file.
+    FileUtils.deleteQuietly(inProgress);
+
+    LOGGER.info("Created inverted index for segment: {}, column: {}", 
_segmentName, column);
+  }
+
+  private void handleDictionaryBasedColumn(BaseImmutableDictionary dictionary, 
ColumnMetadata columnMetadata, int numDocs)
+      throws IOException {
+    try (DictionaryBasedRangeIndexCreator creator = new 
DictionaryBasedRangeIndexCreator(_indexDir,
+        columnMetadata.getFieldSpec(), columnMetadata.getCardinality(), 
numDocs,
+        columnMetadata.getTotalNumberOfEntries())) {
+      try (DataFileReader fwdIndex = getForwardIndexReader(columnMetadata, 
_segmentWriter)) {
+        if (columnMetadata.isSingleValue()) {
+          // Single-value column.
+          FixedBitSingleValueReader svFwdIndex = (FixedBitSingleValueReader) 
fwdIndex;
+          for (int i = 0; i < numDocs; i++) {
+            creator.add(svFwdIndex.getInt(i));
+          }
+        } else {
+          // Multi-value column.
+          SingleColumnMultiValueReader mvFwdIndex = 
(SingleColumnMultiValueReader) fwdIndex;
+          int[] dictIds = new int[columnMetadata.getMaxNumberOfMultiValues()];
+          for (int i = 0; i < numDocs; i++) {
+            int length = mvFwdIndex.getIntArray(i, dictIds);
+            creator.add(dictIds, length);
+          }
+        }
+        creator.seal();
+      }
+    }
+  }
+
+  private DataFileReader getForwardIndexReader(ColumnMetadata columnMetadata, 
SegmentDirectory.Writer segmentWriter)
+      throws IOException {
+    PinotDataBuffer buffer = 
segmentWriter.getIndexFor(columnMetadata.getColumnName(), 
ColumnIndexType.FORWARD_INDEX);
+    int numRows = columnMetadata.getTotalDocs();
+    int numBitsPerValue = columnMetadata.getBitsPerElement();
+    if (columnMetadata.isSingleValue()) {
+      return new FixedBitSingleValueReader(buffer, numRows, numBitsPerValue);
+    } else {
+      return new FixedBitMultiValueReader(buffer, numRows, 
columnMetadata.getTotalNumberOfEntries(), numBitsPerValue);
+    }
+  }
+}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/metadata/SegmentMetadata.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/metadata/SegmentMetadata.java
index 8ee589a..d01406c 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/metadata/SegmentMetadata.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/metadata/SegmentMetadata.java
@@ -96,6 +96,8 @@ public interface SegmentMetadata {
 
   String getBitmapInvertedIndexFileName(String column);
 
+  String getBitmapRangeIndexFileName(String column);
+
   String getBloomFilterFileName(String column);
 
   String getNullValueVectorFileName(String column);
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/metadata/SegmentMetadataImpl.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/metadata/SegmentMetadataImpl.java
index 5b58f9e..3bbdcdd 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/metadata/SegmentMetadataImpl.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/metadata/SegmentMetadataImpl.java
@@ -452,6 +452,11 @@ public class SegmentMetadataImpl implements 
SegmentMetadata {
   }
 
   @Override
+  public String getBitmapRangeIndexFileName(String column) {
+    return column + V1Constants.Indexes.BITMAP_RANGE_INDEX_FILE_EXTENSION;
+  }
+
+  @Override
   public String getBloomFilterFileName(String column) {
     return column + V1Constants.Indexes.BLOOM_FILTER_FILE_EXTENSION;
   }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/BitmapRangeIndexReader.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/BitmapRangeIndexReader.java
new file mode 100644
index 0000000..b22004e
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/BitmapRangeIndexReader.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.index.readers;
+
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.lang.ref.SoftReference;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import org.apache.pinot.core.segment.memory.PinotDataBuffer;
+import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class BitmapRangeIndexReader implements 
InvertedIndexReader<ImmutableRoaringBitmap> {
+  public static final Logger LOGGER = 
LoggerFactory.getLogger(BitmapRangeIndexReader.class);
+
+  private final PinotDataBuffer _buffer;
+  private final int _numRanges;
+  final int _bitmapIndexOffset;
+  private final int[] _rangeStartArray;
+
+  private volatile SoftReference<SoftReference<ImmutableRoaringBitmap>[]> 
_bitmaps = null;
+
+  /**
+   * Constructs an inverted index with the specified size.
+   * @param indexDataBuffer data buffer for the inverted index.
+   */
+  public BitmapRangeIndexReader(PinotDataBuffer indexDataBuffer) {
+    _buffer = indexDataBuffer;
+    _numRanges = _buffer.getInt(0);
+    _rangeStartArray = new int[_numRanges];
+    _bitmapIndexOffset = Integer.BYTES + _numRanges * Integer.BYTES;
+    final int lastOffset = _buffer.getInt(_numRanges * Integer.BYTES + 
(_numRanges + 1) * Integer.BYTES);
+
+    Preconditions.checkState(lastOffset == _buffer.size(),
+        "The last offset should be equal to buffer size! Current lastOffset: " 
+ lastOffset + ", buffer size: "
+            + _buffer.size());
+    for (int i = 0; i < _numRanges; i++) {
+      _rangeStartArray[i] = _buffer.getInt(Integer.BYTES + i * Integer.BYTES);
+    }
+    System.out.println("_rangeStartArray = " + _rangeStartArray);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public ImmutableRoaringBitmap getDocIds(int rangeId) {
+    SoftReference<ImmutableRoaringBitmap>[] bitmapArrayReference = null;
+    // Return the bitmap if it's still on heap
+    if (_bitmaps != null) {
+      bitmapArrayReference = _bitmaps.get();
+      if (bitmapArrayReference != null) {
+        SoftReference<ImmutableRoaringBitmap> bitmapReference = 
bitmapArrayReference[rangeId];
+        if (bitmapReference != null) {
+          ImmutableRoaringBitmap value = bitmapReference.get();
+          if (value != null) {
+            return value;
+          }
+        }
+      } else {
+        bitmapArrayReference = new SoftReference[_numRanges];
+        _bitmaps = new 
SoftReference<SoftReference<ImmutableRoaringBitmap>[]>(bitmapArrayReference);
+      }
+    } else {
+      bitmapArrayReference = new SoftReference[_numRanges];
+      _bitmaps = new 
SoftReference<SoftReference<ImmutableRoaringBitmap>[]>(bitmapArrayReference);
+    }
+    synchronized (this) {
+      ImmutableRoaringBitmap value;
+      if (bitmapArrayReference[rangeId] == null || 
bitmapArrayReference[rangeId].get() == null) {
+        value = buildRoaringBitmapForIndex(rangeId);
+        bitmapArrayReference[rangeId] = new 
SoftReference<ImmutableRoaringBitmap>(value);
+      } else {
+        value = bitmapArrayReference[rangeId].get();
+      }
+      return value;
+    }
+  }
+
+  @Override
+  public ImmutableRoaringBitmap getDocIds(Object value) {
+    // This should not be called from anywhere. If it happens, there is a bug
+    // and that's why we throw illegal state exception
+    throw new IllegalStateException("bitmap inverted index reader supports 
lookup only on dictionary id");
+  }
+
+  private synchronized ImmutableRoaringBitmap buildRoaringBitmapForIndex(final 
int rangeId) {
+    final int currentOffset = getOffset(rangeId);
+    final int nextOffset = getOffset(rangeId + 1);
+    final int bufferLength = nextOffset - currentOffset;
+
+    // Slice the buffer appropriately for Roaring Bitmap
+    ByteBuffer bb = _buffer.toDirectByteBuffer(currentOffset, bufferLength);
+    return new ImmutableRoaringBitmap(bb);
+  }
+
+  private int getOffset(final int rangeId) {
+    return _buffer.getInt(_bitmapIndexOffset + rangeId * Integer.BYTES);
+  }
+
+  @Override
+  public void close()
+      throws IOException {
+    _buffer.close();
+  }
+
+  public int findRangeId(int dictId) {
+    for (int i = 0; i < _rangeStartArray.length - 1; i++) {
+      if (dictId >= _rangeStartArray[i] && dictId < _rangeStartArray[i + 1]) {
+        return i;
+      }
+    }
+    return _rangeStartArray.length - 1;
+  }
+}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/store/ColumnIndexDirectory.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/store/ColumnIndexDirectory.java
index 65322b5..6b212d2 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/store/ColumnIndexDirectory.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/store/ColumnIndexDirectory.java
@@ -98,6 +98,15 @@ abstract class ColumnIndexDirectory implements Closeable {
       throws IOException;
 
   /**
+   * Get inverted index data buffer for a column
+   * @param column column name
+   * @return in-memory ByteBuffer like buffer for data
+   * @throws IOException
+   */
+  public abstract PinotDataBuffer getRangeIndexBufferFor(String column)
+      throws IOException;
+
+  /**
    * Get inverted bloom filter buffer for a column
    * @param column column name
    * @return in-memory ByteBuffer like buffer for data
@@ -152,6 +161,16 @@ abstract class ColumnIndexDirectory implements Closeable {
    * @return in-memory ByteBuffer like buffer for data
    * @throws IOException
    */
+  public abstract PinotDataBuffer newRangeIndexBuffer(String column, long 
sizeBytes)
+      throws IOException;
+
+  /**
+   * Allocate a new data buffer of specified sizeBytes in the columnar index 
directory
+   * @param column column name
+   * @param sizeBytes sizeBytes for the buffer allocation
+   * @return in-memory ByteBuffer like buffer for data
+   * @throws IOException
+   */
   public abstract PinotDataBuffer newBloomFilterBuffer(String column, long 
sizeBytes)
       throws IOException;
 
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/store/ColumnIndexType.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/store/ColumnIndexType.java
index bdb3b6a..dcd21df 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/store/ColumnIndexType.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/store/ColumnIndexType.java
@@ -24,7 +24,8 @@ public enum ColumnIndexType {
   INVERTED_INDEX("inverted_index"),
   BLOOM_FILTER("bloom_filter"),
   NULLVALUE_VECTOR("nullvalue_vector"),
-  TEXT_INDEX("text_index");
+  TEXT_INDEX("text_index"),
+  RANGE_INDEX("range_index");
 
   private final String indexName;
 
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/store/FilePerIndexDirectory.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/store/FilePerIndexDirectory.java
index 9a2ef40..8f1299e 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/store/FilePerIndexDirectory.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/store/FilePerIndexDirectory.java
@@ -85,6 +85,20 @@ class FilePerIndexDirectory extends ColumnIndexDirectory {
   }
 
   @Override
+  public PinotDataBuffer getRangeIndexBufferFor(String column)
+      throws IOException {
+    IndexKey key = new IndexKey(column, ColumnIndexType.RANGE_INDEX);
+    return getReadBufferFor(key);
+  }
+
+  @Override
+  public PinotDataBuffer newRangeIndexBuffer(String column, long sizeBytes)
+      throws IOException {
+    IndexKey key = new IndexKey(column, ColumnIndexType.RANGE_INDEX);
+    return getWriteBufferFor(key, sizeBytes);
+  }
+
+  @Override
   public PinotDataBuffer getBloomFilterBufferFor(String column)
       throws IOException {
     IndexKey key = new IndexKey(column, ColumnIndexType.BLOOM_FILTER);
@@ -174,6 +188,9 @@ class FilePerIndexDirectory extends ColumnIndexDirectory {
       case INVERTED_INDEX:
         filename = metadata.getBitmapInvertedIndexFileName(column);
         break;
+      case RANGE_INDEX:
+        filename = metadata.getBitmapRangeIndexFileName(column);
+        break;
       case BLOOM_FILTER:
         filename = metadata.getBloomFilterFileName(column);
         break;
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/store/SegmentLocalFSDirectory.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/store/SegmentLocalFSDirectory.java
index 9e61e2e..c985d37 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/store/SegmentLocalFSDirectory.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/store/SegmentLocalFSDirectory.java
@@ -234,6 +234,9 @@ class SegmentLocalFSDirectory extends SegmentDirectory {
       case INVERTED_INDEX:
         buffer = columnIndexDirectory.getInvertedIndexBufferFor(column);
         break;
+      case RANGE_INDEX:
+        buffer = columnIndexDirectory.getRangeIndexBufferFor(column);
+        break;
       case BLOOM_FILTER:
         buffer = columnIndexDirectory.getBloomFilterBufferFor(column);
         break;
@@ -355,6 +358,8 @@ class SegmentLocalFSDirectory extends SegmentDirectory {
           return columnIndexDirectory.newForwardIndexBuffer(key.name, 
sizeBytes);
         case INVERTED_INDEX:
           return columnIndexDirectory.newInvertedIndexBuffer(key.name, 
sizeBytes);
+        case RANGE_INDEX:
+          return columnIndexDirectory.newRangeIndexBuffer(key.name, sizeBytes);
         case BLOOM_FILTER:
           return columnIndexDirectory.newBloomFilterBuffer(key.name, 
sizeBytes);
         case NULLVALUE_VECTOR:
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/store/SingleFileIndexDirectory.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/store/SingleFileIndexDirectory.java
index cc0ebcc..2ba8725 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/store/SingleFileIndexDirectory.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/store/SingleFileIndexDirectory.java
@@ -111,6 +111,12 @@ class SingleFileIndexDirectory extends 
ColumnIndexDirectory {
   }
 
   @Override
+  public PinotDataBuffer getRangeIndexBufferFor(String column)
+      throws IOException {
+    return checkAndGetIndexBuffer(column, ColumnIndexType.RANGE_INDEX);
+  }
+
+  @Override
   public PinotDataBuffer getBloomFilterBufferFor(String column)
       throws IOException {
     return checkAndGetIndexBuffer(column, ColumnIndexType.BLOOM_FILTER);
@@ -166,6 +172,12 @@ class SingleFileIndexDirectory extends 
ColumnIndexDirectory {
   }
 
   @Override
+  public PinotDataBuffer newRangeIndexBuffer(String column, long sizeBytes)
+      throws IOException {
+    return allocNewBufferInternal(column, ColumnIndexType.RANGE_INDEX, 
sizeBytes, "range_index.create");
+  }
+
+  @Override
   public PinotDataBuffer newBloomFilterBuffer(String column, long sizeBytes)
       throws IOException {
     return allocNewBufferInternal(column, ColumnIndexType.BLOOM_FILTER, 
sizeBytes, "bloom_filter.create");
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/virtualcolumn/VirtualColumnIndexContainer.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/virtualcolumn/VirtualColumnIndexContainer.java
index de2af42..c91500d 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/virtualcolumn/VirtualColumnIndexContainer.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/virtualcolumn/VirtualColumnIndexContainer.java
@@ -52,6 +52,11 @@ public class VirtualColumnIndexContainer implements 
ColumnIndexContainer {
   }
 
   @Override
+  public InvertedIndexReader getRangeIndex() {
+    return null;
+  }
+
+  @Override
   public Dictionary getDictionary() {
     return _dictionary;
   }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/startree/v2/store/StarTreeDataSource.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/startree/v2/store/StarTreeDataSource.java
index da13528..0e6e077 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/startree/v2/store/StarTreeDataSource.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/startree/v2/store/StarTreeDataSource.java
@@ -25,6 +25,7 @@ import org.apache.pinot.core.data.partition.PartitionFunction;
 import org.apache.pinot.core.io.reader.SingleColumnSingleValueReader;
 import org.apache.pinot.core.segment.index.datasource.BaseDataSource;
 import org.apache.pinot.core.segment.index.readers.Dictionary;
+import org.apache.pinot.core.segment.index.readers.InvertedIndexReader;
 import org.apache.pinot.spi.data.FieldSpec;
 
 
@@ -33,7 +34,7 @@ public class StarTreeDataSource extends BaseDataSource {
 
   public StarTreeDataSource(FieldSpec fieldSpec, int numDocs, 
SingleColumnSingleValueReader forwardIndex,
       @Nullable Dictionary dictionary) {
-    super(new StarTreeDataSourceMetadata(fieldSpec, numDocs), forwardIndex, 
dictionary, null, null, null,
+    super(new StarTreeDataSourceMetadata(fieldSpec, numDocs), forwardIndex, 
dictionary, null, null,null, null,
         OPERATOR_NAME_PREFIX + fieldSpec.getName());
   }
 
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/common/RealtimeNoDictionaryTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/common/RealtimeNoDictionaryTest.java
index 9e85b24..a0a5b95 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/common/RealtimeNoDictionaryTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/common/RealtimeNoDictionaryTest.java
@@ -110,17 +110,17 @@ public class RealtimeNoDictionaryTest {
 
     Map<String, DataSource> dataSourceBlock = new HashMap<>();
     dataSourceBlock.put(INT_COL_NAME,
-        new MutableDataSource(intSpec, NUM_ROWS, NUM_ROWS, 0, null, 0, 
intRawIndex, null, null, null, null));
+        new MutableDataSource(intSpec, NUM_ROWS, NUM_ROWS, 0, null, 0, 
intRawIndex, null, null, null,null, null));
     dataSourceBlock.put(LONG_COL_NAME,
-        new MutableDataSource(longSpec, NUM_ROWS, NUM_ROWS, 0, null, 0, 
longRawIndex, null, null, null, null));
+        new MutableDataSource(longSpec, NUM_ROWS, NUM_ROWS, 0, null, 0, 
longRawIndex, null, null, null,null, null));
     dataSourceBlock.put(FLOAT_COL_NAME,
-        new MutableDataSource(floatSpec, NUM_ROWS, NUM_ROWS, 0, null, 0, 
floatRawIndex, null, null, null, null));
+        new MutableDataSource(floatSpec, NUM_ROWS, NUM_ROWS, 0, null, 0, 
floatRawIndex, null, null, null,null, null));
     dataSourceBlock.put(DOUBLE_COL_NAME,
-        new MutableDataSource(doubleSpec, NUM_ROWS, NUM_ROWS, 0, null, 0, 
doubleRawIndex, null, null, null, null));
+        new MutableDataSource(doubleSpec, NUM_ROWS, NUM_ROWS, 0, null, 0, 
doubleRawIndex, null, null, null,null, null));
     dataSourceBlock.put(STRING_COL_NAME,
-        new MutableDataSource(stringSpec, NUM_ROWS, NUM_ROWS, 0, null, 0, 
stringRawIndex, null, null, null, null));
+        new MutableDataSource(stringSpec, NUM_ROWS, NUM_ROWS, 0, null, 0, 
stringRawIndex, null, null, null,null, null));
     dataSourceBlock.put(BYTES_COL_NAME,
-        new MutableDataSource(bytesSpec, NUM_ROWS, NUM_ROWS, 0, null, 0, 
bytesRawIndex, null, null, null, null));
+        new MutableDataSource(bytesSpec, NUM_ROWS, NUM_ROWS, 0, null, 0, 
bytesRawIndex, null, null, null,null, null));
 
     return new DataFetcher(dataSourceBlock);
   }
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/IndexingConfig.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/IndexingConfig.java
index e5dd595..732d36e 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/IndexingConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/IndexingConfig.java
@@ -25,6 +25,7 @@ import javax.annotation.Nullable;
 
 public class IndexingConfig extends BaseJsonConfig {
   private List<String> _invertedIndexColumns;
+  private List<String> _rangeIndexColumns;
   private boolean _autoGeneratedInvertedIndex;
   private boolean _createInvertedIndexDuringSegmentGeneration;
   private List<String> _sortedColumn;
@@ -57,6 +58,14 @@ public class IndexingConfig extends BaseJsonConfig {
     _invertedIndexColumns = invertedIndexColumns;
   }
 
+  public List<String> getRangeIndexColumns() {
+    return _rangeIndexColumns;
+  }
+
+  public void setRangeIndexColumns(List<String> rangeIndexColumns) {
+    _rangeIndexColumns = rangeIndexColumns;
+  }
+
   public boolean isAutoGeneratedInvertedIndex() {
     return _autoGeneratedInvertedIndex;
   }
diff --git 
a/pinot-tools/src/main/java/org/apache/pinot/tools/scan/query/RangePredicateFilter.java
 
b/pinot-tools/src/main/java/org/apache/pinot/tools/scan/query/RangePredicateFilter.java
index f93ee87..226aef4 100644
--- 
a/pinot-tools/src/main/java/org/apache/pinot/tools/scan/query/RangePredicateFilter.java
+++ 
b/pinot-tools/src/main/java/org/apache/pinot/tools/scan/query/RangePredicateFilter.java
@@ -80,4 +80,12 @@ public class RangePredicateFilter implements PredicateFilter 
{
     }
     return false;
   }
+
+  public int getStartIndex() {
+    return _startIndex;
+  }
+
+  public int getEndIndex() {
+    return _endIndex;
+  }
 }
diff --git 
a/pinot-tools/src/main/resources/examples/batch/baseballStats/baseballStats_offline_table_config.json
 
b/pinot-tools/src/main/resources/examples/batch/baseballStats/baseballStats_offline_table_config.json
index 168c698..88f816b 100644
--- 
a/pinot-tools/src/main/resources/examples/batch/baseballStats/baseballStats_offline_table_config.json
+++ 
b/pinot-tools/src/main/resources/examples/batch/baseballStats/baseballStats_offline_table_config.json
@@ -14,6 +14,9 @@
     "invertedIndexColumns": [
       "playerID",
       "teamID"
+    ],
+    "rangeIndexColumns": [
+      "hits"
     ]
   },
   "metadata": {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to