This is an automated email from the ASF dual-hosted git repository.

saurabhd336 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new ddc3d0b1d1 Json extract index mv (#12532)
ddc3d0b1d1 is described below

commit ddc3d0b1d137ec420e5b68d376fdd9265be7729c
Author: Saurabh Dubey <[email protected]>
AuthorDate: Thu Mar 21 15:33:05 2024 +0530

    Json extract index mv (#12532)
    
    * Json extract index mv
    ---------
    
    Co-authored-by: Saurabh Dubey <[email protected]>
    Co-authored-by: Saurabh Dubey <[email protected]>
---
 .../JsonExtractIndexTransformFunction.java         | 109 +++++++++++----
 .../function/BaseTransformFunctionTest.java        |  25 +++-
 .../JsonExtractIndexTransformFunctionTest.java     | 124 +++++++++++++++--
 .../realtime/impl/json/MutableJsonIndexImpl.java   | 155 +++++++++++++++------
 .../readers/json/ImmutableJsonIndexReader.java     | 125 ++++++++++++-----
 .../segment/local/segment/index/JsonIndexTest.java | 143 ++++++++++++++++---
 .../segment/spi/index/reader/JsonIndexReader.java  |  35 +++--
 7 files changed, 568 insertions(+), 148 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/JsonExtractIndexTransformFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/JsonExtractIndexTransformFunction.java
index fac2ab5fdb..160ed36b0f 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/JsonExtractIndexTransformFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/JsonExtractIndexTransformFunction.java
@@ -44,7 +44,7 @@ public class JsonExtractIndexTransformFunction extends 
BaseTransformFunction {
   private TransformResultMetadata _resultMetadata;
   private JsonIndexReader _jsonIndexReader;
   private Object _defaultValue;
-  private Map<String, RoaringBitmap> _matchingDocsMap;
+  private Map<String, RoaringBitmap> _valueToMatchingDocsMap;
 
   @Override
   public String getName() {
@@ -90,15 +90,12 @@ public class JsonExtractIndexTransformFunction extends 
BaseTransformFunction {
     }
     String resultsType = ((LiteralTransformFunction) 
thirdArgument).getStringLiteral().toUpperCase();
     boolean isSingleValue = !resultsType.endsWith("_ARRAY");
-    // TODO: will support array type; the underlying 
jsonIndexReader.getMatchingDocsMap supports the json path [*]
-    if (!isSingleValue) {
-      throw new IllegalArgumentException("jsonExtractIndex only supports 
single value type");
-    }
     if (isSingleValue && inputJsonPath.contains("[*]")) {
-      throw new IllegalArgumentException("[*] syntax in json path is 
unsupported as json_extract_index"
-          + "currently does not support returning array types");
+      throw new IllegalArgumentException(
+          "[*] syntax in json path is unsupported for singleValue field 
json_extract_index");
     }
-    DataType dataType = DataType.valueOf(resultsType);
+    DataType dataType = isSingleValue ? DataType.valueOf(resultsType)
+        : DataType.valueOf(resultsType.substring(0, resultsType.length() - 6));
 
     if (arguments.size() == 4) {
       TransformFunction fourthArgument = arguments.get(3);
@@ -108,8 +105,12 @@ public class JsonExtractIndexTransformFunction extends 
BaseTransformFunction {
       _defaultValue = dataType.convert(((LiteralTransformFunction) 
fourthArgument).getStringLiteral());
     }
 
-    _resultMetadata = new TransformResultMetadata(dataType, true, false);
-    _matchingDocsMap = _jsonIndexReader.getMatchingDocsMap(_jsonPathString);
+    _resultMetadata = new TransformResultMetadata(dataType, isSingleValue, 
false);
+    _valueToMatchingDocsMap = 
_jsonIndexReader.getMatchingFlattenedDocsMap(_jsonPathString);
+    if (isSingleValue) {
+      // For single value result type, it's more efficient to use original 
docIDs map
+      _jsonIndexReader.convertFlattenedDocIdsToDocIds(_valueToMatchingDocsMap);
+    }
   }
 
   @Override
@@ -122,8 +123,8 @@ public class JsonExtractIndexTransformFunction extends 
BaseTransformFunction {
     int numDocs = valueBlock.getNumDocs();
     int[] inputDocIds = valueBlock.getDocIds();
     initIntValuesSV(numDocs);
-    String[] valuesFromIndex =
-        _jsonIndexReader.getValuesForKeyAndDocs(valueBlock.getDocIds(), 
_matchingDocsMap);
+    String[] valuesFromIndex = 
_jsonIndexReader.getValuesSV(valueBlock.getDocIds(), valueBlock.getNumDocs(),
+        _valueToMatchingDocsMap, false);
     for (int i = 0; i < numDocs; i++) {
       String value = valuesFromIndex[inputDocIds[i]];
       if (value == null) {
@@ -144,8 +145,8 @@ public class JsonExtractIndexTransformFunction extends 
BaseTransformFunction {
     int numDocs = valueBlock.getNumDocs();
     int[] inputDocIds = valueBlock.getDocIds();
     initLongValuesSV(numDocs);
-    String[] valuesFromIndex =
-        _jsonIndexReader.getValuesForKeyAndDocs(valueBlock.getDocIds(), 
_matchingDocsMap);
+    String[] valuesFromIndex = 
_jsonIndexReader.getValuesSV(valueBlock.getDocIds(), valueBlock.getNumDocs(),
+        _valueToMatchingDocsMap, false);
     for (int i = 0; i < numDocs; i++) {
       String value = valuesFromIndex[i];
       if (value == null) {
@@ -166,8 +167,8 @@ public class JsonExtractIndexTransformFunction extends 
BaseTransformFunction {
     int numDocs = valueBlock.getNumDocs();
     int[] inputDocIds = valueBlock.getDocIds();
     initFloatValuesSV(numDocs);
-    String[] valuesFromIndex =
-        _jsonIndexReader.getValuesForKeyAndDocs(valueBlock.getDocIds(), 
_matchingDocsMap);
+    String[] valuesFromIndex = 
_jsonIndexReader.getValuesSV(valueBlock.getDocIds(), valueBlock.getNumDocs(),
+        _valueToMatchingDocsMap, false);
     for (int i = 0; i < numDocs; i++) {
       String value = valuesFromIndex[i];
       if (value == null) {
@@ -188,8 +189,8 @@ public class JsonExtractIndexTransformFunction extends 
BaseTransformFunction {
     int numDocs = valueBlock.getNumDocs();
     int[] inputDocIds = valueBlock.getDocIds();
     initDoubleValuesSV(numDocs);
-    String[] valuesFromIndex =
-        _jsonIndexReader.getValuesForKeyAndDocs(valueBlock.getDocIds(), 
_matchingDocsMap);
+    String[] valuesFromIndex = 
_jsonIndexReader.getValuesSV(valueBlock.getDocIds(), valueBlock.getNumDocs(),
+        _valueToMatchingDocsMap, false);
     for (int i = 0; i < numDocs; i++) {
       String value = valuesFromIndex[i];
       if (value == null) {
@@ -210,8 +211,8 @@ public class JsonExtractIndexTransformFunction extends 
BaseTransformFunction {
     int numDocs = valueBlock.getNumDocs();
     int[] inputDocIds = valueBlock.getDocIds();
     initBigDecimalValuesSV(numDocs);
-    String[] valuesFromIndex =
-        _jsonIndexReader.getValuesForKeyAndDocs(valueBlock.getDocIds(), 
_matchingDocsMap);
+    String[] valuesFromIndex = 
_jsonIndexReader.getValuesSV(valueBlock.getDocIds(), valueBlock.getNumDocs(),
+        _valueToMatchingDocsMap, false);
     for (int i = 0; i < numDocs; i++) {
       String value = valuesFromIndex[i];
       if (value == null) {
@@ -232,8 +233,8 @@ public class JsonExtractIndexTransformFunction extends 
BaseTransformFunction {
     int numDocs = valueBlock.getNumDocs();
     int[] inputDocIds = valueBlock.getDocIds();
     initStringValuesSV(numDocs);
-    String[] valuesFromIndex =
-        _jsonIndexReader.getValuesForKeyAndDocs(valueBlock.getDocIds(), 
_matchingDocsMap);
+    String[] valuesFromIndex = 
_jsonIndexReader.getValuesSV(valueBlock.getDocIds(), valueBlock.getNumDocs(),
+        _valueToMatchingDocsMap, false);
     for (int i = 0; i < numDocs; i++) {
       String value = valuesFromIndex[i];
       if (value == null) {
@@ -251,26 +252,80 @@ public class JsonExtractIndexTransformFunction extends 
BaseTransformFunction {
 
   @Override
   public int[][] transformToIntValuesMV(ValueBlock valueBlock) {
-    throw new UnsupportedOperationException("jsonExtractIndex does not support 
transforming to multi-value columns");
+    int numDocs = valueBlock.getNumDocs();
+    initIntValuesMV(numDocs);
+    String[][] valuesFromIndex = 
_jsonIndexReader.getValuesMV(valueBlock.getDocIds(), valueBlock.getNumDocs(),
+        _valueToMatchingDocsMap);
+
+    for (int i = 0; i < numDocs; i++) {
+      String[] value = valuesFromIndex[i];
+      _intValuesMV[i] = new int[value.length];
+      for (int j = 0; j < value.length; j++) {
+        _intValuesMV[i][j] = Integer.parseInt(value[j]);
+      }
+    }
+    return _intValuesMV;
   }
 
   @Override
   public long[][] transformToLongValuesMV(ValueBlock valueBlock) {
-    throw new UnsupportedOperationException("jsonExtractIndex does not support 
transforming to multi-value columns");
+    int numDocs = valueBlock.getNumDocs();
+    initLongValuesMV(numDocs);
+    String[][] valuesFromIndex = 
_jsonIndexReader.getValuesMV(valueBlock.getDocIds(), valueBlock.getNumDocs(),
+        _valueToMatchingDocsMap);
+    for (int i = 0; i < numDocs; i++) {
+      String[] value = valuesFromIndex[i];
+      _longValuesMV[i] = new long[value.length];
+      for (int j = 0; j < value.length; j++) {
+        _longValuesMV[i][j] = Long.parseLong(value[j]);
+      }
+    }
+    return _longValuesMV;
   }
 
   @Override
   public float[][] transformToFloatValuesMV(ValueBlock valueBlock) {
-    throw new UnsupportedOperationException("jsonExtractIndex does not support 
transforming to multi-value columns");
+    int numDocs = valueBlock.getNumDocs();
+    initFloatValuesMV(numDocs);
+    String[][] valuesFromIndex = 
_jsonIndexReader.getValuesMV(valueBlock.getDocIds(), valueBlock.getNumDocs(),
+        _valueToMatchingDocsMap);
+    for (int i = 0; i < numDocs; i++) {
+      String[] value = valuesFromIndex[i];
+      _floatValuesMV[i] = new float[value.length];
+      for (int j = 0; j < value.length; j++) {
+        _floatValuesMV[i][j] = Float.parseFloat(value[j]);
+      }
+    }
+    return _floatValuesMV;
   }
 
   @Override
   public double[][] transformToDoubleValuesMV(ValueBlock valueBlock) {
-    throw new UnsupportedOperationException("jsonExtractIndex does not support 
transforming to multi-value columns");
+    int numDocs = valueBlock.getNumDocs();
+    initDoubleValuesMV(numDocs);
+    String[][] valuesFromIndex = 
_jsonIndexReader.getValuesMV(valueBlock.getDocIds(), valueBlock.getNumDocs(),
+        _valueToMatchingDocsMap);
+    for (int i = 0; i < numDocs; i++) {
+      String[] value = valuesFromIndex[i];
+      _doubleValuesMV[i] = new double[value.length];
+      for (int j = 0; j < value.length; j++) {
+        _doubleValuesMV[i][j] = Double.parseDouble(value[j]);
+      }
+    }
+    return _doubleValuesMV;
   }
 
   @Override
   public String[][] transformToStringValuesMV(ValueBlock valueBlock) {
-    throw new UnsupportedOperationException("jsonExtractIndex does not support 
transforming to multi-value columns");
+    int numDocs = valueBlock.getNumDocs();
+    initStringValuesMV(numDocs);
+    String[][] valuesFromIndex = 
_jsonIndexReader.getValuesMV(valueBlock.getDocIds(), valueBlock.getNumDocs(),
+        _valueToMatchingDocsMap);
+    for (int i = 0; i < numDocs; i++) {
+      String[] value = valuesFromIndex[i];
+      _stringValuesMV[i] = new String[value.length];
+      System.arraycopy(value, 0, _stringValuesMV[i], 0, value.length);
+    }
+    return _stringValuesMV;
   }
 }
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/BaseTransformFunctionTest.java b/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/BaseTransformFunctionTest.java
index 5ad67f5681..5451e77eb3 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/BaseTransformFunctionTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/BaseTransformFunctionTest.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pinot.core.operator.transform.function;
 
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
 import java.io.File;
 import java.math.BigDecimal;
 import java.sql.Timestamp;
@@ -46,6 +48,7 @@ import 
org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
 import org.apache.pinot.segment.spi.IndexSegment;
 import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
 import org.apache.pinot.segment.spi.datasource.DataSource;
+import org.apache.pinot.spi.config.table.FieldConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.data.FieldSpec;
@@ -155,10 +158,13 @@ public abstract class BaseTransformFunctionTest {
       _stringSVValues[i] = df.format(_intSVValues[i] * RANDOM.nextDouble());
       _jsonSVValues[i] = String.format(
           "{\"intVal\":%s, \"longVal\":%s, \"floatVal\":%s, \"doubleVal\":%s, 
\"bigDecimalVal\":%s, "
-              + "\"stringVal\":\"%s\", "
+              + "\"stringVal\":\"%s\", \"arrayField\": [{\"arrIntField\": 1, 
\"arrStringField\": \"abc\"}, "
+              + "{\"arrIntField\": 2, \"arrStringField\": \"xyz\"},"
+              + "{\"arrIntField\": 5, \"arrStringField\": \"wxy\"},"
+              + "{\"arrIntField\": 0}], "
               + "\"intVals\":[0,1], \"longVals\":[0,1], 
\"floatVals\":[0.0,1.0], \"doubleVals\":[0.0,1.0], "
               + "\"bigDecimalVals\":[0.0,1.0], \"stringVals\":[\"0\",\"1\"]}",
-              RANDOM.nextInt(), RANDOM.nextLong(), RANDOM.nextFloat(), 
RANDOM.nextDouble(),
+          RANDOM.nextInt(), RANDOM.nextLong(), RANDOM.nextFloat(), 
RANDOM.nextDouble(),
           
BigDecimal.valueOf(RANDOM.nextDouble()).multiply(BigDecimal.valueOf(RANDOM.nextInt())),
           df.format(RANDOM.nextInt() * RANDOM.nextDouble()));
       _stringAlphaNumericSVValues[i] = 
RandomStringUtils.randomAlphanumeric(26);
@@ -276,7 +282,7 @@ public abstract class BaseTransformFunctionTest {
         .addSingleValueDimension(DOUBLE_SV_COLUMN, FieldSpec.DataType.DOUBLE)
         .addMetric(BIG_DECIMAL_SV_COLUMN, FieldSpec.DataType.BIG_DECIMAL)
         .addSingleValueDimension(STRING_SV_COLUMN, FieldSpec.DataType.STRING)
-        .addSingleValueDimension(JSON_STRING_SV_COLUMN, 
FieldSpec.DataType.STRING)
+        .addSingleValueDimension(JSON_STRING_SV_COLUMN, 
FieldSpec.DataType.STRING, 5000, "{}")
         .addSingleValueDimension(STRING_SV_NULL_COLUMN, 
FieldSpec.DataType.STRING)
         .addSingleValueDimension(STRING_ALPHANUM_SV_COLUMN, 
FieldSpec.DataType.STRING)
         .addSingleValueDimension(STRING_ALPHANUM_NULL_SV_COLUMN, 
FieldSpec.DataType.STRING)
@@ -303,10 +309,19 @@ public abstract class BaseTransformFunctionTest {
         .addDateTime(TIMESTAMP_COLUMN, FieldSpec.DataType.TIMESTAMP, 
"1:MILLISECONDS:EPOCH", "1:MILLISECONDS")
         .addDateTime(TIMESTAMP_COLUMN_NULL, FieldSpec.DataType.TIMESTAMP, 
"1:MILLISECONDS:EPOCH", "1:MILLISECONDS")
         .addTime(new TimeGranularitySpec(FieldSpec.DataType.LONG, 
TimeUnit.MILLISECONDS, TIME_COLUMN), null).build();
+
+    List<FieldConfig> fieldConfigList = new ArrayList<>();
+    ObjectNode jsonIndexProps = JsonNodeFactory.instance.objectNode();
+    jsonIndexProps.put("disableCrossArrayUnnest", true);
+    ObjectNode indexNode = JsonNodeFactory.instance.objectNode();
+    indexNode.put("json", jsonIndexProps);
+    FieldConfig jsonFieldConfig =
+        new FieldConfig(JSON_STRING_SV_COLUMN, 
FieldConfig.EncodingType.DICTIONARY, null, null, null, null, indexNode,
+            null, null);
+    fieldConfigList.add(jsonFieldConfig);
     TableConfig tableConfig =
         new 
TableConfigBuilder(TableType.OFFLINE).setTableName("test").setTimeColumnName(TIME_COLUMN)
-            .setJsonIndexColumns(List.of(JSON_STRING_SV_COLUMN))
-            .setNullHandlingEnabled(true).build();
+            
.setFieldConfigList(fieldConfigList).setNullHandlingEnabled(true).build();
 
     SegmentGeneratorConfig config = new SegmentGeneratorConfig(tableConfig, 
schema);
     config.setOutDir(INDEX_DIR_PATH);
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/JsonExtractIndexTransformFunctionTest.java b/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/JsonExtractIndexTransformFunctionTest.java
index 6704c303f3..c61084c430 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/JsonExtractIndexTransformFunctionTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/JsonExtractIndexTransformFunctionTest.java
@@ -22,6 +22,7 @@ import com.jayway.jsonpath.Configuration;
 import com.jayway.jsonpath.JsonPath;
 import com.jayway.jsonpath.Option;
 import com.jayway.jsonpath.ParseContext;
+import com.jayway.jsonpath.TypeRef;
 import com.jayway.jsonpath.spi.json.JacksonJsonProvider;
 import com.jayway.jsonpath.spi.mapper.JacksonMappingProvider;
 import java.math.BigDecimal;
@@ -38,6 +39,17 @@ import org.testng.annotations.Test;
 
 
 public class JsonExtractIndexTransformFunctionTest extends 
BaseTransformFunctionTest {
+  private static final TypeRef<List<Integer>> INTEGER_LIST_TYPE = new 
TypeRef<List<Integer>>() {
+  };
+  private static final TypeRef<List<Long>> LONG_LIST_TYPE = new 
TypeRef<List<Long>>() {
+  };
+  private static final TypeRef<List<Float>> FLOAT_LIST_TYPE = new 
TypeRef<List<Float>>() {
+  };
+  private static final TypeRef<List<Double>> DOUBLE_LIST_TYPE = new 
TypeRef<List<Double>>() {
+  };
+  private static final TypeRef<List<String>> STRING_LIST_TYPE = new 
TypeRef<List<String>>() {
+  };
+
   // Used to verify index value extraction
   private static final ParseContext JSON_PARSER_CONTEXT = JsonPath.using(
       new Configuration.ConfigurationBuilder().jsonProvider(new 
JacksonJsonProvider())
@@ -94,6 +106,61 @@ public class JsonExtractIndexTransformFunctionTest extends 
BaseTransformFunction
         default:
           throw new UnsupportedOperationException("Not support data type - " + 
resultsDataType);
       }
+    } else {
+      switch (resultsDataType) {
+        case INT:
+          int[][] intValues = 
transformFunction.transformToIntValuesMV(_projectionBlock);
+          for (int i = 0; i < NUM_ROWS; i++) {
+            List<Integer> values = getValueForKey(_jsonSVValues[i], jsonPath, 
INTEGER_LIST_TYPE);
+            Assert.assertEquals(intValues[i].length, values.size());
+            for (int j = 0; j < intValues[i].length; j++) {
+              Assert.assertEquals(intValues[i][j], values.get(j));
+            }
+          }
+          break;
+        case LONG:
+          long[][] longValues = 
transformFunction.transformToLongValuesMV(_projectionBlock);
+          for (int i = 0; i < NUM_ROWS; i++) {
+            List<Long> values = getValueForKey(_jsonSVValues[i], jsonPath, 
LONG_LIST_TYPE);
+            Assert.assertEquals(longValues[i].length, values.size());
+            for (int j = 0; j < longValues[i].length; j++) {
+              Assert.assertEquals(longValues[i][j], values.get(j));
+            }
+          }
+          break;
+        case FLOAT:
+          float[][] floatValues = 
transformFunction.transformToFloatValuesMV(_projectionBlock);
+          for (int i = 0; i < NUM_ROWS; i++) {
+            List<Float> values = getValueForKey(_jsonSVValues[i], jsonPath, 
FLOAT_LIST_TYPE);
+            Assert.assertEquals(floatValues[i].length, values.size());
+            for (int j = 0; j < floatValues[i].length; j++) {
+              Assert.assertEquals(floatValues[i][j], values.get(j));
+            }
+          }
+          break;
+        case DOUBLE:
+          double[][] doubleValues = 
transformFunction.transformToDoubleValuesMV(_projectionBlock);
+          for (int i = 0; i < NUM_ROWS; i++) {
+            List<Double> values = getValueForKey(_jsonSVValues[i], jsonPath, 
DOUBLE_LIST_TYPE);
+            Assert.assertEquals(doubleValues[i].length, values.size());
+            for (int j = 0; j < doubleValues[i].length; j++) {
+              Assert.assertEquals(doubleValues[i][j], values.get(j));
+            }
+          }
+          break;
+        case STRING:
+          String[][] stringValues = 
transformFunction.transformToStringValuesMV(_projectionBlock);
+          for (int i = 0; i < NUM_ROWS; i++) {
+            List<String> values = getValueForKey(_jsonSVValues[i], jsonPath, 
STRING_LIST_TYPE);
+            Assert.assertEquals(stringValues[i].length, values.size());
+            for (int j = 0; j < stringValues[i].length; j++) {
+              Assert.assertEquals(stringValues[i][j], values.get(j));
+            }
+          }
+          break;
+        default:
+          throw new UnsupportedOperationException("Not support data type - " + 
resultsDataType);
+      }
     }
   }
 
@@ -127,32 +194,61 @@ public class JsonExtractIndexTransformFunctionTest 
extends BaseTransformFunction
     });
 
     testArguments.add(new Object[]{
-            String.format("jsonExtractIndex(%s,'%s','INT')", 
JSON_STRING_SV_COLUMN,
-                    "$.intVals[0]"), "$.intVals[0]", DataType.INT, true
+        String.format("jsonExtractIndex(%s,'%s','INT')", JSON_STRING_SV_COLUMN,
+            "$.intVals[0]"), "$.intVals[0]", DataType.INT, true
     });
     testArguments.add(new Object[]{
-            String.format("jsonExtractIndex(%s,'%s','LONG')", 
JSON_STRING_SV_COLUMN,
-                    "$.longVals[1]"), "$.longVals[1]", DataType.LONG, true
+        String.format("jsonExtractIndex(%s,'%s','LONG')", 
JSON_STRING_SV_COLUMN,
+            "$.longVals[1]"), "$.longVals[1]", DataType.LONG, true
     });
     testArguments.add(new Object[]{
-            String.format("jsonExtractIndex(%s,'%s','FLOAT')", 
JSON_STRING_SV_COLUMN,
-                    "$.floatVals[0]"), "$.floatVals[0]", DataType.FLOAT, true
+        String.format("jsonExtractIndex(%s,'%s','FLOAT')", 
JSON_STRING_SV_COLUMN,
+            "$.floatVals[0]"), "$.floatVals[0]", DataType.FLOAT, true
     });
     testArguments.add(new Object[]{
-            String.format("jsonExtractIndex(%s,'%s','DOUBLE')", 
JSON_STRING_SV_COLUMN,
-                    "$.doubleVals[1]"), "$.doubleVals[1]", DataType.DOUBLE, 
true
+        String.format("jsonExtractIndex(%s,'%s','DOUBLE')", 
JSON_STRING_SV_COLUMN,
+            "$.doubleVals[1]"), "$.doubleVals[1]", DataType.DOUBLE, true
     });
     testArguments.add(new Object[]{
-            String.format("jsonExtractIndex(%s,'%s','BIG_DECIMAL')", 
JSON_STRING_SV_COLUMN,
-                    "$.bigDecimalVals[0]"), "$.bigDecimalVals[0]", 
DataType.BIG_DECIMAL, true
+        String.format("jsonExtractIndex(%s,'%s','BIG_DECIMAL')", 
JSON_STRING_SV_COLUMN,
+            "$.bigDecimalVals[0]"), "$.bigDecimalVals[0]", 
DataType.BIG_DECIMAL, true
     });
     testArguments.add(new Object[]{
-            String.format("jsonExtractIndex(%s,'%s','STRING')", 
JSON_STRING_SV_COLUMN,
-                    "$.stringVals[1]"), "$.stringVals[1]", DataType.STRING, 
true
+        String.format("jsonExtractIndex(%s,'%s','STRING')", 
JSON_STRING_SV_COLUMN,
+            "$.stringVals[1]"), "$.stringVals[1]", DataType.STRING, true
     });
+
+    addMvTests(testArguments);
     return testArguments.toArray(new Object[0][]);
   }
 
+  private void addMvTests(List<Object[]> testArguments) {
+    testArguments.add(new Object[]{
+        String.format("jsonExtractIndex(%s,'%s','INT_ARRAY')", 
JSON_STRING_SV_COLUMN,
+            "$.intVals[*]"), "$.intVals[*]", DataType.INT, false
+    });
+    testArguments.add(new Object[]{
+        String.format("jsonExtractIndex(%s,'%s','LONG_ARRAY')", 
JSON_STRING_SV_COLUMN,
+            "$.longVals[*]"), "$.longVals[*]", DataType.LONG, false
+    });
+    testArguments.add(new Object[]{
+        String.format("jsonExtractIndex(%s,'%s','DOUBLE_ARRAY')", 
JSON_STRING_SV_COLUMN,
+            "$.doubleVals[*]"), "$.doubleVals[*]", DataType.DOUBLE, false
+    });
+    testArguments.add(new Object[]{
+        String.format("jsonExtractIndex(%s,'%s','STRING_ARRAY')", 
JSON_STRING_SV_COLUMN,
+            "$.stringVals[*]"), "$.stringVals[*]", DataType.STRING, false
+    });
+    testArguments.add(new Object[]{
+        String.format("jsonExtractIndex(%s,'%s','INT_ARRAY')", 
JSON_STRING_SV_COLUMN,
+            "$.arrayField[*].arrIntField"), "$.arrayField[*].arrIntField", 
DataType.INT, false
+    });
+    testArguments.add(new Object[]{
+        String.format("jsonExtractIndex(%s,'%s','STRING_ARRAY')", 
JSON_STRING_SV_COLUMN,
+            "$.arrayField[*].arrStringField"), 
"$.arrayField[*].arrStringField", DataType.STRING, false
+    });
+  }
+
   @Test(dataProvider = "testJsonExtractIndexDefaultValue")
   public void testJsonExtractIndexDefaultValue(String expressionStr, String 
jsonPathString, DataType resultsDataType,
       boolean isSingleValue) {
@@ -246,4 +342,8 @@ public class JsonExtractIndexTransformFunctionTest extends 
BaseTransformFunction
     }
     return out.toString();
   }
+
+  private <T> T getValueForKey(String blob, JsonPath path, TypeRef<T> typeRef) 
{
+    return JSON_PARSER_CONTEXT.parse(blob).read(path, typeRef);
+  }
 }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/json/MutableJsonIndexImpl.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/json/MutableJsonIndexImpl.java
index d072314d75..8a5ca799cd 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/json/MutableJsonIndexImpl.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/json/MutableJsonIndexImpl.java
@@ -23,9 +23,13 @@ import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
 import it.unimi.dsi.fastutil.ints.IntArrayList;
 import it.unimi.dsi.fastutil.ints.IntList;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.PriorityQueue;
 import java.util.TreeMap;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.regex.Pattern;
@@ -48,7 +52,6 @@ import 
org.apache.pinot.spi.exception.BadQueryRequestException;
 import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.sql.parsers.CalciteSqlParser;
 import org.roaringbitmap.IntConsumer;
-import org.roaringbitmap.PeekableIntIterator;
 import org.roaringbitmap.RoaringBitmap;
 import org.roaringbitmap.buffer.MutableRoaringBitmap;
 
@@ -214,7 +217,7 @@ public class MutableJsonIndexImpl implements 
MutableJsonIndex {
     } else {
       key = JsonUtils.KEY_SEPARATOR + key;
     }
-    Pair<String, RoaringBitmap> pair = getKeyAndFlattenDocId(key);
+    Pair<String, RoaringBitmap> pair = getKeyAndFlattenedDocIds(key);
     key = pair.getLeft();
     RoaringBitmap matchingDocIds = pair.getRight();
     if (matchingDocIds != null && matchingDocIds.isEmpty()) {
@@ -350,58 +353,45 @@ public class MutableJsonIndexImpl implements 
MutableJsonIndex {
     }
   }
 
+  public void convertFlattenedDocIdsToDocIds(Map<String, RoaringBitmap> 
valueToFlattenedDocIds) {
+    _readLock.lock();
+    try {
+      valueToFlattenedDocIds.replaceAll((key, value) -> {
+        RoaringBitmap docIds = new RoaringBitmap();
+        value.forEach((IntConsumer) flattenedDocId -> 
docIds.add(_docIdMapping.getInt(flattenedDocId)));
+        return docIds;
+      });
+    } finally {
+      _readLock.unlock();
+    }
+  }
+
   @Override
-  public Map<String, RoaringBitmap> getMatchingDocsMap(String key) {
-    Map<String, RoaringBitmap> matchingDocsMap = new HashMap<>();
+  public Map<String, RoaringBitmap> getMatchingFlattenedDocsMap(String 
jsonPathKey) {
+    Map<String, RoaringBitmap> valueToMatchingFlattenedDocIdsMap = new 
HashMap<>();
     _readLock.lock();
     try {
-      Pair<String, RoaringBitmap> result = getKeyAndFlattenDocId(key);
-      key = result.getLeft();
+      Pair<String, RoaringBitmap> result = 
getKeyAndFlattenedDocIds(jsonPathKey);
+      jsonPathKey = result.getLeft();
       RoaringBitmap arrayIndexFlattenDocIds = result.getRight();
       if (arrayIndexFlattenDocIds != null && 
arrayIndexFlattenDocIds.isEmpty()) {
-        return matchingDocsMap;
+        return valueToMatchingFlattenedDocIdsMap;
       }
-      Map<String, RoaringBitmap> subMap = getMatchingKeysMap(key);
+      Map<String, RoaringBitmap> subMap = getMatchingKeysMap(jsonPathKey);
       for (Map.Entry<String, RoaringBitmap> entry : subMap.entrySet()) {
-        RoaringBitmap kvPairFlattenedDocIds = entry.getValue();
-        PeekableIntIterator it = arrayIndexFlattenDocIds == null ? 
kvPairFlattenedDocIds.getIntIterator()
-                : RoaringBitmap.and(arrayIndexFlattenDocIds, 
kvPairFlattenedDocIds).getIntIterator();
-        if (!it.hasNext()) {
-          continue;
+        RoaringBitmap flattenedDocIds = entry.getValue().clone();
+        if (arrayIndexFlattenDocIds != null) {
+          flattenedDocIds.and(arrayIndexFlattenDocIds);
         }
-        MutableRoaringBitmap postingList = new MutableRoaringBitmap();
-        while (it.hasNext()) {
-          postingList.add(_docIdMapping.getInt(it.next()));
+        if (!flattenedDocIds.isEmpty()) {
+          
valueToMatchingFlattenedDocIdsMap.put(entry.getKey().substring(jsonPathKey.length()
 + 1), flattenedDocIds);
         }
-        String val = entry.getKey().substring(key.length() + 1);
-        matchingDocsMap.put(val, postingList.toRoaringBitmap());
       }
+
+      return valueToMatchingFlattenedDocIdsMap;
     } finally {
       _readLock.unlock();
     }
-    return matchingDocsMap;
-  }
-
-  @Override
-  public String[] getValuesForKeyAndDocs(int[] docIds, Map<String, 
RoaringBitmap> matchingDocsMap) {
-    Int2ObjectOpenHashMap<String> docIdToValues = new 
Int2ObjectOpenHashMap<>(docIds.length);
-    RoaringBitmap docIdMask = RoaringBitmap.bitmapOf(docIds);
-
-    for (Map.Entry<String, RoaringBitmap> entry : matchingDocsMap.entrySet()) {
-      RoaringBitmap intersection = RoaringBitmap.and(entry.getValue(), 
docIdMask);
-      if (intersection.isEmpty()) {
-        continue;
-      }
-      for (int docId : intersection) {
-        docIdToValues.put(docId, entry.getKey());
-      }
-    }
-
-    String[] values = new String[docIds.length];
-    for (int i = 0; i < docIds.length; i++) {
-      values[i] = docIdToValues.get(docIds[i]);
-    }
-    return values;
   }
 
   /**
@@ -410,7 +400,7 @@ public class MutableJsonIndexImpl implements 
MutableJsonIndex {
    *  Else, return the json path that is generated by replacing array index 
with . on the original key
    *  and the associated flattenDocId bitmap
    */
-  private Pair<String, RoaringBitmap> getKeyAndFlattenDocId(String key) {
+  private Pair<String, RoaringBitmap> getKeyAndFlattenedDocIds(String key) {
     // Process the array index within the key if exists
     // E.g. "[*]"=1 -> "."='1'
     // E.g. "[0]"=1 -> ".$index"='0' && "."='1'
@@ -454,6 +444,87 @@ public class MutableJsonIndexImpl implements 
MutableJsonIndex {
         key + JsonIndexCreator.KEY_VALUE_SEPARATOR_NEXT_CHAR, false);
   }
 
+  @Override
+  public String[][] getValuesMV(int[] docIds, int length,
+      Map<String, RoaringBitmap> valueToMatchingFlattenedDocs) {
+    String[][] result = new String[length][];
+    List<PriorityQueue<Pair<String, Integer>>> docIdToFlattenedDocIdsAndValues 
= new ArrayList<>();
+    for (int i = 0; i < length; i++) {
+      // Sort based on flattened doc id
+      docIdToFlattenedDocIdsAndValues.add(new 
PriorityQueue<>(Comparator.comparingInt(Pair::getRight)));
+    }
+    Map<Integer, Integer> docIdToPos = new HashMap<>();
+    for (int i = 0; i < length; i++) {
+      docIdToPos.put(docIds[i], i);
+    }
+
+    _readLock.lock();
+    try {
+      for (Map.Entry<String, RoaringBitmap> entry : 
valueToMatchingFlattenedDocs.entrySet()) {
+        String value = entry.getKey();
+        RoaringBitmap matchingFlattenedDocIds = entry.getValue();
+        matchingFlattenedDocIds.forEach((IntConsumer) flattenedDocId -> {
+          int docId = _docIdMapping.getInt(flattenedDocId);
+          if (docIdToPos.containsKey(docId)) {
+            
docIdToFlattenedDocIdsAndValues.get(docIdToPos.get(docId)).add(Pair.of(value, 
flattenedDocId));
+          }
+        });
+      }
+    } finally {
+      _readLock.unlock();
+    }
+
+    for (int i = 0; i < length; i++) {
+      PriorityQueue<Pair<String, Integer>> pq = 
docIdToFlattenedDocIdsAndValues.get(i);
+      result[i] = new String[pq.size()];
+      int j = 0;
+      while (!pq.isEmpty()) {
+        result[i][j++] = pq.poll().getLeft();
+      }
+    }
+
+    return result;
+  }
+
+  @Override
+  public String[] getValuesSV(int[] docIds, int length, Map<String, 
RoaringBitmap> valueToMatchingFlattenedDocs,
+      boolean isFlattenedDocIds) {
+    Int2ObjectOpenHashMap<String> docIdToValues = new 
Int2ObjectOpenHashMap<>(length);
+    RoaringBitmap docIdMask = 
RoaringBitmap.bitmapOf(Arrays.copyOfRange(docIds, 0, length));
+    _readLock.lock();
+    try {
+      for (Map.Entry<String, RoaringBitmap> entry : 
valueToMatchingFlattenedDocs.entrySet()) {
+        String value = entry.getKey();
+        RoaringBitmap matchingDocIds = entry.getValue();
+
+        if (isFlattenedDocIds) {
+          matchingDocIds.forEach((IntConsumer) flattenedDocId -> {
+            int docId = _docIdMapping.getInt(flattenedDocId);
+            if (docIdMask.contains(docId)) {
+              docIdToValues.put(docId, value);
+            }
+          });
+        } else {
+          RoaringBitmap intersection = RoaringBitmap.and(entry.getValue(), 
docIdMask);
+          if (intersection.isEmpty()) {
+            continue;
+          }
+          for (int docId : intersection) {
+            docIdToValues.put(docId, entry.getKey());
+          }
+        }
+      }
+    } finally {
+      _readLock.unlock();
+    }
+
+    String[] values = new String[length];
+    for (int i = 0; i < length; i++) {
+      values[i] = docIdToValues.get(docIds[i]);
+    }
+    return values;
+  }
+
   @Override
   public void close() {
   }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/json/ImmutableJsonIndexReader.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/json/ImmutableJsonIndexReader.java
index d692a978b6..9af37b50fb 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/json/ImmutableJsonIndexReader.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/json/ImmutableJsonIndexReader.java
@@ -21,9 +21,12 @@ package 
org.apache.pinot.segment.local.segment.index.readers.json;
 import com.google.common.base.Preconditions;
 import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
 import java.nio.ByteOrder;
+import java.util.ArrayList;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.PriorityQueue;
 import java.util.regex.Pattern;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pinot.common.request.context.ExpressionContext;
@@ -62,6 +65,7 @@ public class ImmutableJsonIndexReader implements 
JsonIndexReader {
   private final int _version;
   private final StringDictionary _dictionary;
   private final BitmapInvertedIndexReader _invertedIndex;
+  private final long _numFlattenedDocs;
   private final PinotDataBuffer _docIdMapping;
 
   public ImmutableJsonIndexReader(PinotDataBuffer dataBuffer, int numDocs) {
@@ -84,6 +88,7 @@ public class ImmutableJsonIndexReader implements 
JsonIndexReader {
     _invertedIndex = new BitmapInvertedIndexReader(
         dataBuffer.view(dictionaryEndOffset, invertedIndexEndOffset, 
ByteOrder.BIG_ENDIAN), _dictionary.length());
     long docIdMappingEndOffset = invertedIndexEndOffset + docIdMappingLength;
+    _numFlattenedDocs = (docIdMappingLength / Integer.BYTES);
     _docIdMapping = dataBuffer.view(invertedIndexEndOffset, 
docIdMappingEndOffset, ByteOrder.LITTLE_ENDIAN);
   }
 
@@ -181,7 +186,7 @@ public class ImmutableJsonIndexReader implements 
JsonIndexReader {
         key = key.substring(2);
       }
     }
-    Pair<String, MutableRoaringBitmap> pair = getKeyAndFlattenDocId(key);
+    Pair<String, MutableRoaringBitmap> pair = getKeyAndFlattenedDocIds(key);
     key = pair.getLeft();
     MutableRoaringBitmap matchingDocIds = pair.getRight();
     if (matchingDocIds != null && matchingDocIds.isEmpty()) {
@@ -317,51 +322,109 @@ public class ImmutableJsonIndexReader implements 
JsonIndexReader {
     return _docIdMapping.getInt((long) flattenedDocId << 2);
   }
 
+  public void convertFlattenedDocIdsToDocIds(Map<String, RoaringBitmap> 
valueToFlattenedDocIds) {
+    valueToFlattenedDocIds.replaceAll((key, value) -> {
+      RoaringBitmap docIds = new RoaringBitmap();
+      value.forEach((IntConsumer) flattenedDocId -> 
docIds.add(getDocId(flattenedDocId)));
+      return docIds;
+    });
+  }
+
   @Override
-  public Map<String, RoaringBitmap> getMatchingDocsMap(String key) {
-    Map<String, RoaringBitmap> matchingDocsMap = new HashMap<>();
-    Pair<String, MutableRoaringBitmap> result = getKeyAndFlattenDocId(key);
-    key = result.getLeft();
-    MutableRoaringBitmap arrayIndexFlattenDocIds = result.getRight();
-    if (arrayIndexFlattenDocIds != null && arrayIndexFlattenDocIds.isEmpty()) {
-      return matchingDocsMap;
+  public Map<String, RoaringBitmap> getMatchingFlattenedDocsMap(String 
jsonPathKey) {
+    Map<String, RoaringBitmap> result = new HashMap<>();
+    Pair<String, MutableRoaringBitmap> pathKey = 
getKeyAndFlattenedDocIds(jsonPathKey);
+    if (pathKey.getRight() != null && pathKey.getRight().isEmpty()) {
+      return result;
+    }
+
+    jsonPathKey = pathKey.getLeft();
+    RoaringBitmap arrayIndexFlattenDocIds = null;
+    if (pathKey.getRight() != null) {
+      arrayIndexFlattenDocIds = pathKey.getRight().toRoaringBitmap();
     }
-    int[] dictIds = getDictIdRangeForKey(key);
+    int[] dictIds = getDictIdRangeForKey(jsonPathKey);
     for (int dictId = dictIds[0]; dictId < dictIds[1]; dictId++) {
-      // get docIds from posting list, convert these to the actual docIds
-      ImmutableRoaringBitmap flattenedDocIds = 
_invertedIndex.getDocIds(dictId);
-      PeekableIntIterator it = arrayIndexFlattenDocIds == null ? 
flattenedDocIds.getIntIterator()
-              : intersect(arrayIndexFlattenDocIds.clone(), flattenedDocIds);
-      if (!it.hasNext()) {
-        continue;
+      String key = _dictionary.getStringValue(dictId);
+      RoaringBitmap docIds = 
_invertedIndex.getDocIds(dictId).toRoaringBitmap();
+      if (arrayIndexFlattenDocIds != null) {
+        docIds.and(arrayIndexFlattenDocIds);
+      }
+
+      if (!docIds.isEmpty()) {
+        result.put(key.substring(jsonPathKey.length() + 1), docIds);
       }
-      RoaringBitmap realDocIds = new RoaringBitmap();
-      while (it.hasNext()) {
-        realDocIds.add(getDocId(it.next()));
+    }
+
+    return result;
+  }
+
+  @Override
+  public String[][] getValuesMV(int[] docIds, int length,
+      Map<String, RoaringBitmap> valueToMatchingFlattenedDocs) {
+    String[][] result = new String[length][];
+    List<PriorityQueue<Pair<String, Integer>>> docIdToFlattenedDocIdsAndValues 
= new ArrayList<>();
+    for (int i = 0; i < length; i++) {
+      // Sort based on flattened doc id
+      docIdToFlattenedDocIdsAndValues.add(new 
PriorityQueue<>(Comparator.comparingInt(Pair::getRight)));
+    }
+    Map<Integer, Integer> docIdToPos = new HashMap<>();
+    for (int i = 0; i < length; i++) {
+      docIdToPos.put(docIds[i], i);
+    }
+
+    for (Map.Entry<String, RoaringBitmap> entry : 
valueToMatchingFlattenedDocs.entrySet()) {
+      String value = entry.getKey();
+      RoaringBitmap matchingFlattenedDocIds = entry.getValue();
+      matchingFlattenedDocIds.forEach((IntConsumer) flattenedDocId -> {
+        int docId = getDocId(flattenedDocId);
+        if (docIdToPos.containsKey(docId)) {
+          
docIdToFlattenedDocIdsAndValues.get(docIdToPos.get(docId)).add(Pair.of(value, 
flattenedDocId));
+        }
+      });
+    }
+
+    for (int i = 0; i < length; i++) {
+      PriorityQueue<Pair<String, Integer>> pq = 
docIdToFlattenedDocIdsAndValues.get(i);
+      result[i] = new String[pq.size()];
+      int j = 0;
+      while (!pq.isEmpty()) {
+        result[i][j++] = pq.poll().getLeft();
       }
-      
matchingDocsMap.put(_dictionary.getStringValue(dictId).substring(key.length() + 
1), realDocIds);
     }
 
-    return matchingDocsMap;
+    return result;
   }
 
   @Override
-  public String[] getValuesForKeyAndDocs(int[] docIds, Map<String, 
RoaringBitmap> matchingDocsMap) {
+  public String[] getValuesSV(int[] docIds, int length, Map<String, 
RoaringBitmap> valueToMatchingDocs,
+      boolean isFlattenedDocIds) {
     Int2ObjectOpenHashMap<String> docIdToValues = new 
Int2ObjectOpenHashMap<>(docIds.length);
     RoaringBitmap docIdMask = RoaringBitmap.bitmapOf(docIds);
 
-    for (Map.Entry<String, RoaringBitmap> entry : matchingDocsMap.entrySet()) {
-      RoaringBitmap intersection = RoaringBitmap.and(entry.getValue(), 
docIdMask);
-      if (intersection.isEmpty()) {
-        continue;
-      }
-      for (int docId : intersection) {
-        docIdToValues.put(docId, entry.getKey());
+    for (Map.Entry<String, RoaringBitmap> entry : 
valueToMatchingDocs.entrySet()) {
+      String value = entry.getKey();
+      RoaringBitmap matchingDocIds = entry.getValue();
+      if (isFlattenedDocIds) {
+        matchingDocIds.forEach((IntConsumer) flattenedDocId -> {
+          int docId = getDocId(flattenedDocId);
+          if (docIdMask.contains(docId)) {
+            docIdToValues.put(docId, value);
+          }
+        });
+      } else {
+        RoaringBitmap intersection = RoaringBitmap.and(matchingDocIds, 
docIdMask);
+        if (intersection.isEmpty()) {
+          continue;
+        }
+        for (int docId : intersection) {
+          docIdToValues.put(docId, entry.getKey());
+        }
       }
     }
 
-    String[] values = new String[docIds.length];
-    for (int i = 0; i < docIds.length; i++) {
+    String[] values = new String[length];
+    for (int i = 0; i < length; i++) {
       values[i] = docIdToValues.get(docIds[i]);
     }
     return values;
@@ -396,7 +459,7 @@ public class ImmutableJsonIndexReader implements 
JsonIndexReader {
    *  Else, return the json path that is generated by replacing array index 
with . on the original key
    *  and the associated flattenDocId bitmap
    */
-  private Pair<String, MutableRoaringBitmap> getKeyAndFlattenDocId(String key) 
{
+  private Pair<String, MutableRoaringBitmap> getKeyAndFlattenedDocIds(String 
key) {
     MutableRoaringBitmap matchingDocIds = null;
     if (_version == BaseJsonIndexCreator.VERSION_2) {
       // Process the array index within the key if exists
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/JsonIndexTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/JsonIndexTest.java
index 3b357e711a..1b2a910da3 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/JsonIndexTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/JsonIndexTest.java
@@ -328,12 +328,74 @@ public class JsonIndexTest {
     return indexReader.getMatchingDocIds(filter);
   }
 
+  @Test
+  public void testGetValueToFlattenedDocIdsMap()
+      throws Exception {
+    // @formatter: off
+    // CHECKSTYLE:OFF
+    String[] records = new String[]{
+        "{\"arrField\": " + "[{\"intKey01\": 1, \"stringKey01\": \"abc\"},"
+            + " {\"intKey01\": 1, \"stringKey01\": \"foo\"}, " + " 
{\"intKey01\": 3, \"stringKey01\": \"bar\"},"
+            + " {\"intKey01\": 5, \"stringKey01\": \"fuzz\"}]}",
+        "{\"arrField\": " + "[{\"intKey01\": 7, \"stringKey01\": \"pqrS\"},"
+            + " {\"intKey01\": 6, \"stringKey01\": \"foo\"}, " + " 
{\"intKey01\": 8, \"stringKey01\": \"test\"},"
+            + " {\"intKey01\": 9, \"stringKey01\": \"testf2\"}]}",
+        "{\"arrField\": " + "[{\"intKey01\": 1, \"stringKey01\": \"pqr\"},"
+            + " {\"intKey01\": 1, \"stringKey01\": \"foo\"}, " + " 
{\"intKey01\": 6, \"stringKey01\": \"test\"},"
+            + " {\"intKey01\": 3, \"stringKey01\": \"testf2\"}]}",
+    };
+    // CHECKSTYLE:ON
+    // @formatter: on
+
+    String[] testKeys = new String[]{
+        ".arrField[*].intKey01",
+        ".arrField[*].stringKey01",
+    };
+
+    String colName = "col";
+    try (JsonIndexCreator offHeapIndexCreator = new 
OffHeapJsonIndexCreator(INDEX_DIR, colName, new JsonIndexConfig());
+        MutableJsonIndexImpl mutableJsonIndex = new MutableJsonIndexImpl(new 
JsonIndexConfig())) {
+      for (String record : records) {
+        offHeapIndexCreator.add(record);
+        mutableJsonIndex.add(record);
+      }
+      offHeapIndexCreator.seal();
+
+      File offHeapIndexFile = new File(INDEX_DIR, colName + 
V1Constants.Indexes.JSON_INDEX_FILE_EXTENSION);
+      Assert.assertTrue(offHeapIndexFile.exists());
+
+      try (PinotDataBuffer offHeapDataBuffer = 
PinotDataBuffer.mapReadOnlyBigEndianFile(offHeapIndexFile);
+          ImmutableJsonIndexReader offHeapIndexReader = new 
ImmutableJsonIndexReader(offHeapDataBuffer,
+              records.length)) {
+        int[] docMask = new int[]{0, 2, 1};
+        int docIdValidLength = 2;
+        String[][][] expectedValues = new String[][][]{
+            {{"1", "1", "3", "5"}, {"1", "1", "6", "3"}},
+            {{"abc", "foo", "bar", "fuzz"}, {"pqr", "foo", "test", "testf2"}}
+        };
+        for (int i = 0; i < testKeys.length; i++) {
+          Map<String, RoaringBitmap> context =
+              offHeapIndexReader.getMatchingFlattenedDocsMap(testKeys[i]);
+          String[][] values = offHeapIndexReader.getValuesMV(docMask, 
docIdValidLength, context);
+
+          for (int j = 0; j < docIdValidLength; j++) {
+            Assert.assertEquals(values[j], expectedValues[i][j]);
+          }
+
+          context = mutableJsonIndex.getMatchingFlattenedDocsMap(testKeys[i]);
+          values = mutableJsonIndex.getValuesMV(docMask, docIdValidLength, 
context);
+          Assert.assertEquals(values, expectedValues[i]);
+        }
+      }
+    }
+  }
+
   @Test
   public void testGetValuesForKeyAndDocs()
       throws Exception {
     // @formatter: off
     // CHECKSTYLE:OFF
-    String[] records = new String[] {
+    String[] records = new String[]{
         "{\"field1\":\"value1\",\"field2\":\"value2\",\"field3\":\"value3\"}",
         "{\"field1\":\"value2\", \"field2\":[\"value1\",\"value2\"]}",
         "{\"field1\":\"value1\",\"field2\":\"value4\"}",
@@ -365,12 +427,20 @@ public class JsonIndexTest {
             new String[][]{{"value1", "value2", "value1"}, {"value2", null, 
"value4"}, {"value3", null, null},
                 {null, null, null}};
         for (int i = 0; i < testKeys.length; i++) {
-          Map<String, RoaringBitmap> context = 
offHeapIndexReader.getMatchingDocsMap(testKeys[i]);
-          String[] values = offHeapIndexReader.getValuesForKeyAndDocs(docMask, 
context);
+          Map<String, RoaringBitmap> context = 
offHeapIndexReader.getMatchingFlattenedDocsMap(testKeys[i]);
+          String[] values = offHeapIndexReader.getValuesSV(docMask, 
docMask.length, context, true);
+          Assert.assertEquals(values, expectedValues[i]);
+
+          offHeapIndexReader.convertFlattenedDocIdsToDocIds(context);
+          values = offHeapIndexReader.getValuesSV(docMask, docMask.length, 
context, false);
           Assert.assertEquals(values, expectedValues[i]);
 
-          context = mutableJsonIndex.getMatchingDocsMap(testKeys[i]);
-          values = mutableJsonIndex.getValuesForKeyAndDocs(docMask, context);
+          context = mutableJsonIndex.getMatchingFlattenedDocsMap(testKeys[i]);
+          values = mutableJsonIndex.getValuesSV(docMask, docMask.length, 
context, true);
+          Assert.assertEquals(values, expectedValues[i]);
+
+          mutableJsonIndex.convertFlattenedDocIdsToDocIds(context);
+          values = mutableJsonIndex.getValuesSV(docMask, docMask.length, 
context, false);
           Assert.assertEquals(values, expectedValues[i]);
         }
 
@@ -378,31 +448,55 @@ public class JsonIndexTest {
         docMask = new int[]{1, 2};
         expectedValues = new String[][]{{"value2", "value1"}, {null, 
"value4"}, {null, null}, {null, null}};
         for (int i = 0; i < testKeys.length; i++) {
-          Map<String, RoaringBitmap> context = 
offHeapIndexReader.getMatchingDocsMap(testKeys[i]);
-          String[] values = offHeapIndexReader.getValuesForKeyAndDocs(docMask, 
context);
+          Map<String, RoaringBitmap> context = 
offHeapIndexReader.getMatchingFlattenedDocsMap(testKeys[i]);
+          String[] values = offHeapIndexReader.getValuesSV(docMask, 
docMask.length, context, true);
+          Assert.assertEquals(values, expectedValues[i]);
+
+          offHeapIndexReader.convertFlattenedDocIdsToDocIds(context);
+          values = offHeapIndexReader.getValuesSV(docMask, docMask.length, 
context, false);
+          Assert.assertEquals(values, expectedValues[i]);
+
+          context = mutableJsonIndex.getMatchingFlattenedDocsMap(testKeys[i]);
+          values = mutableJsonIndex.getValuesSV(docMask, docMask.length, 
context, true);
           Assert.assertEquals(values, expectedValues[i]);
 
-          context = mutableJsonIndex.getMatchingDocsMap(testKeys[i]);
-          values = mutableJsonIndex.getValuesForKeyAndDocs(docMask, context);
+          mutableJsonIndex.convertFlattenedDocIdsToDocIds(context);
+          values = mutableJsonIndex.getValuesSV(docMask, docMask.length, 
context, false);
           Assert.assertEquals(values, expectedValues[i]);
         }
 
         // Immutable index, context is reused for the second method call
-        Map<String, RoaringBitmap> context = 
offHeapIndexReader.getMatchingDocsMap(".field1");
+        Map<String, RoaringBitmap> context = 
offHeapIndexReader.getMatchingFlattenedDocsMap(".field1");
         docMask = new int[]{0};
-        String[] values = offHeapIndexReader.getValuesForKeyAndDocs(docMask, 
context);
+        String[] values = offHeapIndexReader.getValuesSV(docMask, 
docMask.length, context, true);
         Assert.assertEquals(values, new String[]{"value1"});
         docMask = new int[]{1, 2};
-        values = offHeapIndexReader.getValuesForKeyAndDocs(docMask, context);
+        values = offHeapIndexReader.getValuesSV(docMask, docMask.length, 
context, true);
+        Assert.assertEquals(values, new String[]{"value2", "value1"});
+
+        offHeapIndexReader.convertFlattenedDocIdsToDocIds(context);
+        docMask = new int[]{0};
+        values = offHeapIndexReader.getValuesSV(docMask, docMask.length, 
context, false);
+        Assert.assertEquals(values, new String[]{"value1"});
+        docMask = new int[]{1, 2};
+        values = offHeapIndexReader.getValuesSV(docMask, docMask.length, 
context, false);
         Assert.assertEquals(values, new String[]{"value2", "value1"});
 
         // Mutable index, context is reused for the second method call
-        context = mutableJsonIndex.getMatchingDocsMap(".field1");;
+        context = mutableJsonIndex.getMatchingFlattenedDocsMap(".field1");;
+        docMask = new int[]{0};
+        values = mutableJsonIndex.getValuesSV(docMask, docMask.length, 
context, true);
+        Assert.assertEquals(values, new String[]{"value1"});
+        docMask = new int[]{1, 2};
+        values = mutableJsonIndex.getValuesSV(docMask, docMask.length, 
context, true);
+        Assert.assertEquals(values, new String[]{"value2", "value1"});
+
+        mutableJsonIndex.convertFlattenedDocIdsToDocIds(context);
         docMask = new int[]{0};
-        values = mutableJsonIndex.getValuesForKeyAndDocs(docMask, context);
+        values = mutableJsonIndex.getValuesSV(docMask, docMask.length, 
context, false);
         Assert.assertEquals(values, new String[]{"value1"});
         docMask = new int[]{1, 2};
-        values = mutableJsonIndex.getValuesForKeyAndDocs(docMask, context);
+        values = mutableJsonIndex.getValuesSV(docMask, docMask.length, 
context, false);
         Assert.assertEquals(values, new String[]{"value2", "value1"});
       }
     }
@@ -431,9 +525,9 @@ public class JsonIndexTest {
       for (String record : records) {
         mutableJsonIndex.add(record);
       }
-      Map<String, RoaringBitmap> onHeapRes = 
onHeapIndexReader.getMatchingDocsMap("");
-      Map<String, RoaringBitmap> offHeapRes = 
offHeapIndexReader.getMatchingDocsMap("");
-      Map<String, RoaringBitmap> mutableRes = 
mutableJsonIndex.getMatchingDocsMap("");
+      Map<String, RoaringBitmap> onHeapRes = 
onHeapIndexReader.getMatchingFlattenedDocsMap("");
+      Map<String, RoaringBitmap> offHeapRes = 
offHeapIndexReader.getMatchingFlattenedDocsMap("");
+      Map<String, RoaringBitmap> mutableRes = 
mutableJsonIndex.getMatchingFlattenedDocsMap("");
       Map<String, RoaringBitmap> expectedRes = 
Collections.singletonMap(JsonUtils.SKIPPED_VALUE_REPLACEMENT,
           RoaringBitmap.bitmapOf(0));
       Assert.assertEquals(expectedRes, onHeapRes);
@@ -491,17 +585,20 @@ public class JsonIndexTest {
 
     try (PinotDataBuffer onHeapDataBuffer = 
PinotDataBuffer.mapReadOnlyBigEndianFile(onHeapIndexFile);
          PinotDataBuffer offHeapDataBuffer = 
PinotDataBuffer.mapReadOnlyBigEndianFile(offHeapIndexFile);
-         JsonIndexReader onHeapIndexReader = new 
ImmutableJsonIndexReader(onHeapDataBuffer, records.length);
-         JsonIndexReader offHeapIndexReader = new 
ImmutableJsonIndexReader(offHeapDataBuffer, records.length);
+         ImmutableJsonIndexReader onHeapIndexReader = new 
ImmutableJsonIndexReader(onHeapDataBuffer, records.length);
+         ImmutableJsonIndexReader offHeapIndexReader = new 
ImmutableJsonIndexReader(offHeapDataBuffer, records.length);
          MutableJsonIndexImpl mutableJsonIndex = new 
MutableJsonIndexImpl(jsonIndexConfig)) {
       for (String record : records) {
         mutableJsonIndex.add(record);
       }
 
       for (int i = 0; i < keys.length; i++) {
-        Map<String, RoaringBitmap> onHeapRes = 
onHeapIndexReader.getMatchingDocsMap(keys[i]);
-        Map<String, RoaringBitmap> offHeapRes = 
offHeapIndexReader.getMatchingDocsMap(keys[i]);
-        Map<String, RoaringBitmap> mutableRes = 
mutableJsonIndex.getMatchingDocsMap(keys[i]);
+        Map<String, RoaringBitmap> onHeapRes = 
onHeapIndexReader.getMatchingFlattenedDocsMap(keys[i]);
+        onHeapIndexReader.convertFlattenedDocIdsToDocIds(onHeapRes);
+        Map<String, RoaringBitmap> offHeapRes = 
offHeapIndexReader.getMatchingFlattenedDocsMap(keys[i]);
+        offHeapIndexReader.convertFlattenedDocIdsToDocIds(offHeapRes);
+        Map<String, RoaringBitmap> mutableRes = 
mutableJsonIndex.getMatchingFlattenedDocsMap(keys[i]);
+        mutableJsonIndex.convertFlattenedDocIdsToDocIds(mutableRes);
         Assert.assertEquals(expected.get(i), onHeapRes);
         Assert.assertEquals(expected.get(i), offHeapRes);
         Assert.assertEquals(mutableRes, expected.get(i));
diff --git 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/JsonIndexReader.java
 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/JsonIndexReader.java
index be80930a88..73ef8450ee 100644
--- 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/JsonIndexReader.java
+++ 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/JsonIndexReader.java
@@ -35,16 +35,35 @@ public interface JsonIndexReader extends IndexReader {
   MutableRoaringBitmap getMatchingDocIds(String filterString);
 
   /**
-   * For an array of docIds and context specific to a JSON key, returns the 
corresponding values for each docId. The
-   * context should be created from the getMatchingDocsMap method.
-   *
-   * @return String[] where String[i] is the value for docIds[i]
+   * For an array of docIds and context specific to a JSON key, returns the 
corresponding sv value for each docId.
+   * @param docIds array of docIds
+   * @param length length of the array
+   * @param matchingValueToDocs Map from each unique value for the jsonPathKey to its docId posting list
+   *                            (flattened or real docIds, as indicated by isFlattenedDocIds)
+   * @param isFlattenedDocIds true if the posting lists hold flattened docIds, false if they hold real docIds
+   * @return String[] where String[i] is the sv value for docIds[i]
    */
-  String[] getValuesForKeyAndDocs(int[] docIds, Map<String, RoaringBitmap> 
context);
+  String[] getValuesSV(int[] docIds, int length, Map<String, RoaringBitmap> 
matchingValueToDocs,
+      boolean isFlattenedDocIds);
 
   /**
-   * For a JSON key, returns a Map from each value to the docId posting list. 
This map should be  used to avoid reading
-   * and converting the posting list of flattened docIds to real docIds
+   * For an array of docIds and context specific to a JSON key, returns the 
corresponding mv array for each docId.
+   * @param docIds array of docIds
+   * @param length length of the array
+   * @param matchingValueToFlattenedDocs Map from each unique value for the jsonPathKey to the flattened docId
+   *                                     posting list
+   * @return String[][] where String[i] is the mv array for docIds[i]
    */
-  Map<String, RoaringBitmap> getMatchingDocsMap(String key);
+  String[][] getValuesMV(int[] docIds, int length, Map<String, RoaringBitmap> 
matchingValueToFlattenedDocs);
+
+  /**
+   * For a JSON key, returns a Map from each value to the flattened docId 
posting list. This map should be used to
+   * avoid reading and converting the posting list of flattened docIds to real 
docIds
+   */
+  Map<String, RoaringBitmap> getMatchingFlattenedDocsMap(String key);
+
+  /**
+   * Converts in place the flattened docId posting lists in the map from getMatchingFlattenedDocsMap to real docIds
+   */
+  void convertFlattenedDocIdsToDocIds(Map<String, RoaringBitmap> 
flattenedDocIdsMap);
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to