This is an automated email from the ASF dual-hosted git repository.

gortiz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 8b2d5b6d9e Improve JSON_MATCH performance. (#15049)
8b2d5b6d9e is described below

commit 8b2d5b6d9ee980ee8004d44edb0c0e32ad5f7db0
Author: Bolek Ziobrowski <[email protected]>
AuthorDate: Thu Feb 27 10:19:09 2025 +0100

    Improve JSON_MATCH performance. (#15049)
---
 .../apache/pinot/queries/JsonMatchQueriesTest.java | 125 ++++-
 .../JsonMatchQueriesWithDisableUnnestTest.java     |  42 ++
 .../apache/pinot/perf/BenchmarkQueriesMSQE.java    |   6 +-
 ...hmarkQueries.java => BenchmarkQueriesSSQE.java} |  43 +-
 .../pinot/perf/BenchmarkRoaringBitmapMapping.java  | 317 +++++++++++++
 .../realtime/impl/json/MutableJsonIndexImpl.java   | 467 ++++++++++++-------
 .../segment/index/readers/StringDictionary.java    |  11 +
 .../readers/json/ImmutableJsonIndexReader.java     | 369 +++++++++------
 .../segment/local/segment/index/JsonIndexTest.java | 503 +++++++++++++++------
 9 files changed, 1446 insertions(+), 437 deletions(-)

diff --git 
a/pinot-core/src/test/java/org/apache/pinot/queries/JsonMatchQueriesTest.java 
b/pinot-core/src/test/java/org/apache/pinot/queries/JsonMatchQueriesTest.java
index 321c9059de..5937d12b1a 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/queries/JsonMatchQueriesTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/queries/JsonMatchQueriesTest.java
@@ -18,13 +18,14 @@
  */
 package org.apache.pinot.queries;
 
+import com.fasterxml.jackson.databind.node.ObjectNode;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.TreeSet;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.response.broker.BrokerResponseNative;
 import 
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
@@ -34,11 +35,14 @@ import 
org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
 import org.apache.pinot.segment.spi.ImmutableSegment;
 import org.apache.pinot.segment.spi.IndexSegment;
 import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
+import org.apache.pinot.spi.config.table.FieldConfig;
+import org.apache.pinot.spi.config.table.JsonIndexConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -58,11 +62,11 @@ public class JsonMatchQueriesTest extends BaseQueriesTest {
 
   private static final String ID_COLUMN = "id";
   private static final String JSON_COLUMN = "json";
-  private static final Schema SCHEMA = new 
Schema.SchemaBuilder().addSingleValueDimension(ID_COLUMN, DataType.INT)
-      .addSingleValueDimension(JSON_COLUMN, DataType.JSON).build();
-  private static final TableConfig TABLE_CONFIG =
-      new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setJsonIndexColumns(List.of(JSON_COLUMN))
-          .build();
+
+  private static final Schema SCHEMA = new Schema.SchemaBuilder()
+      .addSingleValueDimension(ID_COLUMN, DataType.INT)
+      .addSingleValueDimension(JSON_COLUMN, DataType.JSON)
+      .build();
 
   private IndexSegment _indexSegment;
   private List<IndexSegment> _indexSegments;
@@ -110,7 +114,27 @@ public class JsonMatchQueriesTest extends BaseQueriesTest {
     // Top-level object with multiple nested-array values
     records.add(createRecord(13, "{\"key\": [1, [\"foo\", [true]]], \"key2\": 
[2, [\"bar\", false]]}"));
 
-    SegmentGeneratorConfig segmentGeneratorConfig = new 
SegmentGeneratorConfig(TABLE_CONFIG, SCHEMA);
+    // nested arrays used to test not in/not eq predicates
+    records.add(createRecord(15, "{\"cities\":[ \"New York\" ] }"));
+    records.add(createRecord(16, "{\"cities\":[ \"Washington\", \"New York\"] 
}"));
+    records.add(createRecord(17, "{\"cities\":[ \"New York\", \"Washington\"] 
}"));
+    records.add(createRecord(18, "{\"cities\":[ \"Washington\"] }"));
+    records.add(createRecord(19, "{\"cities\":[ \"San Francisco\"] }"));
+    records.add(createRecord(20, "{\"cities\":[ \"San Francisco\", \"Miami\", 
\"Washington\"] }"));
+    records.add(createRecord(21, "{\"cities\":[] }"));
+    records.add(createRecord(22, "{\"cities\":[\"\"] }"));
+    records.add(createRecord(23, "{\"cities\":[ \"Washington\", 
\"Washington\"] }"));
+
+    // regular field used to test not in/not eq predicates
+    records.add(createRecord(24, "{\"country\": \"USA\"}"));
+    records.add(createRecord(25, "{\"country\": \"Canada\"}"));
+    records.add(createRecord(26, "{\"country\": \"Mexico\"}"));
+    records.add(createRecord(27, "{\"country\":\"\"}"));
+    records.add(createRecord(28, "{\"country\":null}"));
+
+    TableConfig tableConfig = getTableConfig();
+
+    SegmentGeneratorConfig segmentGeneratorConfig = new 
SegmentGeneratorConfig(tableConfig, SCHEMA);
     segmentGeneratorConfig.setTableName(RAW_TABLE_NAME);
     segmentGeneratorConfig.setSegmentName(SEGMENT_NAME);
     segmentGeneratorConfig.setOutDir(INDEX_DIR.getPath());
@@ -119,13 +143,34 @@ public class JsonMatchQueriesTest extends BaseQueriesTest 
{
     driver.init(segmentGeneratorConfig, new GenericRowRecordReader(records));
     driver.build();
 
-    IndexLoadingConfig indexLoadingConfig = new 
IndexLoadingConfig(TABLE_CONFIG, SCHEMA);
+    IndexLoadingConfig indexLoadingConfig = new 
IndexLoadingConfig(tableConfig, SCHEMA);
     ImmutableSegment immutableSegment =
         ImmutableSegmentLoader.load(new File(INDEX_DIR, SEGMENT_NAME), 
indexLoadingConfig);
     _indexSegment = immutableSegment;
     _indexSegments = Arrays.asList(immutableSegment, immutableSegment);
   }
 
+  protected TableConfig getTableConfig() {
+    ObjectNode indexes = JsonUtils.newObjectNode();
+    JsonIndexConfig config = new JsonIndexConfig();
+    config.setDisableCrossArrayUnnest(isDisableCrossArrayUnnest());
+    indexes.put("json", config.toJsonNode());
+
+    return new TableConfigBuilder(TableType.OFFLINE)
+        .setTableName(RAW_TABLE_NAME)
+        //.setJsonIndexColumns(List.of(JSON_COLUMN))
+        .addFieldConfig(
+            new FieldConfig.Builder(JSON_COLUMN)
+                .withEncodingType(FieldConfig.EncodingType.RAW)
+                .withIndexes(indexes)
+                .build())
+        .build();
+  }
+
+  protected boolean isDisableCrossArrayUnnest() {
+    return false; // default value
+  }
+
   private GenericRow createRecord(int id, Object value) {
     GenericRow record = new GenericRow();
     record.putValue(ID_COLUMN, id);
@@ -173,9 +218,6 @@ public class JsonMatchQueriesTest extends BaseQueriesTest {
     assertEquals(getSelectedIds("'\"$.key[1][*]\"=true'"), Set.of(12));
     assertEquals(getSelectedIds("'\"$.key[1][1][0]\"=true'"), Set.of(13));
 
-    // Top-level object with multiple nested-array values
-    assertEquals(getSelectedIds("'\"$.key[*][*][*]\"=true AND 
\"$.key2[1][0]\"=''bar'''"), Set.of(13));
-
     // Legacy query format
     assertEquals(getSelectedIds("'key=1'"), Set.of(9));
     assertEquals(getSelectedIds("'key=''foo'''"), Set.of(10));
@@ -188,11 +230,68 @@ public class JsonMatchQueriesTest extends BaseQueriesTest 
{
     assertEquals(getSelectedIds("'\"key[1][1][0]\"=true'"), Set.of(13));
   }
 
-  private Set<Integer> getSelectedIds(String jsonMatchExpression) {
+  @Test
+  public void testQueriesOnNestedArrays() {
+    // Top-level object with multiple nested-array values
+    assertEquals(getSelectedIds("'\"$.key[*][*][*]\"=true AND 
\"$.key2[1][0]\"=''bar'''"), Set.of(13));
+    // searching more than one nested array works when 
'disableCrossArrayUnnest' is false (default)
+    assertEquals(getSelectedIds("'\"$.key[0]\"=1 AND \"$.key2[0]\"=2'"), 
Set.of(13));
+  }
+
+  @Test
+  public void testOtherQueries() {
+    // NOT_EQ on array
+    assertEquals(getSelectedIds("'\"$.cities[0]\" != ''Seattle'' '"), 
Set.of(15, 16, 17, 18, 19, 20, 22, 23));
+    assertEquals(getSelectedIds("'\"$.cities[*]\" != ''Seattle'' '"), 
Set.of(15, 16, 17, 18, 19, 20, 22, 23));
+
+    assertEquals(getSelectedIds("'\"$.cities[0]\" != ''Washington'' '"), 
Set.of(15, 17, 19, 20, 22));
+    assertEquals(getSelectedIds("'\"$.cities[1]\" != ''Washington'' '"), 
Set.of(16, 20));
+    assertEquals(getSelectedIds("'\"$.cities[*]\" != ''Washington'' '"), 
Set.of(15, 16, 17, 19, 20, 22));
+
+    // NOT_IN on array
+    assertEquals(getSelectedIds("'\"$.cities[0]\" NOT IN (''Seattle'') '"), 
Set.of(15, 16, 17, 18, 19, 20, 22, 23));
+    assertEquals(getSelectedIds("'\"$.cities[*]\" NOT IN (''Seattle'') '"), 
Set.of(15, 16, 17, 18, 19, 20, 22, 23));
+    assertEquals(getSelectedIds("'\"$.cities[0]\" NOT IN (''Seattle'', 
''Boston'') '"),
+        Set.of(15, 16, 17, 18, 19, 20, 22, 23));
+    assertEquals(getSelectedIds("'\"$.cities[*]\" NOT IN (''Seattle'', 
''Boston'') '"),
+        Set.of(15, 16, 17, 18, 19, 20, 22, 23));
+
+    assertEquals(getSelectedIds("'\"$.cities[0]\" NOT IN (''Washington'') '"), 
Set.of(15, 17, 19, 20, 22));
+    assertEquals(getSelectedIds("'\"$.cities[1]\" NOT IN (''Washington'') '"), 
Set.of(16, 20));
+    assertEquals(getSelectedIds("'\"$.cities[*]\" NOT IN (''Washington'') '"), 
Set.of(15, 16, 17, 19, 20, 22));
+
+    assertEquals(getSelectedIds("'\"$.cities[0]\" NOT IN (''Washington'', 
''New York'') '"), Set.of(19, 20, 22));
+    assertEquals(getSelectedIds("'\"$.cities[1]\" NOT IN (''Washington'', 
''New York'') '"), Set.of(20));
+    assertEquals(getSelectedIds("'\"$.cities[*]\" NOT IN (''Washington'', 
''New York'') '"), Set.of(19, 20, 22));
+
+    // NOT_EQ on field
+    assertEquals(getSelectedIds("'\"$.country\" != ''USA'' '"), Set.of(25, 26, 
27));
+    assertEquals(getSelectedIds("'\"$.country\" != ''Canada'' '"), Set.of(24, 
26, 27));
+    // '"$.country" != '''' throws error for some reason,
+    assertEquals(getSelectedIds("'\"$.country\" != '' '' '"), Set.of(24, 25, 
26, 27));
+    assertEquals(getSelectedIds("'\"$.country\" != ''Brazil'' '"), Set.of(24, 
25, 26, 27));
+
+    // NOT IN on field
+    assertEquals(getSelectedIds("'\"$.country\" NOT IN (''USA'') '"), 
Set.of(25, 26, 27));
+    assertEquals(getSelectedIds("'\"$.country\" NOT IN (''Canada'') '"), 
Set.of(24, 26, 27));
+    assertEquals(getSelectedIds("'\"$.country\" NOT IN (''USA'', ''Canada'') 
'"), Set.of(26, 27));
+    // '\"$.country\" NOT IN ('''')  throws error for some reason
+    assertEquals(getSelectedIds("'\"$.country\" NOT IN ('' '') '"), Set.of(24, 
25, 26, 27));
+    assertEquals(getSelectedIds("'\"$.country\" NOT IN (''Brazil'', 
''Panama'') '"), Set.of(24, 25, 26, 27));
+
+    assertEquals(getSelectedIds("'REGEXP_LIKE(\"$.country\" , 
''Brazil|Panama'') '"), Set.of());
+    assertEquals(getSelectedIds("'REGEXP_LIKE(\"$.country\" , ''USA|Canada'') 
'"), Set.of(24, 25));
+    assertEquals(getSelectedIds("'REGEXP_LIKE(\"$.country\" , ''[MC][ea].*'') 
'"), Set.of(25, 26));
+    assertEquals(getSelectedIds("'REGEXP_LIKE(\"$.country\" , ''US.*'') '"), 
Set.of(24));
+
+    assertEquals(getSelectedIds("'\"$.country\" < ''Romania'' '"), Set.of(25, 
26, 27));
+  }
+
+  protected Set<Integer> getSelectedIds(String jsonMatchExpression) {
     String query = String.format("SELECT id FROM testTable WHERE 
JSON_MATCH(json, %s) LIMIT 100", jsonMatchExpression);
     BrokerResponseNative brokerResponse = getBrokerResponse(query);
     List<Object[]> rows = brokerResponse.getResultTable().getRows();
-    Set<Integer> selectedIds = new HashSet<>();
+    Set<Integer> selectedIds = new TreeSet<>();
     for (Object[] row : rows) {
       selectedIds.add((Integer) row[0]);
     }
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/queries/JsonMatchQueriesWithDisableUnnestTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/queries/JsonMatchQueriesWithDisableUnnestTest.java
new file mode 100644
index 0000000000..8971fc4fc7
--- /dev/null
+++ 
b/pinot-core/src/test/java/org/apache/pinot/queries/JsonMatchQueriesWithDisableUnnestTest.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.queries;
+
+import java.util.Set;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+
+
+// same as JsonMatchQueriesTest but with array un-nesting disabled
+public class JsonMatchQueriesWithDisableUnnestTest extends 
JsonMatchQueriesTest {
+
+  @Override
+  protected boolean isDisableCrossArrayUnnest() {
+    return true;
+  }
+
+  @Test
+  public void testQueriesOnNestedArrays() {
+    // Top-level object with multiple nested-array values
+    // Searching more than one nested array only works when 
'disableCrossArrayUnnest' is false (default), so nothing matches here
+    assertEquals(getSelectedIds("'\"$.key[*][*][*]\"=true AND 
\"$.key2[1][0]\"=''bar'''"), Set.of());
+    assertEquals(getSelectedIds("'\"$.key[0]\"=1 AND \"$.key2[0]\"=2'"), 
Set.of());
+  }
+}
diff --git 
a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkQueriesMSQE.java 
b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkQueriesMSQE.java
index c70c25bd2f..9728861f99 100644
--- a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkQueriesMSQE.java
+++ b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkQueriesMSQE.java
@@ -121,7 +121,7 @@ public class BenchmarkQueriesMSQE extends 
BaseClusterIntegrationTest {
       + "  from MyTable \n"
       + "  limit 100000\n"
       + ") \n"
-      + "where regexp_like_const('.*a.*', RAW_STRING_COL )";
+      + "where regexp_like('.*a.*', RAW_STRING_COL )";
 
   public static final String REGEXP_LIKE_VAR_QUERY = "select * from \n"
       + "(\n"
@@ -129,7 +129,7 @@ public class BenchmarkQueriesMSQE extends 
BaseClusterIntegrationTest {
       + "  from MyTable \n"
       + "  limit 100000\n"
       + ") \n"
-      + "where regexp_like('.*a.*', RAW_STRING_COL )";
+      + "where regexp_like_var('.*a.*', RAW_STRING_COL )";
 
   private Distribution.DataSupplier _supplier;
 
@@ -199,7 +199,7 @@ public class BenchmarkQueriesMSQE extends 
BaseClusterIntegrationTest {
 
   private void buildSegment(String segmentName)
       throws Exception {
-    LazyDataGenerator rows = BenchmarkQueries.createTestData(_numRows, 
_supplier);
+    LazyDataGenerator rows = BenchmarkQueriesSSQE.createTestData(_numRows, 
_supplier);
     SegmentGeneratorConfig config = new SegmentGeneratorConfig(TABLE_CONFIG, 
SCHEMA);
     config.setOutDir(_segmentDir.getPath());
     config.setTableName(TABLE_NAME);
diff --git 
a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkQueries.java 
b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkQueriesSSQE.java
similarity index 89%
rename from pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkQueries.java
rename to 
pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkQueriesSSQE.java
index 0b0b4a4abe..b6e09da346 100644
--- a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkQueries.java
+++ b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkQueriesSSQE.java
@@ -69,11 +69,11 @@ import org.openjdk.jmh.runner.options.OptionsBuilder;
 @Warmup(iterations = 5, time = 1)
 @Measurement(iterations = 5, time = 1)
 @State(Scope.Benchmark)
-public class BenchmarkQueries extends BaseQueriesTest {
+public class BenchmarkQueriesSSQE extends BaseQueriesTest {
 
   public static void main(String[] args)
       throws Exception {
-    ChainedOptionsBuilder opt = new 
OptionsBuilder().include(BenchmarkQueries.class.getSimpleName());
+    ChainedOptionsBuilder opt = new 
OptionsBuilder().include(BenchmarkQueriesSSQE.class.getSimpleName());
     new Runner(opt.build()).run();
   }
 
@@ -88,6 +88,7 @@ public class BenchmarkQueries extends BaseQueriesTest {
   private static final String NO_INDEX_STRING_COL = "NO_INDEX_STRING_COL";
   private static final String LOW_CARDINALITY_STRING_COL = 
"LOW_CARDINALITY_STRING_COL";
   private static final String TIMESTAMP_COL = "TSTMP_COL";
+  private static final String JSON_COL = "JSON_COL";
   private static final List<FieldConfig> FIELD_CONFIGS = new ArrayList<>();
 
   private static final TableConfig TABLE_CONFIG = new 
TableConfigBuilder(TableType.OFFLINE)
@@ -97,6 +98,7 @@ public class BenchmarkQueries extends BaseQueriesTest {
       .setNoDictionaryColumns(List.of(RAW_INT_COL_NAME, RAW_STRING_COL_NAME, 
TIMESTAMP_COL))
       .setSortedColumn(SORTED_COL_NAME)
       .setRangeIndexColumns(List.of(INT_COL_NAME, LOW_CARDINALITY_STRING_COL))
+      .setJsonIndexColumns(List.of(JSON_COL))
       .setStarTreeIndexConfigs(
           Collections.singletonList(
               new StarTreeIndexConfig(List.of(SORTED_COL_NAME, INT_COL_NAME), 
null,
@@ -114,6 +116,7 @@ public class BenchmarkQueries extends BaseQueriesTest {
       .addSingleValueDimension(NO_INDEX_STRING_COL, FieldSpec.DataType.STRING)
       .addSingleValueDimension(LOW_CARDINALITY_STRING_COL, 
FieldSpec.DataType.STRING)
       .addSingleValueDimension(TIMESTAMP_COL, FieldSpec.DataType.TIMESTAMP)
+      .addSingleValueDimension(JSON_COL, FieldSpec.DataType.JSON)
       .build();
 
   public static final String FILTERED_QUERY = "SELECT SUM(INT_COL) 
FILTER(WHERE INT_COL > 123 AND INT_COL < 599999),"
@@ -200,6 +203,19 @@ public class BenchmarkQueries extends BaseQueriesTest {
           + " group by 1 "
           + " limit 1000000\n";
 
+  public static final String JSON_MATCH_QUERY =
+      "SELECT\n"
+          + "  COUNT(*) AS count,\n"
+          + "  SUM(INT_COL) AS size,\n"
+          + "  LOW_CARDINALITY_STRING_COL as type\n"
+          + "FROM MyTable\n"
+          + "WHERE JSON_MATCH(\n"
+          + "    JSON_COL,\t\n"
+          + "    '(\"$.type\" = ''type0'' OR (\"$.type\" = ''type1'' AND 
(\"$.changes[0].author.name\" != ''author10''"
+          + "     OR \"$.changes[1].author.name\" IS NOT NULL)))'\n"
+          + "  )\n"
+          + "GROUP BY LOW_CARDINALITY_STRING_COL";
+
   @Param({"1", "2", "10", "50"})
   private int _numSegments;
   @Param("1500000")
@@ -212,7 +228,7 @@ public class BenchmarkQueries extends BaseQueriesTest {
       RAW_COLUMN_SUMMARY_STATS, COUNT_OVER_BITMAP_INDEX_IN, 
COUNT_OVER_BITMAP_INDEXES,
       COUNT_OVER_BITMAP_AND_SORTED_INDEXES, COUNT_OVER_BITMAP_INDEX_EQUALS, 
STARTREE_SUM_QUERY, STARTREE_FILTER_QUERY,
       FILTERING_BITMAP_SCAN_QUERY, FILTERING_SCAN_QUERY, 
FILTERING_ON_TIMESTAMP_WORKAROUND_QUERY,
-      FILTERING_ON_TIMESTAMP_QUERY, REGEXP_REPLACE_QUERY
+      FILTERING_ON_TIMESTAMP_QUERY, REGEXP_REPLACE_QUERY, JSON_MATCH_QUERY
   })
   String _query;
   private IndexSegment _indexSegment;
@@ -253,6 +269,7 @@ public class BenchmarkQueries extends BaseQueriesTest {
       private final String[] _lowCardinalityValues =
           IntStream.range(0, 10).mapToObj(i -> "value" + 
i).toArray(String[]::new);
       private Distribution.DataSupplier _supplier = supplier;
+      private String[] _jsons = generateJsons();
 
       @Override
       public int size() {
@@ -270,6 +287,7 @@ public class BenchmarkQueries extends BaseQueriesTest {
         row.putValue(NO_INDEX_STRING_COL, row.getValue(RAW_STRING_COL_NAME));
         row.putValue(LOW_CARDINALITY_STRING_COL, _lowCardinalityValues[i % 
_lowCardinalityValues.length]);
         row.putValue(TIMESTAMP_COL, i * 1200 * 1000L);
+        row.putValue(JSON_COL, _jsons[i % _jsons.length]);
 
         return null;
       }
@@ -279,6 +297,25 @@ public class BenchmarkQueries extends BaseQueriesTest {
         _strings.clear();
         _supplier.reset();
       }
+
+      private String[] generateJsons() {
+        String[] jsons = new String[1000];
+        StringBuilder buffer = new StringBuilder();
+
+        for (int i = 0; i < jsons.length; i++) {
+          buffer.setLength(0);
+          buffer.append("{ \"type\": \"type").append(i % 50).append("\"")
+              .append(", \"changes\": [ ")
+              .append("{ \"author\": { \"name\": \"author").append(i % 
1000).append("\" } }");
+          if (i % 2 == 0) {
+            buffer.append(", { \"author\": { \"name\": \"author").append(i % 
100).append("\" } }");
+          }
+          buffer.append(" ] }");
+          jsons[i] = buffer.toString();
+        }
+
+        return jsons;
+      }
     };
   }
 
diff --git 
a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRoaringBitmapMapping.java
 
b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRoaringBitmapMapping.java
new file mode 100644
index 0000000000..cd09923f9d
--- /dev/null
+++ 
b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRoaringBitmapMapping.java
@@ -0,0 +1,317 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.perf;
+
+import com.google.common.io.Resources;
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
+import org.apache.pinot.segment.spi.memory.PinotByteBuffer;
+import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.profile.GCProfiler;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.options.ChainedOptionsBuilder;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+import org.roaringbitmap.IntConsumer;
+import org.roaringbitmap.RoaringBitmap;
+import org.roaringbitmap.RoaringBitmapWriter;
+import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
+import org.roaringbitmap.buffer.MutableRoaringBitmap;
+
+
+/**
+ * Benchmarks different settings for transforming a bitmap via a mapping.
+ * Depends on the following files:
+ *  - docMapping.buffer (json flattened doc ids -> doc ids mapping)
+ *  - test.bitmap (serialized mutable roaring bitmap)
+ * which have to be generated (copied from a Pinot instance) before the benchmark 
run.
+ */
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+@Fork(1)
+@Warmup(iterations = 3, time = 1)
+@Measurement(iterations = 5, time = 1)
+@State(Scope.Benchmark)
+public class BenchmarkRoaringBitmapMapping {
+
+  public static void main(String[] args)
+      throws Exception {
+    ChainedOptionsBuilder opt = new OptionsBuilder()
+        .shouldDoGC(true)
+        .addProfiler(GCProfiler.class)
+        //.addProfiler(JavaFlightRecorderProfiler.class)
+        .include(BenchmarkRoaringBitmapMapping.class.getSimpleName());
+    new Runner(opt.build()).run();
+  }
+
+  PinotDataBuffer _bitmapBuffer;
+  ImmutableRoaringBitmap _docIds;
+  PinotDataBuffer _docIdMapping;
+
+  private int getDocId(int flattenedDocId) {
+    return _docIdMapping.getInt((long) flattenedDocId << 2);
+  }
+
+  @Setup
+  public void setUp()
+      throws IOException {
+    String fileName = "test.bitmap";
+
+    _bitmapBuffer = getPinotDataBuffer(fileName);
+    _docIds = new ImmutableRoaringBitmap(
+        _bitmapBuffer.toDirectByteBuffer(0, (int) _bitmapBuffer.size()));
+    _docIdMapping = getPinotDataBuffer("docMapping.buffer");
+  }
+
+  // Maps the named classpath resource read-only; the error reports the requested file.
+  private static PinotDataBuffer getPinotDataBuffer(String fileName)
+      throws IOException {
+    File file = new File(Resources.getResource(fileName).getFile());
+    if (!file.exists()) {
+      throw new RuntimeException("File " + fileName + " doesn't exist!");
+    }
+    return PinotByteBuffer.mapReadOnlyBigEndianFile(file);
+  }
+
+  @TearDown
+  public void tearDown()
+      throws IOException {
+    if (_bitmapBuffer != null) {
+      try {
+        _bitmapBuffer.close();
+      } catch (Exception e) {
+        // Ignore
+      }
+    }
+
+    if (_docIdMapping != null) {
+      try {
+        _docIdMapping.close();
+      } catch (Exception e) {
+        // Ignore
+      }
+    }
+  }
+
+  @Benchmark
+  public MutableRoaringBitmap mapWithDefaults() {
+    RoaringBitmapWriter<MutableRoaringBitmap> writer = 
RoaringBitmapWriter.bufferWriter()
+        .get();
+    return map(writer);
+  }
+
+  @Benchmark
+  public MutableRoaringBitmap mapWithInitCapacity() {
+    RoaringBitmapWriter<MutableRoaringBitmap> writer = 
RoaringBitmapWriter.bufferWriter()
+        .initialCapacity((_docIds.getCardinality() >>> 16) + 1)
+        .get();
+    return map(writer);
+  }
+
+  @Benchmark
+  public MutableRoaringBitmap mapWithMaxInitCapacity() {
+    RoaringBitmapWriter<MutableRoaringBitmap> writer = 
RoaringBitmapWriter.bufferWriter()
+        .initialCapacity(65534)
+        .get();
+    return map(writer);
+  }
+
+  @Benchmark
+  public MutableRoaringBitmap mapWithRunCompressDisabled() {
+    RoaringBitmapWriter<MutableRoaringBitmap> writer = 
RoaringBitmapWriter.bufferWriter()
+        .runCompress(false)
+        .get();
+    return map(writer);
+  }
+
+  @Benchmark
+  public MutableRoaringBitmap mapWithPartialRadixSort() {
+    RoaringBitmapWriter<MutableRoaringBitmap> writer = 
RoaringBitmapWriter.bufferWriter()
+        .doPartialRadixSort()
+        .get();
+
+    int[] buffer = new int[1024];
+
+    IntConsumer consumer = new IntConsumer() {
+      int _idx = 0;
+
+      @Override
+      public void accept(int value) {
+        buffer[_idx++] = getDocId(value);
+        if (_idx == 1024) {
+          writer.addMany(buffer);
+          _idx = 0;
+        }
+      }
+    };
+    _docIds.forEach(consumer);
+
+    // ignore small leftover
+
+    return writer.get();
+  }
+
+  @Benchmark
+  public MutableRoaringBitmap mapWithPartialRadixSortPrealloc() {
+    RoaringBitmapWriter<MutableRoaringBitmap> writer = 
RoaringBitmapWriter.bufferWriter()
+        .get();
+
+    final int[] buffer = new int[10 * 1024];
+    final int bufLen = buffer.length;
+
+    IntConsumer consumer = new IntConsumer() {
+      int _idx = 0;
+      final int[] _low = new int[257];
+      final int[] _high = new int[257];
+      int[] _copy = new int[buffer.length];
+
+      @Override
+      public void accept(int value) {
+        buffer[_idx++] = getDocId(value);
+        if (_idx == bufLen) {
+          partialRadixSort(buffer, _low, _high, _copy);
+          writer.addMany(buffer);
+          _idx = 0;
+        }
+      }
+    };
+    _docIds.forEach(consumer);
+
+    // ignore small leftover
+
+    return writer.get();
+  }
+
+  @Benchmark
+  public MutableRoaringBitmap mapWithOptimisedForRunsAppender() {
+    RoaringBitmapWriter<MutableRoaringBitmap> writer = 
RoaringBitmapWriter.bufferWriter()
+        .optimiseForRuns()
+        .get();
+    return map(writer);
+  }
+
+  @Benchmark
+  public MutableRoaringBitmap mapWithOptimisedForArraysAppender() {
+    RoaringBitmapWriter<MutableRoaringBitmap> writer = 
RoaringBitmapWriter.bufferWriter()
+        .optimiseForArrays()
+        .get();
+    return map(writer);
+  }
+
+  @Benchmark
+  public MutableRoaringBitmap mapSimple() {
+    MutableRoaringBitmap target = new MutableRoaringBitmap();
+    _docIds.forEach((IntConsumer) flattenedDocId -> 
target.add(getDocId(flattenedDocId)));
+    return target;
+  }
+
+  @Benchmark
+  public RoaringBitmap mapRoaringSimple() {
+    RoaringBitmap target = new RoaringBitmap();
+    _docIds.forEach((IntConsumer) flattenedDocId -> 
target.add(getDocId(flattenedDocId)));
+    return target;
+  }
+
+  @Benchmark
+  public RoaringBitmap mapRoaringAppender() {
+    RoaringBitmapWriter<RoaringBitmap> writer = RoaringBitmapWriter.writer()
+        .get();
+    _docIds.forEach((IntConsumer) flattenedDocId -> 
writer.add(getDocId(flattenedDocId)));
+    RoaringBitmap result = writer.get();
+    return result;
+  }
+
+  @Benchmark
+  public RoaringBitmap mapRoaringAppenderConstantMem() {
+    RoaringBitmapWriter<RoaringBitmap> writer = RoaringBitmapWriter.writer()
+        .constantMemory()
+        .get();
+    _docIds.forEach((IntConsumer) flattenedDocId -> 
writer.add(getDocId(flattenedDocId)));
+    return writer.get();
+  }
+
+  @Benchmark
+  public long iterateMapping() {
+    long result = 0;
+    for (long i = 0, n = _docIdMapping.size() >>> 3; i < n; i++) {
+      result += _docIdMapping.getLong(i << 3); // byte offset of the i-th 8-byte word
+    }
+    return result;
+  }
+
+  private MutableRoaringBitmap map(RoaringBitmapWriter<MutableRoaringBitmap> 
writer) {
+    _docIds.forEach((IntConsumer) flattenedDocId -> 
writer.add(getDocId(flattenedDocId)));
+    return writer.get();
+  }
+
+  // same as partialRadixSort in RB, but with arrays pre-allocated
+  private static void partialRadixSort(int[] data, int[] low, int[] high, 
int[] copy) {
+    Arrays.fill(low, 0);
+    Arrays.fill(high, 0);
+    for (int value : data) {
+      ++low[((value >>> 16) & 0xFF) + 1];
+      ++high[(value >>> 24) + 1];
+    }
+    // avoid passes over the data if it's not required
+    boolean sortLow = low[1] < data.length;
+    boolean sortHigh = high[1] < data.length;
+    if (!sortLow && !sortHigh) {
+      return;
+    }
+    Arrays.fill(copy, 0);
+    if (sortLow) {
+      for (int i = 1; i < low.length; i++) {
+        low[i] += low[i - 1];
+      }
+      for (int value : data) {
+        copy[low[(value >>> 16) & 0xFF]++] = value;
+      }
+    }
+    if (sortHigh) {
+      for (int i = 1; i < high.length; i++) {
+        high[i] += high[i - 1];
+      }
+      if (sortLow) {
+        for (int value : copy) {
+          data[high[value >>> 24]++] = value;
+        }
+      } else {
+        for (int value : data) {
+          copy[high[value >>> 24]++] = value;
+        }
+        System.arraycopy(copy, 0, data, 0, data.length);
+      }
+    } else {
+      System.arraycopy(copy, 0, data, 0, data.length);
+    }
+  }
+}
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/json/MutableJsonIndexImpl.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/json/MutableJsonIndexImpl.java
index 23de292693..92529b8263 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/json/MutableJsonIndexImpl.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/json/MutableJsonIndexImpl.java
@@ -44,6 +44,7 @@ import 
org.apache.pinot.common.request.context.predicate.NotInPredicate;
 import org.apache.pinot.common.request.context.predicate.Predicate;
 import org.apache.pinot.common.request.context.predicate.RangePredicate;
 import org.apache.pinot.common.request.context.predicate.RegexpLikePredicate;
+import org.apache.pinot.common.utils.regex.Matcher;
 import org.apache.pinot.common.utils.regex.Pattern;
 import org.apache.pinot.segment.spi.index.creator.JsonIndexCreator;
 import org.apache.pinot.segment.spi.index.mutable.MutableJsonIndex;
@@ -139,17 +140,15 @@ public class MutableJsonIndexImpl implements 
MutableJsonIndex {
       if (filter.getType() == FilterContext.Type.PREDICATE && 
isExclusive(filter.getPredicate().getType())) {
         // Handle exclusive predicate separately because the flip can only be 
applied to the unflattened doc ids in
         // order to get the correct result, and it cannot be nested
-        RoaringBitmap matchingFlattenedDocIds = 
getMatchingFlattenedDocIds(filter.getPredicate());
+        LazyBitmap flattenedDocIds = 
getMatchingFlattenedDocIds(filter.getPredicate());
         MutableRoaringBitmap matchingDocIds = new MutableRoaringBitmap();
-        matchingFlattenedDocIds.forEach(
-            (IntConsumer) flattenedDocId -> 
matchingDocIds.add(_docIdMapping.getInt(flattenedDocId)));
+        flattenedDocIds.forEach(flattenedDocId -> 
matchingDocIds.add(_docIdMapping.getInt(flattenedDocId)));
         matchingDocIds.flip(0, (long) _nextDocId);
         return matchingDocIds;
       } else {
-        RoaringBitmap matchingFlattenedDocIds = 
getMatchingFlattenedDocIds(filter);
+        LazyBitmap flattenedDocIds = getMatchingFlattenedDocIds(filter);
         MutableRoaringBitmap matchingDocIds = new MutableRoaringBitmap();
-        matchingFlattenedDocIds.forEach(
-            (IntConsumer) flattenedDocId -> 
matchingDocIds.add(_docIdMapping.getInt(flattenedDocId)));
+        flattenedDocIds.forEach(flattenedDocId -> 
matchingDocIds.add(_docIdMapping.getInt(flattenedDocId)));
         return matchingDocIds;
       }
     } finally {
@@ -164,26 +163,141 @@ public class MutableJsonIndexImpl implements 
MutableJsonIndex {
     return predicateType == Predicate.Type.IS_NULL;
   }
 
+  /** Delays cloning of a posting list bitmap for as long as possible.
+   * Holds either a posting list bitmap that must be cloned before it is
+   * mutated (_readOnly=true), an already cloned bitmap, or null (EMPTY_BITMAP).
+   */
+  static class LazyBitmap {
+
+    final static LazyBitmap EMPTY_BITMAP = new LazyBitmap(null);
+
+    // null only for EMPTY_BITMAP
+    @Nullable
+    RoaringBitmap _value;
+
+    // if true, _value must be cloned before any mutating operation
+    boolean _readOnly;
+
+    LazyBitmap(RoaringBitmap bitmap) {
+      _value = bitmap;
+      _readOnly = true;
+    }
+
+    LazyBitmap(RoaringBitmap bitmap, boolean isReadOnly) {
+      _value = bitmap;
+      _readOnly = isReadOnly;
+    }
+
+    boolean isMutable() {
+      return !_readOnly;
+    }
+
+    LazyBitmap toMutable() {
+      if (_readOnly) {
+        // a null value (EMPTY_BITMAP) is shared, so hand out a fresh bitmap
+        if (_value == null) {
+          return new LazyBitmap(new RoaringBitmap(), false);
+        }
+        _value = _value.clone();
+        _readOnly = false;
+      }
+
+      return this;
+    }
+
+    void and(LazyBitmap bitmap) {
+      assert isMutable();
+      // getValue() maps EMPTY_BITMAP's null value to an empty bitmap (no NPE)
+      _value.and(bitmap.getValue());
+    }
+
+    LazyBitmap and(RoaringBitmap bitmap) {
+      LazyBitmap mutable = toMutable();
+      mutable._value.and(bitmap);
+      return mutable;
+    }
+
+    LazyBitmap andNot(RoaringBitmap bitmap) {
+      LazyBitmap mutable = toMutable();
+      mutable._value.andNot(bitmap);
+      return mutable;
+    }
+
+    void or(LazyBitmap bitmap) {
+      assert isMutable();
+      // getValue() maps EMPTY_BITMAP's null value to an empty bitmap (no NPE)
+      _value.or(bitmap.getValue());
+    }
+
+    LazyBitmap or(RoaringBitmap bitmap) {
+      LazyBitmap mutable = toMutable();
+      mutable._value.or(bitmap);
+      return mutable;
+    }
+
+    boolean isEmpty() {
+      if (_value == null) {
+        return true;
+      } else {
+        return _value.isEmpty();
+      }
+    }
+
+    void forEach(IntConsumer ic) {
+      if (_value != null) {
+        _value.forEach(ic);
+      }
+    }
+
+    LazyBitmap flip(long rangeStart, long rangeEnd) {
+      LazyBitmap result = toMutable();
+      result._value.flip(rangeStart, rangeEnd);
+      return result;
+    }
+
+    RoaringBitmap getValue() {
+      if (_value == null) {
+        return new RoaringBitmap();
+      } else {
+        return _value;
+      }
+    }
+  }
+
   /**
    * Returns the matching flattened doc ids for the given filter.
    */
-  private RoaringBitmap getMatchingFlattenedDocIds(FilterContext filter) {
+  private LazyBitmap getMatchingFlattenedDocIds(FilterContext filter) {
     switch (filter.getType()) {
       case AND: {
-        List<FilterContext> children = filter.getChildren();
-        int numChildren = children.size();
-        RoaringBitmap matchingDocIds = 
getMatchingFlattenedDocIds(children.get(0));
-        for (int i = 1; i < numChildren; i++) {
-          matchingDocIds.and(getMatchingFlattenedDocIds(children.get(i)));
+        List<FilterContext> filters = filter.getChildren();
+        LazyBitmap matchingDocIds = getMatchingFlattenedDocIds(filters.get(0));
+        for (int i = 1, numFilters = filters.size(); i < numFilters; i++) {
+          if (matchingDocIds.isEmpty()) {
+            break;
+          }
+
+          LazyBitmap filterDocIds = getMatchingFlattenedDocIds(filters.get(i));
+          if (filterDocIds.isEmpty()) {
+            return filterDocIds;
+          } else {
+            matchingDocIds = and(matchingDocIds, filterDocIds);
+          }
         }
         return matchingDocIds;
       }
       case OR: {
-        List<FilterContext> children = filter.getChildren();
-        int numChildren = children.size();
-        RoaringBitmap matchingDocIds = 
getMatchingFlattenedDocIds(children.get(0));
-        for (int i = 1; i < numChildren; i++) {
-          matchingDocIds.or(getMatchingFlattenedDocIds(children.get(i)));
+        List<FilterContext> filters = filter.getChildren();
+        LazyBitmap matchingDocIds = getMatchingFlattenedDocIds(filters.get(0));
+
+        for (int i = 1, numFilters = filters.size(); i < numFilters; i++) {
+          LazyBitmap filterDocIds = getMatchingFlattenedDocIds(filters.get(i));
+          // skip empty results to avoid converting matchingDocIds to a mutable bitmap
+          if (filterDocIds.isEmpty()) {
+            continue;
+          }
+
+          matchingDocIds = or(matchingDocIds, filterDocIds);
         }
         return matchingDocIds;
       }
@@ -203,7 +317,7 @@ public class MutableJsonIndexImpl implements 
MutableJsonIndex {
    * <p>Exclusive predicate is handled as the inclusive predicate, and the 
caller should flip the unflattened doc ids in
    * order to get the correct exclusive predicate result.
    */
-  private RoaringBitmap getMatchingFlattenedDocIds(Predicate predicate) {
+  private LazyBitmap getMatchingFlattenedDocIds(Predicate predicate) {
     ExpressionContext lhs = predicate.getLhs();
     Preconditions.checkArgument(lhs.getType() == 
ExpressionContext.Type.IDENTIFIER,
         "Left-hand side of the predicate must be an identifier, got: %s (%s). 
Put double quotes around the identifier"
@@ -218,11 +332,11 @@ public class MutableJsonIndexImpl implements 
MutableJsonIndex {
     } else {
       key = JsonUtils.KEY_SEPARATOR + key;
     }
-    Pair<String, RoaringBitmap> pair = getKeyAndFlattenedDocIds(key);
+    Pair<String, LazyBitmap> pair = getKeyAndFlattenedDocIds(key);
     key = pair.getLeft();
-    RoaringBitmap matchingDocIds = pair.getRight();
+    LazyBitmap matchingDocIds = pair.getRight();
     if (matchingDocIds != null && matchingDocIds.isEmpty()) {
-      return new RoaringBitmap();
+      return LazyBitmap.EMPTY_BITMAP;
     }
 
     Predicate.Type predicateType = predicate.getType();
@@ -230,152 +344,129 @@ public class MutableJsonIndexImpl implements 
MutableJsonIndex {
       case EQ: {
         String value = ((EqPredicate) predicate).getValue();
         String keyValuePair = key + JsonIndexCreator.KEY_VALUE_SEPARATOR + 
value;
-        RoaringBitmap matchingDocIdsForKeyValuePair = 
_postingListMap.get(keyValuePair);
-        if (matchingDocIdsForKeyValuePair != null) {
-          if (matchingDocIds == null) {
-            return matchingDocIdsForKeyValuePair.clone();
-          } else {
-            matchingDocIds.and(matchingDocIdsForKeyValuePair);
-            return matchingDocIds;
-          }
-        } else {
-          return new RoaringBitmap();
-        }
+        RoaringBitmap result = _postingListMap.get(keyValuePair);
+        return filter(result, matchingDocIds);
       }
 
       case NOT_EQ: {
-        Map<String, RoaringBitmap> subMap = getMatchingKeysMap(key);
-        if (subMap.isEmpty()) {
-          return new RoaringBitmap();
-        }
         String notEqualValue = ((NotEqPredicate) predicate).getValue();
-        RoaringBitmap result = null;
+        LazyBitmap result = null;
 
-        for (Map.Entry<String, RoaringBitmap> entry : subMap.entrySet()) {
-          if (notEqualValue.equals(entry.getKey().substring(key.length() + 
1))) {
-            continue;
-          }
-          if (result == null) {
-            result = entry.getValue().clone();
-          } else {
-            result.or(entry.getValue());
-          }
-        }
+        RoaringBitmap allDocIds = _postingListMap.get(key);
+        if (allDocIds != null && !allDocIds.isEmpty()) {
+          result = new LazyBitmap(allDocIds);
 
-        if (result == null) {
-          return new RoaringBitmap();
-        } else {
-          if (matchingDocIds == null) {
-            return result;
-          } else {
-            matchingDocIds.and(result);
-            return matchingDocIds;
+          RoaringBitmap notEqualDocIds =
+              _postingListMap.get(key + JsonIndexCreator.KEY_VALUE_SEPARATOR + 
notEqualValue);
+
+          if (notEqualDocIds != null && !notEqualDocIds.isEmpty()) {
+            result = result.andNot(notEqualDocIds);
           }
         }
+
+        return filter(result, matchingDocIds);
       }
 
       case IN: {
         List<String> values = ((InPredicate) predicate).getValues();
-        RoaringBitmap matchingDocIdsForKeyValuePairs = new RoaringBitmap();
+        LazyBitmap result = null;
+
+        StringBuilder buffer = new StringBuilder(key);
+        buffer.append(JsonIndexCreator.KEY_VALUE_SEPARATOR);
+        int pos = buffer.length();
+
         for (String value : values) {
-          String keyValuePair = key + JsonIndexCreator.KEY_VALUE_SEPARATOR + 
value;
-          RoaringBitmap matchingDocIdsForKeyValuePair = 
_postingListMap.get(keyValuePair);
-          if (matchingDocIdsForKeyValuePair != null) {
-            matchingDocIdsForKeyValuePairs.or(matchingDocIdsForKeyValuePair);
+          buffer.setLength(pos);
+          buffer.append(value);
+          String keyValue = buffer.toString();
+
+          RoaringBitmap docIds = _postingListMap.get(keyValue);
+
+          if (docIds != null && !docIds.isEmpty()) {
+            if (result == null) {
+              result = new LazyBitmap(docIds);
+            } else {
+              result = result.or(docIds);
+            }
           }
         }
-        if (matchingDocIds == null) {
-          return matchingDocIdsForKeyValuePairs;
-        } else {
-          matchingDocIds.and(matchingDocIdsForKeyValuePairs);
-          return matchingDocIds;
-        }
+
+        return filter(result, matchingDocIds);
       }
 
       case NOT_IN: {
-        Map<String, RoaringBitmap> subMap = getMatchingKeysMap(key);
-        if (subMap.isEmpty()) {
-          return new RoaringBitmap();
-        }
         List<String> notInValues = ((NotInPredicate) predicate).getValues();
-        RoaringBitmap result = null;
+        LazyBitmap result = null;
 
-        for (Map.Entry<String, RoaringBitmap> entry : subMap.entrySet()) {
-          if (notInValues.contains(entry.getKey().substring(key.length() + 
1))) {
-            continue;
-          }
-          if (result == null) {
-            result = entry.getValue().clone();
-          } else {
-            result.or(entry.getValue());
-          }
-        }
+        RoaringBitmap allDocIds = _postingListMap.get(key);
+        if (allDocIds != null && !allDocIds.isEmpty()) {
+          result = new LazyBitmap(allDocIds);
 
-        if (result == null) {
-          return new RoaringBitmap();
-        } else {
-          if (matchingDocIds == null) {
-            return result;
-          } else {
-            matchingDocIds.and(result);
-            return matchingDocIds;
+          StringBuilder buffer = new StringBuilder(key);
+          buffer.append(JsonIndexCreator.KEY_VALUE_SEPARATOR);
+          int pos = buffer.length();
+
+          for (String notInValue : notInValues) {
+            buffer.setLength(pos);
+            buffer.append(notInValue);
+            String keyValuePair = buffer.toString();
+
+            RoaringBitmap docIds = _postingListMap.get(keyValuePair);
+            if (docIds != null && !docIds.isEmpty()) {
+              result = result.andNot(docIds);
+            }
           }
         }
+
+        return filter(result, matchingDocIds);
       }
 
       case IS_NOT_NULL:
       case IS_NULL: {
-        RoaringBitmap matchingDocIdsForKey = _postingListMap.get(key);
-        if (matchingDocIdsForKey != null) {
-          if (matchingDocIds == null) {
-            return matchingDocIdsForKey.clone();
-          } else {
-            matchingDocIds.and(matchingDocIdsForKey);
-            return matchingDocIds;
-          }
-        } else {
-          return new RoaringBitmap();
-        }
+        RoaringBitmap result = _postingListMap.get(key);
+        return filter(result, matchingDocIds);
       }
 
       case REGEXP_LIKE: {
         Map<String, RoaringBitmap> subMap = getMatchingKeysMap(key);
         if (subMap.isEmpty()) {
-          return new RoaringBitmap();
+          return LazyBitmap.EMPTY_BITMAP;
         }
+
         Pattern pattern = ((RegexpLikePredicate) predicate).getPattern();
-        RoaringBitmap result = null;
+        Matcher matcher = pattern.matcher("");
+        LazyBitmap result = null;
+        StringBuilder value = new StringBuilder();
 
         for (Map.Entry<String, RoaringBitmap> entry : subMap.entrySet()) {
-          if (!pattern.matcher(entry.getKey().substring(key.length() + 
1)).matches()) {
+          String keyValue = entry.getKey();
+          value.setLength(0);
+          value.append(keyValue, key.length() + 1, keyValue.length());
+
+          if (!matcher.reset(value).matches()) {
             continue;
           }
-          if (result == null) {
-            result = entry.getValue().clone();
-          } else {
-            result.or(entry.getValue());
-          }
-        }
 
-        if (result == null) {
-          return new RoaringBitmap();
-        } else {
-          if (matchingDocIds == null) {
-            return result;
-          } else {
-            matchingDocIds.and(result);
-            return matchingDocIds;
+          RoaringBitmap docIds = entry.getValue();
+          if (docIds != null && !docIds.isEmpty()) {
+            if (result == null) {
+              result = new LazyBitmap(docIds);
+            } else {
+              result = result.or(docIds);
+            }
           }
         }
+
+        return filter(result, matchingDocIds);
       }
 
       case RANGE: {
         Map<String, RoaringBitmap> subMap = getMatchingKeysMap(key);
         if (subMap.isEmpty()) {
-          return new RoaringBitmap();
+          return LazyBitmap.EMPTY_BITMAP;
         }
-        RoaringBitmap result = null;
 
+        LazyBitmap result = null;
         RangePredicate rangePredicate = (RangePredicate) predicate;
         FieldSpec.DataType rangeDataType = rangePredicate.getRangeDataType();
         // Simplify to only support numeric and string types
@@ -402,23 +493,14 @@ public class MutableJsonIndexImpl implements 
MutableJsonIndex {
                   : rangeDataType.compare(valueObj, upperBound) < 0);
           if (lowerCompareResult && upperCompareResult) {
             if (result == null) {
-              result = entry.getValue().clone();
+              result = new LazyBitmap(entry.getValue());
             } else {
-              result.or(entry.getValue());
+              result = result.or(entry.getValue());
             }
           }
         }
 
-        if (result == null) {
-          return new RoaringBitmap();
-        } else {
-          if (matchingDocIds == null) {
-            return result;
-          } else {
-            matchingDocIds.and(result);
-            return matchingDocIds;
-          }
-        }
+        return filter(result, matchingDocIds);
       }
 
       default:
@@ -441,21 +523,22 @@ public class MutableJsonIndexImpl implements 
MutableJsonIndex {
 
   @Override
   public Map<String, RoaringBitmap> getMatchingFlattenedDocsMap(String 
jsonPathKey, @Nullable String filterString) {
-    Map<String, RoaringBitmap> valueToMatchingFlattenedDocIdsMap = new 
HashMap<>();
+    Map<String, RoaringBitmap> resultMap = new HashMap<>();
     _readLock.lock();
     try {
-      RoaringBitmap filteredFlattenedDocIds = null;
+      LazyBitmap filteredDocIds = null;
       FilterContext filter;
       if (filterString != null) {
         filter = 
RequestContextUtils.getFilter(CalciteSqlParser.compileToExpression(filterString));
         Preconditions.checkArgument(!filter.isConstant(), "Invalid json match 
filter: " + filterString);
+
         if (filter.getType() == FilterContext.Type.PREDICATE && 
isExclusive(filter.getPredicate().getType())) {
           // Handle exclusive predicate separately because the flip can only 
be applied to the
-          // unflattened doc ids in order to get the correct result, and it 
cannot be nested
-          filteredFlattenedDocIds = 
getMatchingFlattenedDocIds(filter.getPredicate());
-          filteredFlattenedDocIds.flip(0, (long) _nextFlattenedDocId);
+          // un-flattened doc ids in order to get the correct result, and it 
cannot be nested
+          filteredDocIds = getMatchingFlattenedDocIds(filter.getPredicate());
+          filteredDocIds = filteredDocIds.flip(0, _nextFlattenedDocId);
         } else {
-          filteredFlattenedDocIds = getMatchingFlattenedDocIds(filter);
+          filteredDocIds = getMatchingFlattenedDocIds(filter);
         }
       }
       // Support 2 formats:
@@ -466,28 +549,40 @@ public class MutableJsonIndexImpl implements 
MutableJsonIndex {
       } else {
         jsonPathKey = JsonUtils.KEY_SEPARATOR + jsonPathKey;
       }
-      Pair<String, RoaringBitmap> result = 
getKeyAndFlattenedDocIds(jsonPathKey);
+      Pair<String, LazyBitmap> result = getKeyAndFlattenedDocIds(jsonPathKey);
       jsonPathKey = result.getLeft();
-      RoaringBitmap arrayIndexFlattenDocIds = result.getRight();
-      if (arrayIndexFlattenDocIds != null && 
arrayIndexFlattenDocIds.isEmpty()) {
-        return valueToMatchingFlattenedDocIdsMap;
+      LazyBitmap arrayIndexDocIds = result.getRight();
+      if (arrayIndexDocIds != null && arrayIndexDocIds.isEmpty()) {
+        return resultMap;
       }
+
+      RoaringBitmap filteredBitmap = filteredDocIds != null ? 
filteredDocIds.getValue() : null;
+      RoaringBitmap arrayIndexBitmap = arrayIndexDocIds != null ? 
arrayIndexDocIds.getValue() : null;
+
       Map<String, RoaringBitmap> subMap = getMatchingKeysMap(jsonPathKey);
       for (Map.Entry<String, RoaringBitmap> entry : subMap.entrySet()) {
-        RoaringBitmap flattenedDocIds = entry.getValue().clone();
-        if (filteredFlattenedDocIds != null) {
-          flattenedDocIds.and(filteredFlattenedDocIds);
+        // there is no point using lazy bitmap here because filteredDocIds and 
arrayIndexDocIds
+        // are shared and can't be modified
+        RoaringBitmap docIds = entry.getValue();
+        if (docIds == null || docIds.isEmpty()) {
+          continue;
         }
-        if (arrayIndexFlattenDocIds != null) {
-          flattenedDocIds.and(arrayIndexFlattenDocIds);
+        docIds = docIds.clone();
+        if (filteredDocIds != null) {
+          docIds.and(filteredBitmap);
         }
-        if (!flattenedDocIds.isEmpty()) {
-          
valueToMatchingFlattenedDocIdsMap.put(entry.getKey().substring(jsonPathKey.length()
 + 1), flattenedDocIds);
-          
Tracing.ThreadAccountantOps.sampleAndCheckInterruptionPeriodically(valueToMatchingFlattenedDocIdsMap.size());
+        if (arrayIndexDocIds != null) {
+          docIds.and(arrayIndexBitmap);
+        }
+
+        if (!docIds.isEmpty()) {
+          String value = entry.getKey().substring(jsonPathKey.length() + 1);
+          resultMap.put(value, docIds);
+          
Tracing.ThreadAccountantOps.sampleAndCheckInterruptionPeriodically(resultMap.size());
         }
       }
 
-      return valueToMatchingFlattenedDocIdsMap;
+      return resultMap;
     } finally {
       _readLock.unlock();
     }
@@ -499,7 +594,7 @@ public class MutableJsonIndexImpl implements 
MutableJsonIndex {
    *  Else, return the json path that is generated by replacing array index 
with . on the original key
    *  and the associated flattenDocId bitmap
    */
-  private Pair<String, RoaringBitmap> getKeyAndFlattenedDocIds(String key) {
+  private Pair<String, LazyBitmap> getKeyAndFlattenedDocIds(String key) {
     // Process the array index within the key if exists
     // E.g. "[*]"=1 -> "."='1'
     // E.g. "[0]"=1 -> ".$index"='0' && "."='1'
@@ -507,7 +602,7 @@ public class MutableJsonIndexImpl implements 
MutableJsonIndex {
     // E.g. ".foo[*].bar[*].foobar"='abc' -> ".foo..bar..foobar"='abc'
     // E.g. ".foo[0].bar[1].foobar"='abc' -> ".foo.$index"='0' && 
".foo..bar.$index"='1' && ".foo..bar..foobar"='abc'
     // E.g. ".foo[0][1].bar"='abc' -> ".foo.$index"='0' && ".foo..$index"='1' 
&& ".foo...bar"='abc'
-    RoaringBitmap matchingDocIds = null;
+    LazyBitmap matchingDocIds = null;
     int leftBracketIndex;
     while ((leftBracketIndex = key.indexOf('[')) >= 0) {
       int rightBracketIndex = key.indexOf(']', leftBracketIndex + 2);
@@ -522,14 +617,15 @@ public class MutableJsonIndexImpl implements 
MutableJsonIndex {
         // ".foo[1].bar"='abc' -> ".foo.$index"=1 && ".foo..bar"='abc'
         String searchKey = leftPart + JsonUtils.ARRAY_INDEX_KEY + 
JsonIndexCreator.KEY_VALUE_SEPARATOR + arrayIndex;
         RoaringBitmap docIds = _postingListMap.get(searchKey);
+
         if (docIds != null) {
           if (matchingDocIds == null) {
-            matchingDocIds = docIds.clone();
+            matchingDocIds = new LazyBitmap(docIds);
           } else {
-            matchingDocIds.and(docIds);
+            matchingDocIds = matchingDocIds.and(docIds);
           }
         } else {
-          return Pair.of(null, new RoaringBitmap());
+          return Pair.of(null, LazyBitmap.EMPTY_BITMAP);
         }
       }
 
@@ -627,4 +723,65 @@ public class MutableJsonIndexImpl implements 
MutableJsonIndex {
   @Override
   public void close() {
   }
+
+  // AND given bitmaps in place, reusing whichever one is already mutable; 
otherwise the first one is converted to mutable
+  private static LazyBitmap and(LazyBitmap target, LazyBitmap other) {
+    if (target.isMutable()) {
+      target.and(other);
+      return target;
+    } else if (other.isMutable()) {
+      other.and(target);
+      return other;
+    } else {
+      LazyBitmap mutableTarget = target.toMutable();
+      mutableTarget.and(other);
+      return mutableTarget;
+    }
+  }
+
+  private static LazyBitmap and(LazyBitmap target, RoaringBitmap other) {
+    if (target.isMutable()) {
+      target.and(other);
+      return target;
+    } else {
+      LazyBitmap mutableTarget = target.toMutable();
+      mutableTarget.and(other);
+      return mutableTarget;
+    }
+  }
+
+  // OR given bitmaps in place, reusing whichever one is already mutable; 
otherwise the first one is converted to mutable
+  private static LazyBitmap or(LazyBitmap target, LazyBitmap other) {
+    if (target.isMutable()) {
+      target.or(other);
+      return target;
+    } else if (other.isMutable()) {
+      other.or(target);
+      return other;
+    } else {
+      LazyBitmap mutableTarget = target.toMutable();
+      mutableTarget.or(other);
+      return mutableTarget;
+    }
+  }
+
+  private static LazyBitmap filter(LazyBitmap result, LazyBitmap 
matchingDocIds) {
+    if (result == null) {
+      return LazyBitmap.EMPTY_BITMAP;
+    } else if (matchingDocIds == null) {
+      return result;
+    } else {
+      return and(matchingDocIds, result);
+    }
+  }
+
+  private static LazyBitmap filter(RoaringBitmap result, LazyBitmap 
matchingDocIds) {
+    if (result == null) {
+      return LazyBitmap.EMPTY_BITMAP;
+    } else if (matchingDocIds == null) {
+      return new LazyBitmap(result);
+    } else {
+      return and(matchingDocIds, result);
+    }
+  }
 }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/StringDictionary.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/StringDictionary.java
index 5d173749f8..28f510eed8 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/StringDictionary.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/StringDictionary.java
@@ -74,6 +74,17 @@ public class StringDictionary extends 
BaseImmutableDictionary {
     return getUnpaddedString(dictId, getBuffer());
   }
 
+  /** Same as getStringValue(int) but allows reusing buffer, instead of 
allocating on each call. */
+  public String getStringValue(int dictId, byte[] buffer) {
+    return getUnpaddedString(dictId, buffer);
+  }
+
+  /** Allocates a buffer to use with the getStringValue(int, byte[]) method. */
+  @Override
+  public byte[] getBuffer() {
+    return super.getBuffer();
+  }
+
   @Override
   public byte[] getBytesValue(int dictId) {
     return getUnpaddedBytes(dictId, getBuffer());
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/json/ImmutableJsonIndexReader.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/json/ImmutableJsonIndexReader.java
index e94eee4170..c702c3dac7 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/json/ImmutableJsonIndexReader.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/json/ImmutableJsonIndexReader.java
@@ -20,6 +20,11 @@ package 
org.apache.pinot.segment.local.segment.index.readers.json;
 
 import com.google.common.base.Preconditions;
 import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
+import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 import java.util.ArrayList;
 import java.util.Comparator;
@@ -39,6 +44,7 @@ import 
org.apache.pinot.common.request.context.predicate.NotInPredicate;
 import org.apache.pinot.common.request.context.predicate.Predicate;
 import org.apache.pinot.common.request.context.predicate.RangePredicate;
 import org.apache.pinot.common.request.context.predicate.RegexpLikePredicate;
+import org.apache.pinot.common.utils.regex.Matcher;
 import org.apache.pinot.common.utils.regex.Pattern;
 import 
org.apache.pinot.segment.local.segment.creator.impl.inv.json.BaseJsonIndexCreator;
 import 
org.apache.pinot.segment.local.segment.index.readers.BitmapInvertedIndexReader;
@@ -70,6 +76,25 @@ public class ImmutableJsonIndexReader implements 
JsonIndexReader {
   private final long _numFlattenedDocs;
   private final PinotDataBuffer _docIdMapping;
 
+  // empty bitmap used to limit creation of new empty mutable bitmaps
+  private static final ImmutableRoaringBitmap EMPTY_BITMAP;
+
+  static {
+    // this convoluted way of creating empty immutable bitmap is used here to 
avoid creating another
+    // subclass and potentially affecting roaring bitmap call performance
+    MutableRoaringBitmap temp = MutableRoaringBitmap.bitmapOf();
+    ByteArrayOutputStream bos = new ByteArrayOutputStream();
+
+    try (DataOutputStream dos = new DataOutputStream(bos)) {
+      temp.serialize(dos);
+    } catch (IOException ignoreMe) {
+      // not expected: serializing into an in-memory ByteArrayOutputStream does not fail
+    }
+
+    ByteBuffer bb = ByteBuffer.wrap(bos.toByteArray());
+    EMPTY_BITMAP = new ImmutableRoaringBitmap(bb);
+  }
+
   public ImmutableJsonIndexReader(PinotDataBuffer dataBuffer, int numDocs) {
     _numDocs = numDocs;
     _version = dataBuffer.getInt(0);
@@ -107,16 +132,16 @@ public class ImmutableJsonIndexReader implements 
JsonIndexReader {
     if (filter.getType() == FilterContext.Type.PREDICATE && 
isExclusive(filter.getPredicate().getType())) {
       // Handle exclusive predicate separately because the flip can only be 
applied to the unflattened doc ids in order
       // to get the correct result, and it cannot be nested
-      MutableRoaringBitmap matchingFlattenedDocIds = 
getMatchingFlattenedDocIds(filter.getPredicate());
-      MutableRoaringBitmap matchingDocIds = new MutableRoaringBitmap();
-      matchingFlattenedDocIds.forEach((IntConsumer) flattenedDocId -> 
matchingDocIds.add(getDocId(flattenedDocId)));
-      matchingDocIds.flip(0, _numDocs);
-      return matchingDocIds;
+      ImmutableRoaringBitmap flattenedDocIds = 
getMatchingFlattenedDocIds(filter.getPredicate());
+      MutableRoaringBitmap resultDocIds = new MutableRoaringBitmap();
+      flattenedDocIds.forEach((IntConsumer) flattenedDocId -> 
resultDocIds.add(getDocId(flattenedDocId)));
+      resultDocIds.flip(0, _numDocs);
+      return resultDocIds;
     } else {
-      MutableRoaringBitmap matchingFlattenedDocIds = 
getMatchingFlattenedDocIds(filter);
-      MutableRoaringBitmap matchingDocIds = new MutableRoaringBitmap();
-      matchingFlattenedDocIds.forEach((IntConsumer) flattenedDocId -> 
matchingDocIds.add(getDocId(flattenedDocId)));
-      return matchingDocIds;
+      ImmutableRoaringBitmap flattenedDocIds = 
getMatchingFlattenedDocIds(filter);
+      MutableRoaringBitmap resultDocIds = new MutableRoaringBitmap();
+      flattenedDocIds.forEach((IntConsumer) flattenedDocId -> 
resultDocIds.add(getDocId(flattenedDocId)));
+      return resultDocIds;
     }
   }
 
@@ -127,28 +152,97 @@ public class ImmutableJsonIndexReader implements 
JsonIndexReader {
     return predicateType == Predicate.Type.IS_NULL;
   }
 
+  // AND given bitmaps in place, reusing whichever argument is already mutable 
(else target is converted to mutable)
+  private static MutableRoaringBitmap and(ImmutableRoaringBitmap target, 
ImmutableRoaringBitmap other) {
+    if (target instanceof MutableRoaringBitmap) {
+      MutableRoaringBitmap mutableTarget = (MutableRoaringBitmap) target;
+      mutableTarget.and(other);
+      return mutableTarget;
+    } else if (other instanceof MutableRoaringBitmap) {
+      MutableRoaringBitmap mutableOther = (MutableRoaringBitmap) other;
+      mutableOther.and(target);
+      return mutableOther;
+    } else { // base implementation
+      MutableRoaringBitmap mutableTarget = toMutable(target);
+      mutableTarget.and(other);
+      return mutableTarget;
+    }
+  }
+
+  private static ImmutableRoaringBitmap filter(ImmutableRoaringBitmap result,
+      ImmutableRoaringBitmap matchingDocIds) {
+    if (result == null) {
+      return EMPTY_BITMAP;
+    } else if (matchingDocIds == null) {
+      return result;
+    } else {
+      return and(matchingDocIds, result);
+    }
+  }
+
+  // OR given bitmaps in place, reusing whichever argument is already mutable 
(else target is converted to mutable)
+  private static MutableRoaringBitmap or(ImmutableRoaringBitmap target, 
ImmutableRoaringBitmap other) {
+    if (target instanceof MutableRoaringBitmap) {
+      MutableRoaringBitmap mutableTarget = (MutableRoaringBitmap) target;
+      mutableTarget.or(other);
+      return mutableTarget;
+    } else if (other instanceof MutableRoaringBitmap) {
+      MutableRoaringBitmap mutableOther = (MutableRoaringBitmap) other;
+      mutableOther.or(target);
+      return mutableOther;
+    } else { // base implementation
+      MutableRoaringBitmap mutableTarget = toMutable(target);
+      mutableTarget.or(other);
+      return mutableTarget;
+    }
+  }
+
+  // If given bitmap is not mutable, convert it to a mutable one.
+  // Used to delay immutable -> mutable conversion as much as possible
+  private static MutableRoaringBitmap toMutable(ImmutableRoaringBitmap bitmap) 
{
+    if (bitmap instanceof MutableRoaringBitmap) {
+      return (MutableRoaringBitmap) bitmap;
+    } else {
+      return bitmap.toMutableRoaringBitmap();
+    }
+  }
+
   /**
    * Returns the matching flattened doc ids for the given filter.
    */
-  private MutableRoaringBitmap getMatchingFlattenedDocIds(FilterContext 
filter) {
+  private ImmutableRoaringBitmap getMatchingFlattenedDocIds(FilterContext 
filter) {
     switch (filter.getType()) {
       case AND: {
-        List<FilterContext> children = filter.getChildren();
-        int numChildren = children.size();
-        MutableRoaringBitmap matchingDocIds =
-            getMatchingFlattenedDocIds(children.get(0));
-        for (int i = 1; i < numChildren; i++) {
-          matchingDocIds.and(getMatchingFlattenedDocIds(children.get(i)));
+        List<FilterContext> filters = filter.getChildren();
+        ImmutableRoaringBitmap matchingDocIds = 
getMatchingFlattenedDocIds(filters.get(0));
+
+        for (int i = 1, numFilters = filters.size(); i < numFilters; i++) {
+          // if current set is empty then there is no point AND-ing it with 
another one
+          if (matchingDocIds.isEmpty()) {
+            break;
+          }
+
+          ImmutableRoaringBitmap filterDocIds = 
getMatchingFlattenedDocIds(filters.get(i));
+          if (filterDocIds.isEmpty()) {
+            // potentially avoid converting matchingDocIds to mutable bitmap
+            return filterDocIds;
+          } else {
+            matchingDocIds = and(matchingDocIds, filterDocIds);
+          }
         }
         return matchingDocIds;
       }
       case OR: {
-        List<FilterContext> children = filter.getChildren();
-        int numChildren = children.size();
-        MutableRoaringBitmap matchingDocIds =
-            getMatchingFlattenedDocIds(children.get(0));
-        for (int i = 1; i < numChildren; i++) {
-          matchingDocIds.or(getMatchingFlattenedDocIds(children.get(i)));
+        List<FilterContext> filters = filter.getChildren();
+        ImmutableRoaringBitmap matchingDocIds = 
getMatchingFlattenedDocIds(filters.get(0));
+
+        for (int i = 1, numFilters = filters.size(); i < numFilters; i++) {
+          ImmutableRoaringBitmap filterDocIds = 
getMatchingFlattenedDocIds(filters.get(i));
+          // avoid having to convert matchingDocIds to mutable bitmap
+          if (filterDocIds.isEmpty()) {
+            continue;
+          }
+          matchingDocIds = or(matchingDocIds, filterDocIds);
         }
         return matchingDocIds;
       }
@@ -167,8 +261,9 @@ public class ImmutableJsonIndexReader implements 
JsonIndexReader {
    * Returns the matching flattened doc ids for the given predicate.
    * <p>Exclusive predicate is handled as the inclusive predicate, and the 
caller should flip the unflattened doc ids in
    * order to get the correct exclusive predicate result.
+   * Note: returned bitmap could actually be mutable
    */
-  private MutableRoaringBitmap getMatchingFlattenedDocIds(Predicate predicate) 
{
+  private ImmutableRoaringBitmap getMatchingFlattenedDocIds(Predicate 
predicate) {
     ExpressionContext lhs = predicate.getLhs();
     Preconditions.checkArgument(lhs.getType() == 
ExpressionContext.Type.IDENTIFIER,
         "Left-hand side of the predicate must be an identifier, got: %s (%s). 
Put double quotes around the identifier"
@@ -189,9 +284,9 @@ public class ImmutableJsonIndexReader implements 
JsonIndexReader {
         key = key.substring(2);
       }
     }
-    Pair<String, MutableRoaringBitmap> pair = getKeyAndFlattenedDocIds(key);
+    Pair<String, ImmutableRoaringBitmap> pair = getKeyAndFlattenedDocIds(key);
     key = pair.getLeft();
-    MutableRoaringBitmap matchingDocIds = pair.getRight();
+    ImmutableRoaringBitmap matchingDocIds = pair.getRight();
     if (matchingDocIds != null && matchingDocIds.isEmpty()) {
       return matchingDocIds;
     }
@@ -202,134 +297,160 @@ public class ImmutableJsonIndexReader implements 
JsonIndexReader {
         String value = ((EqPredicate) predicate).getValue();
         String keyValuePair = key + JsonIndexCreator.KEY_VALUE_SEPARATOR + 
value;
         int dictId = _dictionary.indexOf(keyValuePair);
+        ImmutableRoaringBitmap result = null;
         if (dictId >= 0) {
-          ImmutableRoaringBitmap matchingDocIdsForKeyValuePair = 
_invertedIndex.getDocIds(dictId);
-          if (matchingDocIds == null) {
-            matchingDocIds = 
matchingDocIdsForKeyValuePair.toMutableRoaringBitmap();
-          } else {
-            matchingDocIds.and(matchingDocIdsForKeyValuePair);
-          }
-          return matchingDocIds;
-        } else {
-          return new MutableRoaringBitmap();
+          result = _invertedIndex.getDocIds(dictId);
         }
+        return filter(result, matchingDocIds);
       }
 
       case NOT_EQ: {
+        // each array is un-nested and so flattened json document contains 
only one value
+        // that means for each key-value pair the set of flattened document 
ids is disjoint
         String notEqualValue = ((NotEqPredicate) predicate).getValue();
-        int[] dictIds = getDictIdRangeForKey(key);
-        MutableRoaringBitmap result = null;
-
-        for (int dictId = dictIds[0]; dictId < dictIds[1]; dictId++) {
-          String value = 
_dictionary.getStringValue(dictId).substring(key.length() + 1);
-          if (!notEqualValue.equals(value)) {
-            if (result == null) {
-              result = 
_invertedIndex.getDocIds(dictId).toMutableRoaringBitmap();
-            } else {
-              result.or(_invertedIndex.getDocIds(dictId));
+        ImmutableRoaringBitmap result = null;
+
+        // read bitmap with all values for this key instead of OR-ing many 
per-value bitmaps
+        int allValuesDictId = _dictionary.indexOf(key);
+        if (allValuesDictId >= 0) {
+          ImmutableRoaringBitmap allValuesDocIds = 
_invertedIndex.getDocIds(allValuesDictId);
+
+          if (!allValuesDocIds.isEmpty()) {
+            int notEqDictId = _dictionary.indexOf(key + 
JsonIndexCreator.KEY_VALUE_SEPARATOR + notEqualValue);
+            if (notEqDictId >= 0) {
+              ImmutableRoaringBitmap notEqDocIds = 
_invertedIndex.getDocIds(notEqDictId);
+              if (notEqDocIds.isEmpty()) {
+                //  there's no value to remove, use found bitmap (is this 
possible ?)
+                result = allValuesDocIds;
+              } else {
+                // remove doc ids for unwanted value
+                MutableRoaringBitmap mutableBitmap = 
allValuesDocIds.toMutableRoaringBitmap();
+                mutableBitmap.andNot(notEqDocIds);
+                result = mutableBitmap;
+              }
+            } else { // there's no value to remove, use found bitmap
+              result = allValuesDocIds;
             }
           }
         }
 
-        if (result == null) {
-          return new MutableRoaringBitmap();
-        } else {
-          if (matchingDocIds == null) {
-            return result;
-          } else {
-            matchingDocIds.and(result);
-            return matchingDocIds;
-          }
-        }
+        return filter(result, matchingDocIds);
       }
 
       case IN: {
         List<String> values = ((InPredicate) predicate).getValues();
-        MutableRoaringBitmap matchingDocIdsForKeyValuePairs = new 
MutableRoaringBitmap();
+        ImmutableRoaringBitmap result = null;
         for (String value : values) {
           String keyValuePair = key + JsonIndexCreator.KEY_VALUE_SEPARATOR + 
value;
           int dictId = _dictionary.indexOf(keyValuePair);
           if (dictId >= 0) {
-            
matchingDocIdsForKeyValuePairs.or(_invertedIndex.getDocIds(dictId));
+            ImmutableRoaringBitmap docIds = _invertedIndex.getDocIds(dictId);
+            if (result == null) {
+              result = docIds;
+            } else {
+              result = or(result, docIds);
+            }
           }
         }
-        if (matchingDocIds == null) {
-          matchingDocIds = matchingDocIdsForKeyValuePairs;
-        } else {
-          matchingDocIds.and(matchingDocIdsForKeyValuePairs);
-        }
-        return matchingDocIds;
+
+        return filter(result, matchingDocIds);
       }
 
       case NOT_IN: {
         List<String> notInValues = ((NotInPredicate) predicate).getValues();
         int[] dictIds = getDictIdRangeForKey(key);
-        MutableRoaringBitmap result = null;
+        ImmutableRoaringBitmap result = null;
+
+        int valueCount = dictIds[1] - dictIds[0];
+
+        if (notInValues.size() < valueCount / 2) {
+          // if there are fewer notIn values than In values
+          // read bitmap for all values and then remove values from bitmaps 
associated with notIn values
+
+          int allValuesDictId = _dictionary.indexOf(key);
+          if (allValuesDictId >= 0) {
+            ImmutableRoaringBitmap allValuesDocIds = 
_invertedIndex.getDocIds(allValuesDictId);
+
+            if (!allValuesDocIds.isEmpty()) {
+              result = allValuesDocIds;
+
+              for (String notInValue : notInValues) {
+                int notInDictId = _dictionary.indexOf(key + 
JsonIndexCreator.KEY_VALUE_SEPARATOR + notInValue);
+                if (notInDictId >= 0) {
+                  ImmutableRoaringBitmap notEqDocIds = 
_invertedIndex.getDocIds(notInDictId);
+                  // remove doc ids for unwanted value
+                  MutableRoaringBitmap mutableBitmap = toMutable(result);
+                  mutableBitmap.andNot(notEqDocIds);
+                  result = mutableBitmap;
+                }
+              }
+            }
+          }
+        } else {
+          // if there are more In values than notIn then OR bitmaps for all 
values except notIn values
+          // resolve dict ids for string values to avoid comparing strings
+          IntOpenHashSet notInDictIds = null;
+          if (dictIds[0] < dictIds[1]) {
+            notInDictIds = new IntOpenHashSet();
+            for (String notInValue : notInValues) {
+              int dictId = _dictionary.indexOf(key + 
JsonIndexCreator.KEY_VALUE_SEPARATOR + notInValue);
+              if (dictId >= 0) {
+                notInDictIds.add(dictId);
+              }
+            }
+          }
+
+          for (int dictId = dictIds[0]; dictId < dictIds[1]; dictId++) {
+            if (notInDictIds.contains(dictId)) {
+              continue;
+            }
 
-        for (int dictId = dictIds[0]; dictId < dictIds[1]; dictId++) {
-          String value = 
_dictionary.getStringValue(dictId).substring(key.length() + 1);
-          if (!notInValues.contains(value)) {
             if (result == null) {
-              result = 
_invertedIndex.getDocIds(dictId).toMutableRoaringBitmap();
+              result = _invertedIndex.getDocIds(dictId);
             } else {
-              result.or(_invertedIndex.getDocIds(dictId));
+              result = or(result, _invertedIndex.getDocIds(dictId));
             }
           }
         }
 
-        if (result == null) {
-          return new MutableRoaringBitmap();
-        } else {
-          if (matchingDocIds == null) {
-            return result;
-          } else {
-            matchingDocIds.and(result);
-            return matchingDocIds;
-          }
-        }
+        return filter(result, matchingDocIds);
       }
 
       case IS_NOT_NULL:
       case IS_NULL: {
+        ImmutableRoaringBitmap result = null;
         int dictId = _dictionary.indexOf(key);
         if (dictId >= 0) {
-          ImmutableRoaringBitmap matchingDocIdsForKey = 
_invertedIndex.getDocIds(dictId);
-          if (matchingDocIds == null) {
-            matchingDocIds = matchingDocIdsForKey.toMutableRoaringBitmap();
-          } else {
-            matchingDocIds.and(matchingDocIdsForKey);
-          }
-          return matchingDocIds;
-        } else {
-          return new MutableRoaringBitmap();
+          result = _invertedIndex.getDocIds(dictId);
         }
+
+        return filter(result, matchingDocIds);
       }
 
       case REGEXP_LIKE: {
         Pattern pattern = ((RegexpLikePredicate) predicate).getPattern();
+        Matcher matcher = pattern.matcher("");
         int[] dictIds = getDictIdRangeForKey(key);
 
-        MutableRoaringBitmap result = null;
+        ImmutableRoaringBitmap result = null;
+        byte[] dictBuffer = dictIds[0] < dictIds[1] ? _dictionary.getBuffer() 
: null;
+        StringBuilder value = new StringBuilder();
+
         for (int dictId = dictIds[0]; dictId < dictIds[1]; dictId++) {
-          String value = 
_dictionary.getStringValue(dictId).substring(key.length() + 1);
-          if (pattern.matcher(value).matches()) {
+          String stringValue = _dictionary.getStringValue(dictId, dictBuffer);
+          value.setLength(0);
+          value.append(stringValue, key.length() + 1, stringValue.length());
+
+          if (matcher.reset(value).matches()) {
             if (result == null) {
-              result = 
_invertedIndex.getDocIds(dictId).toMutableRoaringBitmap();
+              result = _invertedIndex.getDocIds(dictId);
             } else {
-              result.or(_invertedIndex.getDocIds(dictId));
+              result = or(result, _invertedIndex.getDocIds(dictId));
             }
           }
         }
-        if (result == null) {
-          return new MutableRoaringBitmap();
-        } else {
-          if (matchingDocIds == null) {
-            return result;
-          } else {
-            matchingDocIds.and(result);
-            return matchingDocIds;
-          }
-        }
+
+        return filter(result, matchingDocIds);
       }
 
       case RANGE: {
@@ -350,9 +471,11 @@ public class ImmutableJsonIndexReader implements 
JsonIndexReader {
         Object upperBound = upperUnbounded ? null : 
rangeDataType.convert(rangePredicate.getUpperBound());
 
         int[] dictIds = getDictIdRangeForKey(key);
-        MutableRoaringBitmap result = null;
+        ImmutableRoaringBitmap result = null;
+        byte[] dictBuffer = dictIds[0] < dictIds[1] ? _dictionary.getBuffer() 
: null;
+
         for (int dictId = dictIds[0]; dictId < dictIds[1]; dictId++) {
-          String value = 
_dictionary.getStringValue(dictId).substring(key.length() + 1);
+          String value = _dictionary.getStringValue(dictId, 
dictBuffer).substring(key.length() + 1);
           Object valueObj = rangeDataType.convert(value);
           boolean lowerCompareResult =
               lowerUnbounded || (lowerInclusive ? 
rangeDataType.compare(valueObj, lowerBound) >= 0
@@ -363,23 +486,14 @@ public class ImmutableJsonIndexReader implements 
JsonIndexReader {
 
           if (lowerCompareResult && upperCompareResult) {
             if (result == null) {
-              result = 
_invertedIndex.getDocIds(dictId).toMutableRoaringBitmap();
+              result = _invertedIndex.getDocIds(dictId);
             } else {
-              result.or(_invertedIndex.getDocIds(dictId));
+              result = or(result, _invertedIndex.getDocIds(dictId));
             }
           }
         }
 
-        if (result == null) {
-          return new MutableRoaringBitmap();
-        } else {
-          if (matchingDocIds == null) {
-            return result;
-          } else {
-            matchingDocIds.and(result);
-            return matchingDocIds;
-          }
-        }
+        return filter(result, matchingDocIds);
       }
 
       default:
@@ -435,7 +549,7 @@ public class ImmutableJsonIndexReader implements 
JsonIndexReader {
       }
     }
     Map<String, RoaringBitmap> result = new HashMap<>();
-    Pair<String, MutableRoaringBitmap> pathKey = 
getKeyAndFlattenedDocIds(jsonPathKey);
+    Pair<String, ImmutableRoaringBitmap> pathKey = 
getKeyAndFlattenedDocIds(jsonPathKey);
     if (pathKey.getRight() != null && pathKey.getRight().isEmpty()) {
       return result;
     }
@@ -446,8 +560,10 @@ public class ImmutableJsonIndexReader implements 
JsonIndexReader {
       arrayIndexFlattenDocIds = pathKey.getRight().toRoaringBitmap();
     }
     int[] dictIds = getDictIdRangeForKey(jsonPathKey);
+    byte[] dictBuffer = dictIds[0] < dictIds[1] ? _dictionary.getBuffer() : 
null;
+
     for (int dictId = dictIds[0]; dictId < dictIds[1]; dictId++) {
-      String key = _dictionary.getStringValue(dictId);
+      String key = _dictionary.getStringValue(dictId, dictBuffer);
       RoaringBitmap docIds = 
_invertedIndex.getDocIds(dictId).toRoaringBitmap();
       if (filteredFlattenedDocIds != null) {
         docIds.and(filteredFlattenedDocIds);
@@ -566,8 +682,9 @@ public class ImmutableJsonIndexReader implements 
JsonIndexReader {
    *  Else, return the json path that is generated by replacing array index 
with . on the original key
    *  and the associated flattenDocId bitmap
    */
-  private Pair<String, MutableRoaringBitmap> getKeyAndFlattenedDocIds(String 
key) {
-    MutableRoaringBitmap matchingDocIds = null;
+  private Pair<String, ImmutableRoaringBitmap> getKeyAndFlattenedDocIds(String 
key) {
+    ImmutableRoaringBitmap matchingDocIds = null;
+
     if (_version == BaseJsonIndexCreator.VERSION_2) {
       // Process the array index within the key if exists
       // E.g. "[*]"=1 -> "."='1'
@@ -594,12 +711,12 @@ public class ImmutableJsonIndexReader implements 
JsonIndexReader {
           if (dictId >= 0) {
             ImmutableRoaringBitmap docIds = _invertedIndex.getDocIds(dictId);
             if (matchingDocIds == null) {
-              matchingDocIds = docIds.toMutableRoaringBitmap();
+              matchingDocIds = docIds;
             } else {
-              matchingDocIds.and(docIds);
+              matchingDocIds = and(matchingDocIds, docIds);
             }
           } else {
-            return Pair.of(null, new MutableRoaringBitmap());
+            return Pair.of(null, EMPTY_BITMAP);
           }
         }
 
@@ -626,12 +743,12 @@ public class ImmutableJsonIndexReader implements 
JsonIndexReader {
           if (dictId >= 0) {
             ImmutableRoaringBitmap docIds = _invertedIndex.getDocIds(dictId);
             if (matchingDocIds == null) {
-              matchingDocIds = docIds.toMutableRoaringBitmap();
+              matchingDocIds = docIds;
             } else {
-              matchingDocIds.and(docIds);
+              matchingDocIds = and(matchingDocIds, docIds);
             }
           } else {
-            return Pair.of(null, new MutableRoaringBitmap());
+            return Pair.of(null, EMPTY_BITMAP);
           }
         }
 
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/JsonIndexTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/JsonIndexTest.java
index 1412214381..196c617336 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/JsonIndexTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/JsonIndexTest.java
@@ -22,6 +22,7 @@ import com.fasterxml.jackson.core.JsonProcessingException;
 import com.google.common.collect.Lists;
 import java.io.File;
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -48,9 +49,7 @@ import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
-import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.assertNull;
+import static org.testng.Assert.*;
 
 
 /**
@@ -60,6 +59,26 @@ public class JsonIndexTest implements 
PinotBuffersAfterMethodCheckRule {
   private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), 
"JsonIndexTest");
   private static final String ON_HEAP_COLUMN_NAME = "onHeap";
   private static final String OFF_HEAP_COLUMN_NAME = "offHeap";
+  public static final String TEST_RECORD = "{"
+      + "\"name\": \"adam\","
+      + "\"age\": 20,"
+      + "\"addresses\": ["
+      + "  {"
+      + "    \"country\": \"us\","
+      + "    \"street\": \"main st\","
+      + "    \"number\": 1"
+      + "  },"
+      + "  {"
+      + "    \"country\": \"ca\","
+      + "    \"street\": \"second st\","
+      + "    \"number\": 2"
+      + "  }"
+      + "],"
+      + "\"skills\": ["
+      + "  \"english\","
+      + "  \"programming\""
+      + "]"
+      + "}";
 
   @BeforeMethod
   public void setUp()
@@ -79,19 +98,47 @@ public class JsonIndexTest implements 
PinotBuffersAfterMethodCheckRule {
     // @formatter: off
     // CHECKSTYLE:OFF
     String[] records = new String[]{
-        "{" + "\"name\":\"adam\"," + "\"age\":20," + "\"score\":1.25," + 
"\"addresses\":["
-            + "   {\"street\":\"street-00\",\"country\":\"us\"}," + "   
{\"street\":\"street-01\",\"country\":\"us\"},"
-            + "   {\"street\":\"street-02\",\"country\":\"ca\"}]," + 
"\"skills\":[\"english\",\"programming\"]" + "}",
-        "{" + "\"name\":\"bob\"," + "\"age\":25," + "\"score\":1.94," + 
"\"addresses\":["
-            + "   {\"street\":\"street-10\",\"country\":\"ca\"}," + "   
{\"street\":\"street-11\",\"country\":\"us\"},"
-            + "   {\"street\":\"street-12\",\"country\":\"in\"}]," + 
"\"skills\":[]" + "}",
-        "{" + "\"name\":\"charles\"," + "\"age\":30," + "\"score\":0.90,"  + 
"\"addresses\":["
-            + "   {\"street\":\"street-20\",\"country\":\"jp\"}," + "   
{\"street\":\"street-21\",\"country\":\"kr\"},"
-            + "   {\"street\":\"street-22\",\"country\":\"cn\"}]," + 
"\"skills\":[\"japanese\",\"korean\",\"chinese\"]"
-            + "}", "{" + "\"name\":\"david\"," + "\"age\":35," + 
"\"score\":0.9999,"  + "\"addresses\":["
-        + "   
{\"street\":\"street-30\",\"country\":\"ca\",\"types\":[\"home\",\"office\"]},"
-        + "   {\"street\":\"street-31\",\"country\":\"ca\"}," + "   
{\"street\":\"street-32\",\"country\":\"ca\"}],"
-        + "\"skills\":null" + "}"
+        "{"
+            + "\"name\":\"adam\","
+            + "\"age\":20,"
+            + "\"score\":1.25,"
+            + "\"addresses\":["
+            + "   {\"street\":\"street-00\",\"country\":\"us\"},"
+            + "   {\"street\":\"street-01\",\"country\":\"us\"},"
+            + "   {\"street\":\"street-02\",\"country\":\"ca\"}],"
+            + "\"skills\":[\"english\",\"programming\"]"
+            + "}",
+        "{"
+            + "\"name\":\"bob\","
+            + "\"age\":25,"
+            + "\"score\":1.94,"
+            + "\"addresses\":["
+            + "   {\"street\":\"street-10\",\"country\":\"ca\"},"
+            + "   {\"street\":\"street-11\",\"country\":\"us\"},"
+            + "   {\"street\":\"street-12\",\"country\":\"in\"}],"
+            + "\"skills\":[]"
+            + "}",
+        "{"
+            + "\"name\":\"charles\","
+            + "\"age\":30,"
+            + "\"score\":0.90,"
+            + "\"addresses\":["
+            + "   {\"street\":\"street-20\",\"country\":\"jp\"},"
+            + "   {\"street\":\"street-21\",\"country\":\"kr\"},"
+            + "   {\"street\":\"street-22\",\"country\":\"cn\"}],"
+            + "\"skills\":[\"japanese\",\"korean\",\"chinese\"]"
+            + "}",
+        "{"
+            + "\"name\":\"david\","
+            + "\"age\":35,"
+            + "\"score\":0.9999,"
+            + "\"addresses\":["
+            + "   
{\"street\":\"street-30\",\"country\":\"ca\",\"types\":[\"home\",\"office\"]},"
+            + "   {\"street\":\"street-31\",\"country\":\"ca\"},"
+            + "   {\"street\":\"street-32\",\"country\":\"ca\"}"
+            + "],"
+            + "\"skills\":null"
+            + "}"
     };
     //CHECKSTYLE:ON
     // @formatter: on
@@ -114,134 +161,107 @@ public class JsonIndexTest implements 
PinotBuffersAfterMethodCheckRule {
         mutableJsonIndex.add(record);
       }
       JsonIndexReader[] indexReaders = new 
JsonIndexReader[]{onHeapIndexReader, offHeapIndexReader, mutableJsonIndex};
-      for (JsonIndexReader indexReader : indexReaders) {
-        MutableRoaringBitmap matchingDocIds = getMatchingDocIds(indexReader, 
"name='bob'");
-        Assert.assertEquals(matchingDocIds.toArray(), new int[]{1});
+      for (JsonIndexReader reader : indexReaders) {
 
-        matchingDocIds = getMatchingDocIds(indexReader, 
"\"addresses[*].street\" = 'street-21'");
-        Assert.assertEquals(matchingDocIds.toArray(), new int[]{2});
+        assertMatchedDocIds(reader, "name='bob'", new int[]{1});
 
-        matchingDocIds = getMatchingDocIds(indexReader, 
"REGEXP_LIKE(\"addresses[*].street\", 'street-2.*')");
-        Assert.assertEquals(matchingDocIds.toArray(), new int[]{2});
+        assertMatchedDocIds(reader, "\"addresses[*].street\" = 'street-21'", 
new int[]{2});
 
-        matchingDocIds = getMatchingDocIds(indexReader, "\"age\" > 25");
-        Assert.assertEquals(matchingDocIds.toArray(), new int[]{2, 3});
+        assertMatchedDocIds(reader, "REGEXP_LIKE(\"addresses[*].street\", 
'street-2.*')", new int[]{2});
 
-        matchingDocIds = getMatchingDocIds(indexReader, "\"age\" >= 25");
-        Assert.assertEquals(matchingDocIds.toArray(), new int[]{1, 2, 3});
+        assertMatchedDocIds(reader, "\"age\" > 25", new int[]{2, 3});
 
-        matchingDocIds = getMatchingDocIds(indexReader, "\"age\" < 25");
-        Assert.assertEquals(matchingDocIds.toArray(), new int[]{0});
+        assertMatchedDocIds(reader, "\"age\" >= 25", new int[]{1, 2, 3});
 
-        matchingDocIds = getMatchingDocIds(indexReader, "\"age\" <= 25");
-        Assert.assertEquals(matchingDocIds.toArray(), new int[]{0, 1});
+        assertMatchedDocIds(reader, "\"age\" < 25", new int[]{0});
 
-        matchingDocIds = getMatchingDocIds(indexReader, "\"name\" > 'adam'");
-        Assert.assertEquals(matchingDocIds.toArray(), new int[]{1, 2, 3});
+        assertMatchedDocIds(reader, "\"age\" <= 25", new int[]{0, 1});
 
-        matchingDocIds = getMatchingDocIds(indexReader, "\"name\" > 'a'");
-        Assert.assertEquals(matchingDocIds.toArray(), new int[]{0, 1, 2, 3});
+        assertMatchedDocIds(reader, "\"name\" > 'adam'", new int[]{1, 2, 3});
 
-        matchingDocIds = getMatchingDocIds(indexReader, "\"score\" > 1");
-        Assert.assertEquals(matchingDocIds.toArray(), new int[]{0, 1});
+        assertMatchedDocIds(reader, "\"name\" > 'a'", new int[]{0, 1, 2, 3});
 
-        matchingDocIds = getMatchingDocIds(indexReader, "\"score\" > 1.0");
-        Assert.assertEquals(matchingDocIds.toArray(), new int[]{0, 1});
+        assertMatchedDocIds(reader, "\"score\" > 1", new int[]{0, 1});
 
-        matchingDocIds = getMatchingDocIds(indexReader, "\"score\" > 0.99");
-        Assert.assertEquals(matchingDocIds.toArray(), new int[]{0, 1, 3});
+        assertMatchedDocIds(reader, "\"score\" > 1.0", new int[]{0, 1});
 
-        matchingDocIds = getMatchingDocIds(indexReader, 
"REGEXP_LIKE(\"score\", '[0-1]\\.[6-9].*')");
-        Assert.assertEquals(matchingDocIds.toArray(), new int[]{1, 2, 3});
+        assertMatchedDocIds(reader, "\"score\" > 0.99", new int[]{0, 1, 3});
 
-        matchingDocIds = getMatchingDocIds(indexReader, 
"\"addresses[*].street\" NOT IN ('street-10', 'street-22')");
-        Assert.assertEquals(matchingDocIds.toArray(), new int[]{0, 1, 2, 3});
+        assertMatchedDocIds(reader, "REGEXP_LIKE(\"score\", 
'[0-1]\\.[6-9].*')", new int[]{1, 2, 3});
 
-        matchingDocIds = getMatchingDocIds(indexReader, 
"\"addresses[*].country\" != 'ca'");
-        Assert.assertEquals(matchingDocIds.toArray(), new int[]{0, 1, 2});
+        assertMatchedDocIds(reader, "\"addresses[*].street\" NOT IN 
('street-10', 'street-22')",
+            new int[]{0, 1, 2, 3});
 
-        matchingDocIds = getMatchingDocIds(indexReader, "\"skills[*]\" NOT IN 
('english', 'japanese')");
-        Assert.assertEquals(matchingDocIds.toArray(), new int[]{0, 2});
+        assertMatchedDocIds(reader, "\"addresses[*].country\" != 'ca'", new 
int[]{0, 1, 2});
 
-        matchingDocIds = getMatchingDocIds(indexReader, 
"\"addresses[0].country\" IN ('ca', 'us')");
-        Assert.assertEquals(matchingDocIds.toArray(), new int[]{0, 1, 3});
+        assertMatchedDocIds(reader, "\"skills[*]\" NOT IN ('english', 
'japanese')", new int[]{0, 2});
 
-        matchingDocIds = getMatchingDocIds(indexReader, 
"\"addresses[0].country\" NOT IN ('ca', 'us')");
-        Assert.assertEquals(matchingDocIds.toArray(), new int[]{2});
+        assertMatchedDocIds(reader, "\"addresses[0].country\" IN ('ca', 
'us')", new int[]{0, 1, 3});
 
-        matchingDocIds = getMatchingDocIds(indexReader, 
"\"addresses[*].types[1]\" = 'office'");
-        Assert.assertEquals(matchingDocIds.toArray(), new int[]{3});
+        assertMatchedDocIds(reader, "\"addresses[0].country\" NOT IN ('ca', 
'us')", new int[]{2});
 
-        matchingDocIds = getMatchingDocIds(indexReader, 
"\"addresses[0].types[0]\" = 'home'");
-        Assert.assertEquals(matchingDocIds.toArray(), new int[]{3});
+        assertMatchedDocIds(reader, "\"addresses[*].types[1]\" = 'office'", 
new int[]{3});
 
-        matchingDocIds = getMatchingDocIds(indexReader, 
"\"addresses[1].types[*]\" = 'home'");
-        Assert.assertEquals(matchingDocIds.toArray(), new int[0]);
+        assertMatchedDocIds(reader, "\"addresses[0].types[0]\" = 'home'", new 
int[]{3});
 
-        matchingDocIds = getMatchingDocIds(indexReader, 
"\"addresses[*].types[*]\" IS NULL");
-        Assert.assertEquals(matchingDocIds.toArray(), new int[]{0, 1, 2});
+        assertMatchedDocIds(reader, "\"addresses[1].types[*]\" = 'home'", new 
int[0]);
 
-        matchingDocIds = getMatchingDocIds(indexReader, 
"\"addresses[*].types[*]\" IS NOT NULL");
-        Assert.assertEquals(matchingDocIds.toArray(), new int[]{3});
+        assertMatchedDocIds(reader, "\"addresses[*].types[*]\" IS NULL", new 
int[]{0, 1, 2});
 
-        matchingDocIds = getMatchingDocIds(indexReader, 
"\"addresses[1].types[*]\" IS NOT NULL");
-        Assert.assertEquals(matchingDocIds.toArray(), new int[0]);
+        assertMatchedDocIds(reader, "\"addresses[*].types[*]\" IS NOT NULL", 
new int[]{3});
 
-        matchingDocIds = getMatchingDocIds(indexReader, "abc IS NULL");
-        Assert.assertEquals(matchingDocIds.toArray(), new int[]{0, 1, 2, 3});
+        assertMatchedDocIds(reader, "\"addresses[1].types[*]\" IS NOT NULL", 
new int[0]);
 
-        matchingDocIds = getMatchingDocIds(indexReader, "\"skills[*]\" IS NOT 
NULL");
-        Assert.assertEquals(matchingDocIds.toArray(), new int[]{0, 2});
+        assertMatchedDocIds(reader, "abc IS NULL", new int[]{0, 1, 2, 3});
 
-        matchingDocIds =
-            getMatchingDocIds(indexReader, "\"addresses[*].country\" = 'ca' 
AND \"skills[*]\" IS NOT NULL");
-        Assert.assertEquals(matchingDocIds.toArray(), new int[]{0});
+        assertMatchedDocIds(reader, "\"skills[*]\" IS NOT NULL", new int[]{0, 
2});
+
+        assertMatchedDocIds(reader, "\"addresses[*].country\" = 'ca' AND 
\"skills[*]\" IS NOT NULL", new int[]{0});
 
-        matchingDocIds = getMatchingDocIds(indexReader, 
"\"addresses[*].country\" = 'us' OR \"skills[*]\" IS NOT NULL");
-        Assert.assertEquals(matchingDocIds.toArray(), new int[]{0, 1, 2});
+        assertMatchedDocIds(reader, "\"addresses[*].country\" = 'us' OR 
\"skills[*]\" IS NOT NULL",
+            new int[]{0, 1, 2});
 
         // Nested exclusive predicates
 
-        matchingDocIds = getMatchingDocIds(indexReader,
-            "\"addresses[0].street\" = 'street-00' AND 
\"addresses[0].country\" != 'ca'");
-        Assert.assertEquals(matchingDocIds.toArray(), new int[]{0});
+        assertMatchedDocIds(reader, "\"addresses[0].street\" = 'street-00' AND 
\"addresses[0].country\" != 'ca'",
+            new int[]{0});
 
-        matchingDocIds = getMatchingDocIds(indexReader,
-            "\"age\" = '20' AND \"addresses[*].country\" NOT IN ('us')");
-        Assert.assertEquals(matchingDocIds.toArray(), new int[]{0});
+        assertMatchedDocIds(reader, "\"age\" = '20' AND 
\"addresses[*].country\" NOT IN ('us')", new int[]{0});
 
-        matchingDocIds = getMatchingDocIds(indexReader,
-            "\"age\" = '20' AND \"addresses[*].country\" NOT IN ('us', 'ca')");
-        Assert.assertEquals(matchingDocIds.toArray(), new int[0]);
+        assertMatchedDocIds(reader, "\"age\" = '20' AND 
\"addresses[*].country\" NOT IN ('us', 'ca')", new int[0]);
 
-        matchingDocIds = getMatchingDocIds(indexReader,
-            "\"addresses[*].street\" = 'street-21' AND 
\"addresses[*].country\" != 'kr'");
-        Assert.assertEquals(matchingDocIds.toArray(), new int[0]);
+        assertMatchedDocIds(reader, "\"addresses[*].street\" = 'street-21' AND 
\"addresses[*].country\" != 'kr'",
+            new int[0]);
 
-        matchingDocIds = getMatchingDocIds(indexReader,
-            "\"addresses[*].street\" = 'street-21' AND 
\"addresses[*].country\" != 'us'");
-        Assert.assertEquals(matchingDocIds.toArray(), new int[]{2});
+        assertMatchedDocIds(reader, "\"addresses[*].street\" = 'street-21' AND 
\"addresses[*].country\" != 'us'",
+            new int[]{2});
 
-        matchingDocIds = getMatchingDocIds(indexReader,
-            "\"addresses[*].street\" = 'street-30' AND 
\"addresses[*].country\" NOT IN ('us', 'kr')");
-        Assert.assertEquals(matchingDocIds.toArray(), new int[]{3});
+        assertMatchedDocIds(reader,
+            "\"addresses[*].street\" = 'street-30' AND 
\"addresses[*].country\" NOT IN ('us', 'kr')", new int[]{3});
 
-        matchingDocIds = getMatchingDocIds(indexReader,
-            "REGEXP_LIKE(\"addresses[*].street\", 'street-0.*') AND 
\"addresses[*].country\" NOT IN ('us', 'ca')");
-        Assert.assertEquals(matchingDocIds.toArray(), new int[0]);
+        assertMatchedDocIds(reader,
+            "REGEXP_LIKE(\"addresses[*].street\", 'street-0.*') AND 
\"addresses[*].country\" NOT IN ('us', 'ca')",
+            new int[0]);
 
-        matchingDocIds = getMatchingDocIds(indexReader,
-            "REGEXP_LIKE(\"addresses[*].street\", 'street-3.*') AND 
\"addresses[*].country\" != 'us'");
-        Assert.assertEquals(matchingDocIds.toArray(), new int[]{3});
+        assertMatchedDocIds(reader,
+            "REGEXP_LIKE(\"addresses[*].street\", 'street-3.*') AND 
\"addresses[*].country\" != 'us'", new int[]{3});
 
         // A single matching flattened doc ID will result in the overall doc 
being matched
-        matchingDocIds = getMatchingDocIds(indexReader,
-            "\"addresses[*].street\" = 'street-21' AND \"skills[*]\" != 
'japanese'");
-        Assert.assertEquals(matchingDocIds.toArray(), new int[]{2});
+        assertMatchedDocIds(reader, "\"addresses[*].street\" = 'street-21' AND 
\"skills[*]\" != 'japanese'",
+            new int[]{2});
       }
     }
   }
 
+  private void assertMatchedDocIds(JsonIndexReader indexReader, String filter, 
int[] expected) {
+    MutableRoaringBitmap matchingDocIds = getMatchingDocIds(indexReader, 
filter);
+    try {
+      Assert.assertEquals(matchingDocIds.toArray(), expected);
+    } catch (AssertionError ae) {
+      throw new AssertionError(" index: " + 
indexReader.getClass().getSimpleName() + " " + ae.getMessage(), ae);
+    }
+  }
+
   @Test
   public void testLargeIndex()
       throws Exception {
@@ -270,31 +290,31 @@ public class JsonIndexTest implements 
PinotBuffersAfterMethodCheckRule {
       for (String record : records) {
         mutableJsonIndex.add(record);
       }
+
       JsonIndexReader[] indexReaders = new 
JsonIndexReader[]{onHeapIndexReader, offHeapIndexReader, mutableJsonIndex};
-      for (JsonIndexReader indexReader : indexReaders) {
-        MutableRoaringBitmap matchingDocIds = getMatchingDocIds(indexReader, 
"name = 'adam-123'");
-        Assert.assertEquals(matchingDocIds.toArray(), new int[]{123});
 
-        matchingDocIds = getMatchingDocIds(indexReader, 
"\"addresses[*].street\" = 'us-456'");
-        Assert.assertEquals(matchingDocIds.toArray(), new int[]{456});
+      for (JsonIndexReader reader : indexReaders) {
+        assertMatchedDocIds(reader, "name = 'adam-123'", new int[]{123});
 
-        matchingDocIds = getMatchingDocIds(indexReader, 
"\"addresses[1].street\" = 'us-456'");
-        Assert.assertEquals(matchingDocIds.toArray(), new int[0]);
+        assertMatchedDocIds(reader, "\"addresses[*].street\" = 'us-456'", new 
int[]{456});
 
-        matchingDocIds =
-            getMatchingDocIds(indexReader, "\"addresses[*].street\" = 'us-456' 
AND \"addresses[*].country\" = 'ca'");
-        Assert.assertEquals(matchingDocIds.toArray(), new int[0]);
+        assertMatchedDocIds(reader, "\"addresses[1].street\" = 'us-456'", new 
int[0]);
 
-        matchingDocIds = getMatchingDocIds(indexReader,
-            "name = 'adam-100000' AND \"addresses[*].street\" = 'us-100000' 
AND \"addresses[*].country\" = 'us'");
-        Assert.assertEquals(matchingDocIds.toArray(), new int[]{100000});
+        assertMatchedDocIds(reader, "\"addresses[*].street\" = 'us-456' AND 
\"addresses[*].country\" = 'ca'",
+            new int[0]);
 
-        matchingDocIds =
-            getMatchingDocIds(indexReader, "name = 'adam-100000' AND 
\"addresses[*].street\" = 'us-100001'");
-        Assert.assertEquals(matchingDocIds.toArray(), new int[0]);
+        assertMatchedDocIds(reader,
+            "name = 'adam-100000' AND \"addresses[*].street\" = 'us-100000' 
AND \"addresses[*].country\" = 'us'",
+            new int[]{100000});
+
+        assertMatchedDocIds(reader, "name = 'adam-100000' AND 
\"addresses[*].street\" = 'us-100001'", new int[0]);
 
-        matchingDocIds = getMatchingDocIds(indexReader, "name != 
'adam-100000'");
-        Assert.assertEquals(matchingDocIds.getCardinality(), 123_455);
+        MutableRoaringBitmap matchingDocIds = getMatchingDocIds(reader, "name 
!= 'adam-100000'");
+        try {
+          Assert.assertEquals(matchingDocIds.getCardinality(), 123_455);
+        } catch (AssertionError ae) {
+          throw new AssertionError(" index: " + 
reader.getClass().getSimpleName() + " " + ae.getMessage(), ae);
+        }
       }
     }
   }
@@ -385,14 +405,20 @@ public class JsonIndexTest implements 
PinotBuffersAfterMethodCheckRule {
     // @formatter: off
     // CHECKSTYLE:OFF
     String[] records = new String[]{
-        "{\"arrField\": " + "[{\"intKey01\": 1, \"stringKey01\": \"abc\"},"
-            + " {\"intKey01\": 1, \"stringKey01\": \"foo\"}, " + " 
{\"intKey01\": 3, \"stringKey01\": \"bar\"},"
+        "{\"arrField\": "
+            + "[{\"intKey01\": 1, \"stringKey01\": \"abc\"},"
+            + " {\"intKey01\": 1, \"stringKey01\": \"foo\"}, "
+            + " {\"intKey01\": 3, \"stringKey01\": \"bar\"},"
             + " {\"intKey01\": 5, \"stringKey01\": \"fuzz\"}]}",
-        "{\"arrField\": " + "[{\"intKey01\": 7, \"stringKey01\": \"pqrS\"},"
-            + " {\"intKey01\": 6, \"stringKey01\": \"foo\"}, " + " 
{\"intKey01\": 8, \"stringKey01\": \"test\"},"
+        "{\"arrField\": "
+            + "[{\"intKey01\": 7, \"stringKey01\": \"pqrS\"},"
+            + " {\"intKey01\": 6, \"stringKey01\": \"foo\"}, "
+            + " {\"intKey01\": 8, \"stringKey01\": \"test\"},"
             + " {\"intKey01\": 9, \"stringKey01\": \"testf2\"}]}",
-        "{\"arrField\": " + "[{\"intKey01\": 1, \"stringKey01\": \"pqr\"},"
-            + " {\"intKey01\": 1, \"stringKey01\": \"foo\"}, " + " 
{\"intKey01\": 6, \"stringKey01\": \"test\"},"
+        "{\"arrField\": "
+            + "[{\"intKey01\": 1, \"stringKey01\": \"pqr\"},"
+            + " {\"intKey01\": 1, \"stringKey01\": \"foo\"}, "
+            + " {\"intKey01\": 6, \"stringKey01\": \"test\"},"
             + " {\"intKey01\": 3, \"stringKey01\": \"testf2\"}]}",
     };
     // CHECKSTYLE:ON
@@ -570,6 +596,199 @@ public class JsonIndexTest implements 
PinotBuffersAfterMethodCheckRule {
     }
   }
 
+  @Test
+  public void testWhenDisableCrossArrayUnnestIsOffThenJsonArraysAreSeparated()
+      throws IOException {
+    JsonIndexConfig jsonIndexConfig = new JsonIndexConfig();
+    jsonIndexConfig.setDisableCrossArrayUnnest(true);
+
+    List<Map<String, String>> result = JsonUtils.flatten(TEST_RECORD, 
jsonIndexConfig);
+
+    Assert.assertEquals(result.toString(),
+        "["
+            + "{.addresses.$index=0, .addresses..country=us, 
.addresses..number=1, .addresses..street=main st, "
+            + ".age=20, .name=adam}, "
+            + "{.addresses.$index=1, .addresses..country=ca, 
.addresses..number=2, .addresses..street=second st, "
+            + ".age=20, .name=adam}, "
+            + "{.age=20, .name=adam, .skills.=english, .skills.$index=0}, "
+            + "{.age=20, .name=adam, .skills.=programming, 
.skills.$index=1}]");
+  }
+
+  @Test
+  public void testWhenDisableCrossArrayUnnestIsOnThenJsonArraysAreCombined()
+      throws IOException {
+    JsonIndexConfig jsonIndexConfig = new JsonIndexConfig();
+    jsonIndexConfig.setDisableCrossArrayUnnest(false);
+
+    List<Map<String, String>> result = JsonUtils.flatten(TEST_RECORD, 
jsonIndexConfig);
+
+    Assert.assertEquals(result.toString(),
+        "["
+            + "{.addresses.$index=0, .addresses..country=us, 
.addresses..number=1, .addresses..street=main st, "
+            + ".age=20, .name=adam, "
+            + ".skills.=english, .skills.$index=0}, "
+            + "{.addresses.$index=0, .addresses..country=us, 
.addresses..number=1, .addresses..street=main st, "
+            + ".age=20, .name=adam, "
+            + ".skills.=programming, .skills.$index=1}, "
+            + "{.addresses.$index=1, .addresses..country=ca, 
.addresses..number=2, .addresses..street=second st, "
+            + ".age=20, .name=adam, "
+            + ".skills.=english, .skills.$index=0}, "
+            + "{.addresses.$index=1, .addresses..country=ca, 
.addresses..number=2, .addresses..street=second st, "
+            + ".age=20, .name=adam, "
+            + ".skills.=programming, .skills.$index=1}]");
+  }
+
+  @Test
+  public void 
testWhenDisableCrossArrayUnnestIsOnThenQueriesOnMultipleArraysReturnEmptyResult()
+      throws IOException {
+    RoaringBitmap expectedBitmap = RoaringBitmap.bitmapOf();
+    boolean disableCrossArrayUnnest = true;
+
+    assertWhenCrossArrayUnnestIs(disableCrossArrayUnnest, expectedBitmap);
+  }
+
+  @Test
+  public void 
testWhenDisableCrossArrayUnnestIsOffThenQueriesOnMultipleArraysReturnGoodResult()
+      throws IOException {
+    RoaringBitmap expectedBitmap = RoaringBitmap.bitmapOf(0);
+    boolean disableCrossArrayUnnest = false;
+
+    assertWhenCrossArrayUnnestIs(disableCrossArrayUnnest, expectedBitmap);
+  }
+
+  private void assertWhenCrossArrayUnnestIs(boolean disableCrossArrayUnnest, 
RoaringBitmap expectedBitmap)
+      throws IOException {
+    JsonIndexConfig jsonIndexConfig = new JsonIndexConfig();
+    jsonIndexConfig.setDisableCrossArrayUnnest(disableCrossArrayUnnest);
+
+    String[] records = {TEST_RECORD};
+
+    createIndex(true, jsonIndexConfig, records);
+    File onHeapIndexFile = new File(INDEX_DIR, ON_HEAP_COLUMN_NAME + 
V1Constants.Indexes.JSON_INDEX_FILE_EXTENSION);
+    assertTrue(onHeapIndexFile.exists());
+
+    createIndex(false, jsonIndexConfig, records);
+    File offHeapIndexFile = new File(INDEX_DIR, OFF_HEAP_COLUMN_NAME + 
V1Constants.Indexes.JSON_INDEX_FILE_EXTENSION);
+    assertTrue(offHeapIndexFile.exists());
+
+    try (PinotDataBuffer onHeapBuffer = 
PinotDataBuffer.mapReadOnlyBigEndianFile(onHeapIndexFile);
+        PinotDataBuffer offHeapBuffer = 
PinotDataBuffer.mapReadOnlyBigEndianFile(offHeapIndexFile);
+        JsonIndexReader onHeapIndex = new 
ImmutableJsonIndexReader(onHeapBuffer, records.length);
+        JsonIndexReader offHeapIndex = new 
ImmutableJsonIndexReader(offHeapBuffer, records.length);
+        MutableJsonIndexImpl mutableIndex = new 
MutableJsonIndexImpl(jsonIndexConfig)) {
+      for (String record : records) {
+        mutableIndex.add(record);
+      }
+
+      String filter = "\"$.addresses[*].country\" = 'us' and \"$.skills[*]\" = 
'english'";
+
+      assertEquals(onHeapIndex.getMatchingDocIds(filter), expectedBitmap);
+      assertEquals(offHeapIndex.getMatchingDocIds(filter), expectedBitmap);
+      assertEquals(mutableIndex.getMatchingDocIds(filter), expectedBitmap);
+    }
+  }
+
+  @Test
+  public void 
testWhenDisableCrossArrayUnnestIsOnThenJsonFlatteningBreaksWhen100kCombinationLimitIsExceeded()
+      throws IOException {
+    // flattening record with arrays whose combinations reach 100k returns 
exception
+    StringBuilder record = generateRecordWith100kArrayElementCombinations();
+
+    try {
+      JsonIndexConfig jsonIndexConfig = new JsonIndexConfig();
+      jsonIndexConfig.setDisableCrossArrayUnnest(false);
+      createIndex(true, jsonIndexConfig, new String[]{record.toString()});
+      Assert.fail("expected exception");
+    } catch (IllegalArgumentException e) {
+      assertEquals(e.getCause().getMessage(), "Got too many combinations");
+    }
+  }
+
+  private static StringBuilder 
generateRecordWith100kArrayElementCombinations() {
+    StringBuilder record = new StringBuilder();
+    record.append('{');
+
+    //address
+    record.append("\n \"addresses\": [");
+    for (int i = 0; i < 100; i++) {
+      if (i > 0) {
+        record.append(',');
+      }
+      record.append("{ ")
+          .append(" \"street\": \"").append("st").append(i).append("\"")
+          .append(" }");
+    }
+    record.append("],");
+
+    //skill
+    record.append("\n \"skills\": [");
+    for (int i = 0; i < 100; i++) {
+      if (i > 0) {
+        record.append(',');
+      }
+      record.append("\"skill").append(i).append("\"");
+    }
+    record.append("],");
+
+    //hobby
+    record.append("\n \"hobbies\": [");
+    for (int i = 0; i < 10; i++) {
+      if (i > 0) {
+        record.append(',');
+      }
+      record.append("\"hobby").append(i).append("\"");
+    }
+    record.append(']');
+    record.append("\n}");
+    return record;
+  }
+
+  @Test
+  public void 
testSettingMaxValueLengthCausesLongValuesToBeReplacedWithSKIPPED()
+      throws IOException {
+    JsonIndexConfig jsonIndexConfig = new JsonIndexConfig();
+    jsonIndexConfig.setMaxValueLength(10);
+    // value is longer than max length
+    String[] records = {"{\"key1\":\"value_is_longer_than_10_characters\"}"};
+
+    createIndex(true, jsonIndexConfig, records);
+    File onHeapIndexFile = new File(INDEX_DIR, ON_HEAP_COLUMN_NAME + 
V1Constants.Indexes.JSON_INDEX_FILE_EXTENSION);
+    assertTrue(onHeapIndexFile.exists());
+
+    createIndex(false, jsonIndexConfig, records);
+    File offHeapIndexFile = new File(INDEX_DIR, OFF_HEAP_COLUMN_NAME + 
V1Constants.Indexes.JSON_INDEX_FILE_EXTENSION);
+    assertTrue(offHeapIndexFile.exists());
+
+    try (PinotDataBuffer onHeapBuffer = 
PinotDataBuffer.mapReadOnlyBigEndianFile(onHeapIndexFile);
+        PinotDataBuffer offHeapBuffer = 
PinotDataBuffer.mapReadOnlyBigEndianFile(offHeapIndexFile);
+        JsonIndexReader onHeapIndex = new 
ImmutableJsonIndexReader(onHeapBuffer, records.length);
+        JsonIndexReader offHeapIndex = new 
ImmutableJsonIndexReader(offHeapBuffer, records.length);
+        MutableJsonIndexImpl mutableIndex = new 
MutableJsonIndexImpl(jsonIndexConfig)) {
+      for (String record : records) {
+        mutableIndex.add(record);
+      }
+
+      Map<String, RoaringBitmap> expectedMap = 
Collections.singletonMap(JsonUtils.SKIPPED_VALUE_REPLACEMENT,
+          RoaringBitmap.bitmapOf(0));
+
+      assertEquals(expectedMap, getMatchingDocsMap(onHeapIndex, "$.key1"));
+      assertEquals(expectedMap, getMatchingDocsMap(offHeapIndex, "$.key1"));
+      assertEquals(expectedMap, getMatchingDocsMap(mutableIndex, "$.key1"));
+
+      // skipped values can be found for the key
+      String filter = "\"$.key1\"='" + JsonUtils.SKIPPED_VALUE_REPLACEMENT + 
"'";
+
+      RoaringBitmap expectedBitmap = RoaringBitmap.bitmapOf(0);
+      assertEquals(expectedBitmap, onHeapIndex.getMatchingDocIds(filter));
+      assertEquals(expectedBitmap, offHeapIndex.getMatchingDocIds(filter));
+      assertEquals(expectedBitmap, mutableIndex.getMatchingDocIds(filter));
+    }
+  }
+
+  private static Map<String, RoaringBitmap> getMatchingDocsMap(JsonIndexReader 
onHeapIndex, String key) {
+    return onHeapIndex.getMatchingFlattenedDocsMap(key, null);
+  }
+
   @Test
   public void testSkipInvalidJsonEnable() throws Exception {
     JsonIndexConfig jsonIndexConfig = new JsonIndexConfig();
@@ -593,8 +812,8 @@ public class JsonIndexTest implements 
PinotBuffersAfterMethodCheckRule {
       for (String record : records) {
         mutableJsonIndex.add(record);
       }
-      Map<String, RoaringBitmap> onHeapRes = 
onHeapIndexReader.getMatchingFlattenedDocsMap("$", null);
-      Map<String, RoaringBitmap> offHeapRes = 
offHeapIndexReader.getMatchingFlattenedDocsMap("$", null);
+      Map<String, RoaringBitmap> onHeapRes = 
getMatchingDocsMap(onHeapIndexReader, "$");
+      Map<String, RoaringBitmap> offHeapRes = 
getMatchingDocsMap(offHeapIndexReader, "$");
       Map<String, RoaringBitmap> mutableRes = 
mutableJsonIndex.getMatchingFlattenedDocsMap("$", null);
       Map<String, RoaringBitmap> expectedRes = 
Collections.singletonMap(JsonUtils.SKIPPED_VALUE_REPLACEMENT,
           RoaringBitmap.bitmapOf(0));
@@ -614,13 +833,15 @@ public class JsonIndexTest implements 
PinotBuffersAfterMethodCheckRule {
     createIndex(true, jsonIndexConfig, records);
   }
 
-
   @Test
   public void testGetMatchingValDocIdsPairForArrayPath() throws Exception {
-    String[] records = {
-            
"{\"foo\":[{\"bar\":[\"x\",\"y\"]},{\"bar\":[\"a\",\"b\"]}],\"foo2\":[\"u\"]}",
-            "{\"foo\":[{\"bar\":[\"y\",\"z\"]}],\"foo2\":[\"u\"]}"
-    };
+    String[] records = Arrays.asList(
+            "{'foo':[ {'bar':['x','y'] }, {'bar':['a','b']} ],'foo2':['u']}",
+            "{'foo':[ {'bar':['y','z']}], 'foo2':['u']}"
+        ).stream()
+        .map(r -> r.replace("'", "\""))
+        .collect(Collectors.toList())
+        .toArray(new String[2]);
     JsonIndexConfig jsonIndexConfig = new JsonIndexConfig();
 
     createIndex(true, jsonIndexConfig, records);
@@ -631,8 +852,15 @@ public class JsonIndexTest implements 
PinotBuffersAfterMethodCheckRule {
     File offHeapIndexFile = new File(INDEX_DIR, OFF_HEAP_COLUMN_NAME + 
V1Constants.Indexes.JSON_INDEX_FILE_EXTENSION);
     Assert.assertTrue(offHeapIndexFile.exists());
 
-    String[] keys = {"$.foo[0].bar[1]", "$.foo[1].bar[0]", "$.foo2[0]", 
"$.foo[100].bar[100]", "$.foo[0].bar[*]",
-            "$.foo[*].bar[0]", "$.foo[*].bar[*]"};
+    String[] keys = {
+        "$.foo[0].bar[1]",
+        "$.foo[1].bar[0]",
+        "$.foo2[0]",
+        "$.foo[100].bar[100]",
+        "$.foo[0].bar[*]",
+        "$.foo[*].bar[0]",
+        "$.foo[*].bar[*]"
+    };
     List<Map<String, RoaringBitmap>> expected = List.of(
             Map.of("y", RoaringBitmap.bitmapOf(0), "z", 
RoaringBitmap.bitmapOf(1)),
             Map.of("a", RoaringBitmap.bitmapOf(0)),
@@ -667,9 +895,10 @@ public class JsonIndexTest implements 
PinotBuffersAfterMethodCheckRule {
         offHeapIndexReader.convertFlattenedDocIdsToDocIds(offHeapRes);
         Map<String, RoaringBitmap> mutableRes = 
mutableJsonIndex.getMatchingFlattenedDocsMap(keys[i], null);
         mutableJsonIndex.convertFlattenedDocIdsToDocIds(mutableRes);
-        Assert.assertEquals(expected.get(i), onHeapRes);
-        Assert.assertEquals(expected.get(i), offHeapRes);
-        Assert.assertEquals(mutableRes, expected.get(i));
+
+        Assert.assertEquals(expected.get(i), (Object) onHeapRes, keys[i]);
+        Assert.assertEquals(expected.get(i), (Object) offHeapRes, keys[i]);
+        Assert.assertEquals(expected.get(i), (Object) mutableRes, keys[i]);
       }
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to