This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 0c6521a5ccd Add JsonIndexDistinct benchmark and optimize same-path 
JSON_MATCH (#17921)
0c6521a5ccd is described below

commit 0c6521a5ccdb26443e042c4bc2a6f4ec6a5e1a5c
Author: Xiang Fu <[email protected]>
AuthorDate: Fri Apr 3 00:00:36 2026 -0700

    Add JsonIndexDistinct benchmark and optimize same-path JSON_MATCH (#17921)
    
    * Add JsonIndexDistinct benchmark and optimize same-path JSON_MATCH
    
    * Optimize JsonIndexDistinctOperator with single-pass dictionary scan for 
same-path filters
    
    For fully-pushed-down same-path JSON_MATCH predicates, the previous 
implementation
    scanned the dictionary twice (once for filter evaluation, once for 
value-map building)
    and materialized per-value posting list bitmaps + doc ID conversions that 
were never
    used. This adds getMatchingDistinctValues() to JsonIndexReader SPI with a 
fused
    single-pass implementation in ImmutableJsonIndexReader that evaluates the 
predicate
    directly on dictionary value strings — zero posting list reads, zero bitmap 
operations,
    zero doc ID mapping lookups.
    
    Benchmark shows 12x-337x speedup across all cardinality/selectivity 
combinations,
    eliminating the previous regression at high cardinality + low selectivity.
    
    Co-Authored-By: Claude Opus 4.6 (1M context) <[email protected]>
    
    * Add extra-filter benchmark variants for cross-path filter (non-fast-path) 
comparison
    
    Adds THREE_ARG_EXTRA_FILTER and FOUR_ARG_EXTRA_FILTER query variants that 
combine
    the same-path REGEXP_LIKE with a cross-path JSON_MATCH on $.cluster, 
preventing the
    fully-pushed-down fast path. This exercises the bitmap-based code path and 
confirms
    it still outperforms baseline in most configurations (1.2x-4.2x).
    
    Co-Authored-By: Claude Opus 4.6 (1M context) <[email protected]>
    
    ---------
    
    Co-authored-by: Claude Opus 4.6 (1M context) <[email protected]>
---
 .../operator/query/JsonIndexDistinctOperator.java  | 235 +++++++++---
 .../query/JsonIndexDistinctOperatorTest.java       | 306 +++++++++++++++
 .../integration/tests/custom/JsonPathTest.java     | 245 ++++++++++++
 .../pinot/perf/BenchmarkJsonIndexDistinct.java     | 414 +++++++++++++++++++++
 .../readers/json/ImmutableJsonIndexReader.java     | 194 ++++++++++
 .../segment/spi/index/reader/JsonIndexReader.java  |  14 +
 6 files changed, 1346 insertions(+), 62 deletions(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/JsonIndexDistinctOperator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/JsonIndexDistinctOperator.java
index 875ca471edb..b5d5e60c400 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/JsonIndexDistinctOperator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/JsonIndexDistinctOperator.java
@@ -24,10 +24,15 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import javax.annotation.Nullable;
 import org.apache.pinot.common.function.JsonPathCache;
 import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.request.context.FilterContext;
 import org.apache.pinot.common.request.context.OrderByExpressionContext;
+import org.apache.pinot.common.request.context.RequestContextUtils;
+import org.apache.pinot.common.request.context.predicate.JsonMatchPredicate;
+import org.apache.pinot.common.request.context.predicate.Predicate;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
 import org.apache.pinot.core.common.Operator;
@@ -54,12 +59,17 @@ import org.apache.pinot.segment.spi.index.IndexType;
 import org.apache.pinot.segment.spi.index.reader.JsonIndexReader;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.query.QueryThreadContext;
+import org.apache.pinot.sql.parsers.CalciteSqlParser;
 import org.roaringbitmap.RoaringBitmap;
 
 
 /**
- * Distinct operator that uses the JSON index value→docId map directly instead 
of scanning documents.
- * Avoids the projection/transform pipeline for SELECT DISTINCT 
jsonExtractIndex(...).
+ * Distinct operator for the scalar {@code jsonExtractIndex(column, path, 
type[, defaultValue])} form.
+ *
+ * <p>Execution flow:
+ * 1. Push a same-path {@code JSON_MATCH} predicate into the JSON-index lookup 
when it cannot match missing paths.
+ * 2. Convert matching flattened doc ids back to segment doc ids.
+ * 3. Apply any remaining row-level filter and materialize DISTINCT results, 
including missing-path handling.
  */
 public class JsonIndexDistinctOperator extends 
BaseOperator<DistinctResultsBlock> {
   private static final String EXPLAIN_NAME = "DISTINCT_JSON_INDEX";
@@ -91,10 +101,29 @@ public class JsonIndexDistinctOperator extends 
BaseOperator<DistinctResultsBlock
     ExpressionContext expr = expressions.get(0);
     ParsedJsonExtractIndex parsed = parseJsonExtractIndex(expr);
     if (parsed == null) {
-      throw new IllegalStateException("Expected jsonExtractIndex expression");
+      throw new IllegalStateException("Expected 3/4-arg scalar 
jsonExtractIndex expression");
     }
 
-    // Evaluate the filter first so we can skip the (potentially expensive) 
index map when no docs match
+    DataSource dataSource = _indexSegment.getDataSource(parsed._columnName, 
_queryContext.getSchema());
+    JsonIndexReader jsonIndexReader = getJsonIndexReader(dataSource);
+    if (jsonIndexReader == null) {
+      throw new IllegalStateException("Column " + parsed._columnName + " has 
no JSON index");
+    }
+
+    String pushedDownFilterJson = extractSamePathJsonMatchFilter(parsed, 
_queryContext.getFilter());
+    boolean filterFullyPushedDown = pushedDownFilterJson != null
+        && isOnlySamePathJsonMatchFilter(parsed, _queryContext.getFilter())
+        && !jsonMatchFilterCanMatchMissingPath(pushedDownFilterJson);
+
+    // Fast path: when the filter is fully pushed down into the JSON index, we 
only need the distinct value strings.
+    // This avoids reading posting lists, building per-value bitmaps, and 
converting flattened doc IDs.
+    if (filterFullyPushedDown) {
+      Set<String> distinctValues = jsonIndexReader.getMatchingDistinctValues(
+          parsed._jsonPathString, pushedDownFilterJson);
+      return buildDistinctResultsFromValues(expr, parsed, distinctValues);
+    }
+
+    // Evaluate the filter first so we can skip the (potentially expensive) 
index map when no docs match.
     RoaringBitmap filteredDocIds = buildFilteredDocIds();
     if (filteredDocIds != null && filteredDocIds.isEmpty()) {
       ColumnDataType earlyColumnDataType = 
ColumnDataType.fromDataTypeSV(parsed._dataType);
@@ -107,18 +136,46 @@ public class JsonIndexDistinctOperator extends 
BaseOperator<DistinctResultsBlock
           createDistinctTable(earlyDataSchema, parsed._dataType, 
earlyOrderBy), _queryContext);
     }
 
-    DataSource dataSource = _indexSegment.getDataSource(parsed._columnName, 
_queryContext.getSchema());
-    JsonIndexReader jsonIndexReader = getJsonIndexReader(dataSource);
-    if (jsonIndexReader == null) {
-      throw new IllegalStateException("Column " + parsed._columnName + " has 
no JSON index");
-    }
-
-    // Same logic as 
JsonExtractIndexTransformFunction.getValueToMatchingDocsMap()
+    // All other WHERE filters remain row-level and are applied after 
converting flattened doc IDs to real doc IDs.
     Map<String, RoaringBitmap> valueToMatchingDocs =
-        jsonIndexReader.getMatchingFlattenedDocsMap(parsed._jsonPathString, 
parsed._filterExpression);
+        jsonIndexReader.getMatchingFlattenedDocsMap(parsed._jsonPathString, 
pushedDownFilterJson);
+
     // Always single-value (MV _ARRAY is rejected in parseJsonExtractIndex)
     jsonIndexReader.convertFlattenedDocIdsToDocIds(valueToMatchingDocs);
+    return buildDistinctResultsBlock(expr, parsed, valueToMatchingDocs, 
filteredDocIds,
+        filteredDocIds == null);
+  }
 
+  private DistinctResultsBlock 
buildDistinctResultsFromValues(ExpressionContext expr, ParsedJsonExtractIndex 
parsed,
+      Set<String> distinctValues) {
+    ColumnDataType columnDataType = 
ColumnDataType.fromDataTypeSV(parsed._dataType);
+    DataSchema dataSchema = new DataSchema(
+        new String[]{expr.toString()},
+        new ColumnDataType[]{columnDataType});
+    OrderByExpressionContext orderByExpression = 
_queryContext.getOrderByExpressions() != null
+        ? _queryContext.getOrderByExpressions().get(0) : null;
+    DistinctTable distinctTable = createDistinctTable(dataSchema, 
parsed._dataType, orderByExpression);
+    int limit = _queryContext.getLimit();
+
+    for (String value : distinctValues) {
+      _numEntriesExamined++;
+      
QueryThreadContext.checkTerminationAndSampleUsagePeriodically(_numEntriesExamined,
 EXPLAIN_NAME);
+
+      boolean done = addValueToDistinctTable(distinctTable, value, 
parsed._dataType, orderByExpression);
+      if (done) {
+        break;
+      }
+      if (orderByExpression == null && distinctTable.hasLimit() && 
distinctTable.size() >= limit) {
+        break;
+      }
+    }
+
+    return new DistinctResultsBlock(distinctTable, _queryContext);
+  }
+
+  private DistinctResultsBlock buildDistinctResultsBlock(ExpressionContext 
expr, ParsedJsonExtractIndex parsed,
+      Map<String, RoaringBitmap> valueToMatchingDocs, @Nullable RoaringBitmap 
filteredDocIds,
+      boolean allDocsSelected) {
     ColumnDataType columnDataType = 
ColumnDataType.fromDataTypeSV(parsed._dataType);
     DataSchema dataSchema = new DataSchema(
         new String[]{expr.toString()},
@@ -129,32 +186,30 @@ public class JsonIndexDistinctOperator extends 
BaseOperator<DistinctResultsBlock
 
     int limit = _queryContext.getLimit();
     int totalDocs = _indexSegment.getSegmentMetadata().getTotalDocs();
-    // Track uncovered docs: for the no-filter case, build a union and compare 
against totalDocs.
-    // For the filtered case, use a "remaining" bitmap that shrinks in-place 
(no per-value allocation).
-    RoaringBitmap allMatchedDocs = filteredDocIds == null ? new 
RoaringBitmap() : null;
+    RoaringBitmap coveredDocs = allDocsSelected ? new RoaringBitmap() : null;
     RoaringBitmap remainingDocs = filteredDocIds != null ? 
filteredDocIds.clone() : null;
-    boolean allDocsCovered = filteredDocIds == null ? (totalDocs == 0) : 
filteredDocIds.isEmpty();
+    boolean allDocsCovered = filteredDocIds == null ? !allDocsSelected || 
totalDocs == 0 : filteredDocIds.isEmpty();
     boolean earlyBreak = false;
 
     for (Map.Entry<String, RoaringBitmap> entry : 
valueToMatchingDocs.entrySet()) {
       _numEntriesExamined++;
       
QueryThreadContext.checkTerminationAndSampleUsagePeriodically(_numEntriesExamined,
 EXPLAIN_NAME);
+
       String value = entry.getKey();
       RoaringBitmap docIds = entry.getValue();
 
       boolean includeValue;
       if (filteredDocIds == null) {
         includeValue = true;
-        // Build union for uncovered-docs detection; short-circuit once all 
segment docs are covered
-        if (!allDocsCovered) {
-          allMatchedDocs.or(docIds);
-          if (allMatchedDocs.getLongCardinality() >= totalDocs) {
+        if (!allDocsCovered && allDocsSelected) {
+          coveredDocs.or(docIds);
+          if (coveredDocs.getLongCardinality() >= totalDocs) {
             allDocsCovered = true;
           }
         }
       } else {
         includeValue = RoaringBitmap.intersects(docIds, filteredDocIds);
-        // Remove matched docs from remaining set in-place (no allocation per 
value)
+        // Remove matched docs from remaining set in-place (no allocation per 
value).
         if (!allDocsCovered && includeValue) {
           remainingDocs.andNot(docIds);
           if (remainingDocs.isEmpty()) {
@@ -177,25 +232,92 @@ public class JsonIndexDistinctOperator extends 
BaseOperator<DistinctResultsBlock
       }
     }
 
-    // Handle docs not covered by any value in the index.
-    // Baseline JsonExtractIndexTransformFunction throws when a doc is missing 
the path and no
-    // defaultValue is provided. Match that behavior here unless nullHandling 
is enabled.
-    // allDocsCovered tracks coverage precisely (against totalDocs or 
filteredDocIds cardinality).
     if (!earlyBreak && !allDocsCovered) {
-      if (parsed._defaultValue != null) {
-        addValueToDistinctTable(distinctTable, parsed._defaultValue, 
parsed._dataType, orderByExpression);
-      } else if (_queryContext.isNullHandlingEnabled()) {
-        distinctTable.addNull();
-      } else {
-        throw new RuntimeException(
-            String.format("Illegal Json Path: [%s], for some docIds in segment 
[%s]",
-                parsed._jsonPathString, _indexSegment.getSegmentName()));
-      }
+      handleMissingDocs(distinctTable, parsed, orderByExpression);
     }
 
     return new DistinctResultsBlock(distinctTable, _queryContext);
   }
 
+  private void handleMissingDocs(DistinctTable distinctTable, 
ParsedJsonExtractIndex parsed,
+      @Nullable OrderByExpressionContext orderByExpression) {
+    if (parsed._defaultValueLiteral != null) {
+      addValueToDistinctTable(distinctTable, parsed._defaultValueLiteral, 
parsed._dataType, orderByExpression);
+    } else if (_queryContext.isNullHandlingEnabled()) {
+      distinctTable.addNull();
+    } else {
+      throw new RuntimeException(
+          String.format("Illegal Json Path: [%s], for some docIds in segment 
[%s]",
+              parsed._jsonPathString, _indexSegment.getSegmentName()));
+    }
+  }
+
+  @Nullable
+  private static String extractSamePathJsonMatchFilter(ParsedJsonExtractIndex 
parsed, @Nullable FilterContext filter) {
+    if (filter == null) {
+      return null;
+    }
+    switch (filter.getType()) {
+      case PREDICATE:
+        return extractSamePathJsonMatchFilter(parsed, filter.getPredicate());
+      case AND:
+        String matchingFilter = null;
+        for (FilterContext child : filter.getChildren()) {
+          String childFilter = extractSamePathJsonMatchFilter(parsed, child);
+          if (childFilter == null) {
+            continue;
+          }
+          if (matchingFilter != null) {
+            return null;
+          }
+          matchingFilter = childFilter;
+        }
+        return matchingFilter;
+      default:
+        return null;
+    }
+  }
+
+  private static boolean isOnlySamePathJsonMatchFilter(ParsedJsonExtractIndex 
parsed, @Nullable FilterContext filter) {
+    if (filter == null || filter.getType() != FilterContext.Type.PREDICATE) {
+      return false;
+    }
+    return extractSamePathJsonMatchFilter(parsed, filter.getPredicate()) != 
null;
+  }
+
+  private static boolean jsonMatchFilterCanMatchMissingPath(String 
filterJsonString) {
+    try {
+      FilterContext filter = 
RequestContextUtils.getFilter(CalciteSqlParser.compileToExpression(filterJsonString));
+      return filter.getType() == FilterContext.Type.PREDICATE
+          && filter.getPredicate().getType() == Predicate.Type.IS_NULL;
+    } catch (Exception e) {
+      return false;
+    }
+  }
+
+  @Nullable
+  private static String extractSamePathJsonMatchFilter(ParsedJsonExtractIndex 
parsed, Predicate predicate) {
+    if (!(predicate instanceof JsonMatchPredicate)) {
+      return null;
+    }
+    ExpressionContext lhs = predicate.getLhs();
+    if (lhs.getType() != ExpressionContext.Type.IDENTIFIER
+        || !parsed._columnName.equals(lhs.getIdentifier())) {
+      return null;
+    }
+    String filterJsonString = ((JsonMatchPredicate) predicate).getValue();
+    int start = filterJsonString.indexOf('"');
+    if (start < 0) {
+      return null;
+    }
+    int end = filterJsonString.indexOf('"', start + 1);
+    if (end < 0) {
+      return null;
+    }
+    String filterPath = filterJsonString.substring(start + 1, end);
+    return parsed._jsonPathString.equals(filterPath) ? filterJsonString : null;
+  }
+
   private DistinctTable createDistinctTable(DataSchema dataSchema, 
FieldSpec.DataType dataType,
       @Nullable OrderByExpressionContext orderByExpression) {
     int limit = _queryContext.getLimit();
@@ -380,14 +502,15 @@ public class JsonIndexDistinctOperator extends 
BaseOperator<DistinctResultsBlock
       return null;
     }
     List<ExpressionContext> args = expr.getFunction().getArguments();
-    if (args.size() < 3 || args.size() > 5) {
+    if (args.size() != 3 && args.size() != 4) {
       return null;
     }
     if (args.get(0).getType() != ExpressionContext.Type.IDENTIFIER) {
       return null;
     }
     if (args.get(1).getType() != ExpressionContext.Type.LITERAL
-        || args.get(2).getType() != ExpressionContext.Type.LITERAL) {
+        || args.get(2).getType() != ExpressionContext.Type.LITERAL
+        || (args.size() == 4 && args.get(3).getType() != 
ExpressionContext.Type.LITERAL)) {
       return null;
     }
 
@@ -428,23 +551,17 @@ public class JsonIndexDistinctOperator extends 
BaseOperator<DistinctResultsBlock
       return null;
     }
 
-    String defaultValue = null;
-    if (args.size() >= 4) {
-      if (args.get(3).getType() != ExpressionContext.Type.LITERAL) {
+    String defaultValueLiteral = null;
+    if (args.size() == 4) {
+      defaultValueLiteral = args.get(3).getLiteral().getStringValue();
+      try {
+        dataType.convert(defaultValueLiteral);
+      } catch (Exception e) {
         return null;
       }
-      defaultValue = args.get(3).getLiteral().getStringValue();
     }
 
-    String filterExpression = null;
-    if (args.size() == 5) {
-      if (args.get(4).getType() != ExpressionContext.Type.LITERAL) {
-        return null;
-      }
-      filterExpression = args.get(4).getLiteral().getStringValue();
-    }
-
-    return new ParsedJsonExtractIndex(columnName, jsonPathString, dataType, 
defaultValue, filterExpression);
+    return new ParsedJsonExtractIndex(columnName, jsonPathString, dataType, 
defaultValueLiteral);
   }
 
   private static final class ParsedJsonExtractIndex {
@@ -452,17 +569,14 @@ public class JsonIndexDistinctOperator extends 
BaseOperator<DistinctResultsBlock
     final String _jsonPathString;
     final FieldSpec.DataType _dataType;
     @Nullable
-    final String _defaultValue;
-    @Nullable
-    final String _filterExpression;
+    final String _defaultValueLiteral;
 
-    ParsedJsonExtractIndex(String columnName, String jsonPathString,
-        FieldSpec.DataType dataType, @Nullable String defaultValue, @Nullable 
String filterExpression) {
+    ParsedJsonExtractIndex(String columnName, String jsonPathString, 
FieldSpec.DataType dataType,
+        @Nullable String defaultValueLiteral) {
       _columnName = columnName;
       _jsonPathString = jsonPathString;
       _dataType = dataType;
-      _defaultValue = defaultValue;
-      _filterExpression = filterExpression;
+      _defaultValueLiteral = defaultValueLiteral;
     }
   }
 
@@ -506,9 +620,9 @@ public class JsonIndexDistinctOperator extends 
BaseOperator<DistinctResultsBlock
   }
 
   /**
-   * Returns true if the expression is jsonExtractIndex on a column with JSON 
index and the path is indexed.
-   * For OSS JSON index all paths are indexed. For composite JSON index, only 
paths in invertedIndexConfigs
-   * are indexed per key.
+   * Returns true if the expression is the 3/4-arg scalar jsonExtractIndex 
form on a column with JSON index and the
+   * path is indexed. For OSS JSON index all paths are indexed. For composite 
JSON index, only paths in
+   * invertedIndexConfigs are indexed per key.
    */
   public static boolean canUseJsonIndexDistinct(IndexSegment indexSegment, 
ExpressionContext expr) {
     ParsedJsonExtractIndex parsed = parseJsonExtractIndex(expr);
@@ -526,9 +640,6 @@ public class JsonIndexDistinctOperator extends 
BaseOperator<DistinctResultsBlock
     if (!reader.isPathIndexed(parsed._jsonPathString)) {
       return false;
     }
-    // The 5th arg (_filterExpression) is a JSON filter expression, not a 
plain JSON path,
-    // so isPathIndexed() is not appropriate for it. The reader's 
getMatchingFlattenedDocsMap()
-    // handles filter expressions internally.
     return true;
   }
 }
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/operator/query/JsonIndexDistinctOperatorTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/operator/query/JsonIndexDistinctOperatorTest.java
new file mode 100644
index 00000000000..bb8431d0707
--- /dev/null
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/operator/query/JsonIndexDistinctOperatorTest.java
@@ -0,0 +1,306 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.query;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.core.operator.blocks.results.DistinctResultsBlock;
+import org.apache.pinot.core.operator.filter.BaseFilterOperator;
+import org.apache.pinot.core.operator.filter.BitmapCollection;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import 
org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.SegmentContext;
+import org.apache.pinot.segment.spi.SegmentMetadata;
+import org.apache.pinot.segment.spi.datasource.DataSource;
+import org.apache.pinot.segment.spi.index.reader.JsonIndexReader;
+import org.roaringbitmap.RoaringBitmap;
+import org.roaringbitmap.buffer.MutableRoaringBitmap;
+import org.testng.annotations.Test;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.expectThrows;
+
+
+/**
+ * Unit tests for {@link JsonIndexDistinctOperator}.
+ */
+public class JsonIndexDistinctOperatorTest {
+  private static final String STRING_EXTRACT = "JSON_EXTRACT_INDEX(tags, 
'$.instance', 'STRING')";
+  private static final String STRING_EXTRACT_WITH_EMPTY_DEFAULT =
+      "JSON_EXTRACT_INDEX(tags, '$.instance', 'STRING', '')";
+  private static final String STRING_EXTRACT_WITH_DEFAULT =
+      "JSON_EXTRACT_INDEX(tags, '$.instance', 'STRING', 'missing')";
+  private static final String INVALID_INT_DEFAULT_EXTRACT =
+      "JSON_EXTRACT_INDEX(tags, '$.instance', 'INT', 'abc')";
+  private static final String SAME_PATH_FILTER = "REGEXP_LIKE(\"$.instance\", 
'.*test.*')";
+  private static final String CROSS_PATH_FILTER = "REGEXP_LIKE(\"$.env\", 
'prod.*')";
+  private static final String SAME_PATH_IS_NULL_FILTER = "\"$.instance\" IS 
NULL";
+
+  @Test
+  public void 
testSamePathJsonMatchUsesDistinctValuesFastPathForFourArgScalarForm() {
+    QueryContext queryContext = 
distinctQuery(STRING_EXTRACT_WITH_EMPTY_DEFAULT, SAME_PATH_FILTER);
+
+    JsonIndexReader jsonIndexReader = mock(JsonIndexReader.class);
+    when(jsonIndexReader.getMatchingDistinctValues("$.instance", 
SAME_PATH_FILTER))
+        .thenReturn(Set.of("test-east", "test-west"));
+
+    DistinctResultsBlock resultsBlock =
+        buildOperator(queryContext, jsonIndexReader, bufferBitmap(0, 1), 
2).nextBlock();
+
+    assertEquals(extractValues(resultsBlock), Set.of("test-east", 
"test-west"));
+    verify(jsonIndexReader).getMatchingDistinctValues("$.instance", 
SAME_PATH_FILTER);
+    verify(jsonIndexReader, never()).getMatchingFlattenedDocsMap(any(), any());
+    verify(jsonIndexReader, never()).convertFlattenedDocIdsToDocIds(any());
+  }
+
+  @Test
+  public void 
testSamePathJsonMatchUsesDistinctValuesFastPathForThreeArgScalarForm() {
+    QueryContext queryContext = distinctQuery(STRING_EXTRACT, 
SAME_PATH_FILTER);
+
+    JsonIndexReader jsonIndexReader = mock(JsonIndexReader.class);
+    when(jsonIndexReader.getMatchingDistinctValues("$.instance", 
SAME_PATH_FILTER))
+        .thenReturn(Set.of("test-east", "test-west"));
+
+    DistinctResultsBlock resultsBlock =
+        buildOperator(queryContext, jsonIndexReader, bufferBitmap(0, 1), 
2).nextBlock();
+
+    assertEquals(extractValues(resultsBlock), Set.of("test-east", 
"test-west"));
+    verify(jsonIndexReader).getMatchingDistinctValues("$.instance", 
SAME_PATH_FILTER);
+    verify(jsonIndexReader, never()).getMatchingFlattenedDocsMap(any(), any());
+    verify(jsonIndexReader, never()).convertFlattenedDocIdsToDocIds(any());
+  }
+
+  @Test
+  public void testDifferentPathJsonMatchIsAppliedAtDocLevel() {
+    QueryContext queryContext = distinctQuery(STRING_EXTRACT, 
CROSS_PATH_FILTER);
+
+    JsonIndexReader jsonIndexReader = mock(JsonIndexReader.class);
+    Map<String, RoaringBitmap> flattenedDocsByValue = new HashMap<>();
+    flattenedDocsByValue.put("prod-a", bitmap(100));
+    flattenedDocsByValue.put("prod-b", bitmap(200));
+    flattenedDocsByValue.put("other-doc", bitmap(300));
+    when(jsonIndexReader.getMatchingFlattenedDocsMap("$.instance", 
null)).thenReturn(flattenedDocsByValue);
+    stubConvertedDocIds(jsonIndexReader, Map.of("prod-a", bitmap(0), "prod-b", 
bitmap(1), "other-doc", bitmap(2)));
+
+    DistinctResultsBlock resultsBlock =
+        buildOperator(queryContext, jsonIndexReader, bufferBitmap(0, 1), 
3).nextBlock();
+
+    assertEquals(extractValues(resultsBlock), Set.of("prod-a", "prod-b"));
+    verify(jsonIndexReader).getMatchingFlattenedDocsMap("$.instance", null);
+    verify(jsonIndexReader, never()).getMatchingFlattenedDocsMap("$.instance",
+        "REGEXP_LIKE(\"$.env\", ''prod.*'')");
+    verify(jsonIndexReader).convertFlattenedDocIdsToDocIds(any());
+  }
+
+  @Test
+  public void testCanUseJsonIndexDistinctAllowsThreeArgScalarForm() {
+    QueryContext queryContext = distinctQuery(STRING_EXTRACT, 
CROSS_PATH_FILTER);
+
+    JsonIndexReader jsonIndexReader = mock(JsonIndexReader.class);
+    when(jsonIndexReader.isPathIndexed("$.instance")).thenReturn(true);
+    IndexSegment indexSegment = buildCanUseIndexSegment(jsonIndexReader);
+
+    assertTrue(JsonIndexDistinctOperator.canUseJsonIndexDistinct(indexSegment,
+        queryContext.getSelectExpressions().get(0)));
+  }
+
+  @Test
+  public void testCanUseJsonIndexDistinctAllowsFourArgScalarForm() {
+    QueryContext queryContext = 
distinctQuery(STRING_EXTRACT_WITH_EMPTY_DEFAULT, CROSS_PATH_FILTER);
+
+    JsonIndexReader jsonIndexReader = mock(JsonIndexReader.class);
+    when(jsonIndexReader.isPathIndexed("$.instance")).thenReturn(true);
+    IndexSegment indexSegment = buildCanUseIndexSegment(jsonIndexReader);
+
+    assertTrue(JsonIndexDistinctOperator.canUseJsonIndexDistinct(indexSegment,
+        queryContext.getSelectExpressions().get(0)));
+  }
+
+  @Test
+  public void testCanUseJsonIndexDistinctRejectsInvalidDefaultArgument() {
+    QueryContext queryContext = distinctQuery(INVALID_INT_DEFAULT_EXTRACT, 
CROSS_PATH_FILTER);
+
+    JsonIndexReader jsonIndexReader = mock(JsonIndexReader.class);
+    when(jsonIndexReader.isPathIndexed("$.instance")).thenReturn(true);
+    IndexSegment indexSegment = buildCanUseIndexSegment(jsonIndexReader);
+
+    assertFalse(JsonIndexDistinctOperator.canUseJsonIndexDistinct(indexSegment,
+        queryContext.getSelectExpressions().get(0)));
+  }
+
+  @Test
+  public void testFourArgAddsDefaultForDocsWithoutJsonPath() {
+    QueryContext queryContext = distinctQuery(STRING_EXTRACT_WITH_DEFAULT, 
CROSS_PATH_FILTER);
+
+    JsonIndexReader jsonIndexReader = mock(JsonIndexReader.class);
+    Map<String, RoaringBitmap> flattenedDocsByValue = new HashMap<>();
+    flattenedDocsByValue.put("prod-a", bitmap(100));
+    flattenedDocsByValue.put("prod-b", bitmap(200));
+    when(jsonIndexReader.getMatchingFlattenedDocsMap("$.instance", 
null)).thenReturn(flattenedDocsByValue);
+    stubConvertedDocIds(jsonIndexReader, Map.of("prod-a", bitmap(0), "prod-b", 
bitmap(1)));
+
+    DistinctResultsBlock resultsBlock =
+        buildOperator(queryContext, jsonIndexReader, bufferBitmap(0, 1, 2), 
3).nextBlock();
+
+    assertEquals(extractValues(resultsBlock), Set.of("prod-a", "prod-b", 
"missing"));
+  }
+
+  @Test
+  public void testSamePathIsNullStillAddsDefaultForMissingPath() {
+    QueryContext queryContext = distinctQuery(STRING_EXTRACT_WITH_DEFAULT, 
SAME_PATH_IS_NULL_FILTER);
+
+    JsonIndexReader jsonIndexReader = mock(JsonIndexReader.class);
+    when(jsonIndexReader.getMatchingFlattenedDocsMap("$.instance", 
SAME_PATH_IS_NULL_FILTER)).thenReturn(
+        new HashMap<>());
+
+    DistinctResultsBlock resultsBlock =
+        buildOperator(queryContext, jsonIndexReader, bufferBitmap(2), 
3).nextBlock();
+
+    assertEquals(extractValues(resultsBlock), Set.of("missing"));
+    verify(jsonIndexReader).getMatchingFlattenedDocsMap("$.instance", 
SAME_PATH_IS_NULL_FILTER);
+    verify(jsonIndexReader).convertFlattenedDocIdsToDocIds(any());
+  }
+
+  @Test
+  public void testMissingPathWithoutDefaultThrows() {
+    QueryContext queryContext = distinctQuery(STRING_EXTRACT, 
SAME_PATH_IS_NULL_FILTER);
+
+    JsonIndexReader jsonIndexReader = mock(JsonIndexReader.class);
+    when(jsonIndexReader.getMatchingFlattenedDocsMap("$.instance", 
SAME_PATH_IS_NULL_FILTER)).thenReturn(
+        new HashMap<>());
+
+    RuntimeException exception = expectThrows(RuntimeException.class,
+        () -> buildOperator(queryContext, jsonIndexReader, bufferBitmap(2), 
3).nextBlock());
+
+    assertTrue(exception.getMessage().contains("Illegal Json Path"));
+  }
+
+  private static QueryContext distinctQuery(String expression, String 
filterJsonString) {
+    return QueryContextConverterUtils.getQueryContext(
+        "SELECT DISTINCT " + expression + " AS tag_value FROM myTable WHERE 
JSON_MATCH(tags, '"
+            + filterJsonString.replace("'", "''") + "')");
+  }
+
+  private static void stubConvertedDocIds(JsonIndexReader jsonIndexReader,
+      Map<String, RoaringBitmap> convertedDocIds) {
+    doAnswer(invocation -> {
+      @SuppressWarnings("unchecked")
+      Map<String, RoaringBitmap> docsByValue = (Map<String, RoaringBitmap>) 
invocation.getArgument(0);
+      docsByValue.clear();
+      docsByValue.putAll(convertedDocIds);
+      return null;
+    }).when(jsonIndexReader).convertFlattenedDocIdsToDocIds(any());
+  }
+
+  private static IndexSegment buildCanUseIndexSegment(JsonIndexReader 
jsonIndexReader) {
+    DataSource dataSource = mock(DataSource.class);
+    when(dataSource.getJsonIndex()).thenReturn(jsonIndexReader);
+
+    IndexSegment indexSegment = mock(IndexSegment.class);
+    when(indexSegment.getDataSourceNullable("tags")).thenReturn(dataSource);
+    return indexSegment;
+  }
+
+  private static JsonIndexDistinctOperator buildOperator(QueryContext 
queryContext, JsonIndexReader jsonIndexReader,
+      MutableRoaringBitmap filterBitmap, int numDocs) {
+    SegmentMetadata segmentMetadata = mock(SegmentMetadata.class);
+    when(segmentMetadata.getTotalDocs()).thenReturn(numDocs);
+
+    DataSource dataSource = mock(DataSource.class);
+    when(dataSource.getJsonIndex()).thenReturn(jsonIndexReader);
+
+    IndexSegment indexSegment = mock(IndexSegment.class);
+    when(indexSegment.getSegmentMetadata()).thenReturn(segmentMetadata);
+    when(indexSegment.getSegmentName()).thenReturn("testSegment");
+    when(indexSegment.getDataSource(eq("tags"), any())).thenReturn(dataSource);
+    when(indexSegment.getDataSourceNullable("tags")).thenReturn(dataSource);
+
+    return new JsonIndexDistinctOperator(indexSegment, new 
SegmentContext(indexSegment), queryContext,
+        new StaticBitmapFilterOperator(numDocs, filterBitmap));
+  }
+
+  private static RoaringBitmap bitmap(int... docIds) {
+    RoaringBitmap bitmap = new RoaringBitmap();
+    for (int docId : docIds) {
+      bitmap.add(docId);
+    }
+    return bitmap;
+  }
+
+  private static MutableRoaringBitmap bufferBitmap(int... docIds) {
+    MutableRoaringBitmap bitmap = new MutableRoaringBitmap();
+    for (int docId : docIds) {
+      bitmap.add(docId);
+    }
+    return bitmap;
+  }
+
+  private static Set<String> extractValues(DistinctResultsBlock resultsBlock) {
+    List<Object[]> rows = resultsBlock.getRows();
+    return rows.stream().map(row -> (String) 
row[0]).collect(Collectors.toSet());
+  }
+
+  private static final class StaticBitmapFilterOperator extends 
BaseFilterOperator {
+    private final MutableRoaringBitmap _bitmap;
+
+    StaticBitmapFilterOperator(int numDocs, MutableRoaringBitmap bitmap) {
+      super(numDocs, false);
+      _bitmap = bitmap;
+    }
+
+    @Override
+    public boolean canProduceBitmaps() {
+      return true;
+    }
+
+    @Override
+    public BitmapCollection getBitmaps() {
+      return new BitmapCollection(_numDocs, false, _bitmap);
+    }
+
+    @Override
+    public List<Operator> getChildOperators() {
+      return List.of();
+    }
+
+    @Override
+    protected org.apache.pinot.core.common.BlockDocIdSet getTrues() {
+      throw new UnsupportedOperationException("Bitmap path only");
+    }
+
+    @Override
+    public String toExplainString() {
+      return "STATIC_BITMAP_FILTER";
+    }
+  }
+}
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/JsonPathTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/JsonPathTest.java
index 267410e9e5e..520f55126ab 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/JsonPathTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/JsonPathTest.java
@@ -733,6 +733,251 @@ public class JsonPathTest extends 
CustomDataQueryClusterIntegrationTest {
         "JsonIndexDistinctOperator should throw for missing JSON path without 
defaultValue");
   }
 
+  // --- Same-path JSON_MATCH predicate tests (trigger 
getMatchingDistinctValues fast path) ---
+
+  /**
+   * Same-path REGEXP_LIKE: fully pushed down, single dict scan, no posting 
list reads.
+   */
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testJsonIndexDistinctSamePathRegexpLike(boolean 
useMultiStageQueryEngine)
+      throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+
+    // REGEXP_LIKE on $.k1 matching a subset of values
+    String query = "SELECT DISTINCT jsonExtractIndex(" + MY_MAP_STR_FIELD_NAME 
+ ", '$.k1', 'STRING') FROM "
+        + getTableName() + " WHERE JSON_MATCH(" + MY_MAP_STR_FIELD_NAME
+        + ", 'REGEXP_LIKE(\"$.k1\", ''value-k1-[0-9]'')')"
+        + " ORDER BY jsonExtractIndex(" + MY_MAP_STR_FIELD_NAME + ", '$.k1', 
'STRING') LIMIT 10000";
+
+    JsonNode baselineResponse = postQuery(query);
+    Assert.assertEquals(baselineResponse.get("exceptions").size(), 0);
+
+    JsonNode optimizedResponse = postQueryWithOptions(query, 
USE_INDEX_BASED_DISTINCT_OPERATOR + "=true");
+    Assert.assertEquals(optimizedResponse.get("exceptions").size(), 0);
+
+    List<String> baselineRows = extractOrderedDistinctValues(baselineResponse);
+    List<String> optimizedRows = 
extractOrderedDistinctValues(optimizedResponse);
+    Assert.assertFalse(baselineRows.isEmpty(), "REGEXP_LIKE should match 
single-digit k1 values");
+    Assert.assertEquals(optimizedRows, baselineRows,
+        "Same-path REGEXP_LIKE fast path should match baseline");
+
+    if (!useMultiStageQueryEngine) {
+      
Assert.assertEquals(optimizedResponse.get("numEntriesScannedPostFilter").asLong(),
 0L);
+    }
+  }
+
+  /**
+   * Same-path EQ: fully pushed down.
+   */
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testJsonIndexDistinctSamePathEq(boolean useMultiStageQueryEngine)
+      throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+
+    String query = "SELECT DISTINCT jsonExtractIndex(" + MY_MAP_STR_FIELD_NAME 
+ ", '$.k1', 'STRING') FROM "
+        + getTableName() + " WHERE JSON_MATCH(" + MY_MAP_STR_FIELD_NAME
+        + ", '\"$.k1\" = ''value-k1-0''')"
+        + " ORDER BY jsonExtractIndex(" + MY_MAP_STR_FIELD_NAME + ", '$.k1', 
'STRING') LIMIT 10000";
+
+    JsonNode baselineResponse = postQuery(query);
+    Assert.assertEquals(baselineResponse.get("exceptions").size(), 0);
+
+    JsonNode optimizedResponse = postQueryWithOptions(query, 
USE_INDEX_BASED_DISTINCT_OPERATOR + "=true");
+    Assert.assertEquals(optimizedResponse.get("exceptions").size(), 0);
+
+    List<String> baselineRows = extractOrderedDistinctValues(baselineResponse);
+    List<String> optimizedRows = 
extractOrderedDistinctValues(optimizedResponse);
+    Assert.assertEquals(optimizedRows, baselineRows,
+        "Same-path EQ fast path should match baseline");
+    Assert.assertTrue(optimizedRows.contains("value-k1-0"));
+
+    if (!useMultiStageQueryEngine) {
+      
Assert.assertEquals(optimizedResponse.get("numEntriesScannedPostFilter").asLong(),
 0L);
+    }
+  }
+
+  /**
+   * Same-path NOT_EQ: fully pushed down.
+   */
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testJsonIndexDistinctSamePathNotEq(boolean 
useMultiStageQueryEngine)
+      throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+
+    String query = "SELECT DISTINCT jsonExtractIndex(" + MY_MAP_STR_FIELD_NAME 
+ ", '$.k1', 'STRING') FROM "
+        + getTableName() + " WHERE JSON_MATCH(" + MY_MAP_STR_FIELD_NAME
+        + ", '\"$.k1\" != ''value-k1-0''')"
+        + " ORDER BY jsonExtractIndex(" + MY_MAP_STR_FIELD_NAME + ", '$.k1', 
'STRING') LIMIT 10000";
+
+    JsonNode baselineResponse = postQuery(query);
+    Assert.assertEquals(baselineResponse.get("exceptions").size(), 0);
+
+    JsonNode optimizedResponse = postQueryWithOptions(query, 
USE_INDEX_BASED_DISTINCT_OPERATOR + "=true");
+    Assert.assertEquals(optimizedResponse.get("exceptions").size(), 0);
+
+    List<String> baselineRows = extractOrderedDistinctValues(baselineResponse);
+    List<String> optimizedRows = 
extractOrderedDistinctValues(optimizedResponse);
+    Assert.assertEquals(optimizedRows, baselineRows,
+        "Same-path NOT_EQ fast path should match baseline");
+    Assert.assertFalse(optimizedRows.contains("value-k1-0"));
+
+    if (!useMultiStageQueryEngine) {
+      
Assert.assertEquals(optimizedResponse.get("numEntriesScannedPostFilter").asLong(),
 0L);
+    }
+  }
+
+  /**
+   * Same-path IN: fully pushed down.
+   */
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testJsonIndexDistinctSamePathIn(boolean useMultiStageQueryEngine)
+      throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+
+    String query = "SELECT DISTINCT jsonExtractIndex(" + MY_MAP_STR_FIELD_NAME 
+ ", '$.k1', 'STRING') FROM "
+        + getTableName() + " WHERE JSON_MATCH(" + MY_MAP_STR_FIELD_NAME
+        + ", '\"$.k1\" IN (''value-k1-0'', ''value-k1-1'', ''value-k1-2'')')"
+        + " ORDER BY jsonExtractIndex(" + MY_MAP_STR_FIELD_NAME + ", '$.k1', 
'STRING') LIMIT 10000";
+
+    JsonNode baselineResponse = postQuery(query);
+    Assert.assertEquals(baselineResponse.get("exceptions").size(), 0);
+
+    JsonNode optimizedResponse = postQueryWithOptions(query, 
USE_INDEX_BASED_DISTINCT_OPERATOR + "=true");
+    Assert.assertEquals(optimizedResponse.get("exceptions").size(), 0);
+
+    List<String> baselineRows = extractOrderedDistinctValues(baselineResponse);
+    List<String> optimizedRows = 
extractOrderedDistinctValues(optimizedResponse);
+    Assert.assertEquals(optimizedRows, baselineRows,
+        "Same-path IN fast path should match baseline");
+    Assert.assertEquals(optimizedRows.size(), 3);
+
+    if (!useMultiStageQueryEngine) {
+      
Assert.assertEquals(optimizedResponse.get("numEntriesScannedPostFilter").asLong(),
 0L);
+    }
+  }
+
+  /**
+   * Same-path NOT_IN: fully pushed down.
+   */
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testJsonIndexDistinctSamePathNotIn(boolean 
useMultiStageQueryEngine)
+      throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+
+    String query = "SELECT DISTINCT jsonExtractIndex(" + MY_MAP_STR_FIELD_NAME 
+ ", '$.k1', 'STRING') FROM "
+        + getTableName() + " WHERE JSON_MATCH(" + MY_MAP_STR_FIELD_NAME
+        + ", '\"$.k1\" NOT IN (''value-k1-0'', ''value-k1-1'')')"
+        + " ORDER BY jsonExtractIndex(" + MY_MAP_STR_FIELD_NAME + ", '$.k1', 
'STRING') LIMIT 10000";
+
+    JsonNode baselineResponse = postQuery(query);
+    Assert.assertEquals(baselineResponse.get("exceptions").size(), 0);
+
+    JsonNode optimizedResponse = postQueryWithOptions(query, 
USE_INDEX_BASED_DISTINCT_OPERATOR + "=true");
+    Assert.assertEquals(optimizedResponse.get("exceptions").size(), 0);
+
+    List<String> baselineRows = extractOrderedDistinctValues(baselineResponse);
+    List<String> optimizedRows = 
extractOrderedDistinctValues(optimizedResponse);
+    Assert.assertEquals(optimizedRows, baselineRows,
+        "Same-path NOT_IN fast path should match baseline");
+    Assert.assertFalse(optimizedRows.contains("value-k1-0"));
+    Assert.assertFalse(optimizedRows.contains("value-k1-1"));
+
+    if (!useMultiStageQueryEngine) {
+      
Assert.assertEquals(optimizedResponse.get("numEntriesScannedPostFilter").asLong(),
 0L);
+    }
+  }
+
+  /**
+   * Same-path IS NOT NULL: fully pushed down.
+   */
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testJsonIndexDistinctSamePathIsNotNull(boolean 
useMultiStageQueryEngine)
+      throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+
+    String query = "SELECT DISTINCT jsonExtractIndex(" + MY_MAP_STR_FIELD_NAME 
+ ", '$.k1', 'STRING') FROM "
+        + getTableName() + " WHERE JSON_MATCH(" + MY_MAP_STR_FIELD_NAME + ", 
'\"$.k1\" IS NOT NULL')"
+        + " ORDER BY jsonExtractIndex(" + MY_MAP_STR_FIELD_NAME + ", '$.k1', 
'STRING') LIMIT 10000";
+
+    JsonNode baselineResponse = postQuery(query);
+    Assert.assertEquals(baselineResponse.get("exceptions").size(), 0);
+
+    JsonNode optimizedResponse = postQueryWithOptions(query, 
USE_INDEX_BASED_DISTINCT_OPERATOR + "=true");
+    Assert.assertEquals(optimizedResponse.get("exceptions").size(), 0);
+
+    List<String> baselineRows = extractOrderedDistinctValues(baselineResponse);
+    List<String> optimizedRows = 
extractOrderedDistinctValues(optimizedResponse);
+    Assert.assertEquals(optimizedRows, baselineRows,
+        "Same-path IS NOT NULL fast path should match baseline");
+    Assert.assertEquals(optimizedRows.size(), NUM_DOCS_PER_SEGMENT,
+        "IS NOT NULL should return all values since every doc has $.k1");
+
+    if (!useMultiStageQueryEngine) {
+      
Assert.assertEquals(optimizedResponse.get("numEntriesScannedPostFilter").asLong(),
 0L);
+    }
+  }
+
+  /**
+   * Same-path REGEXP_LIKE with 4-arg form (defaultValue): fully pushed down 
fast path still works with defaults.
+   */
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testJsonIndexDistinctSamePathRegexpLikeWithDefault(boolean 
useMultiStageQueryEngine)
+      throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+
+    String query = "SELECT DISTINCT jsonExtractIndex(" + MY_MAP_STR_FIELD_NAME
+        + ", '$.k1', 'STRING', 'fallback') FROM "
+        + getTableName() + " WHERE JSON_MATCH(" + MY_MAP_STR_FIELD_NAME
+        + ", 'REGEXP_LIKE(\"$.k1\", ''value-k1-[0-9]'')')"
+        + " ORDER BY jsonExtractIndex(" + MY_MAP_STR_FIELD_NAME + ", '$.k1', 
'STRING', 'fallback') LIMIT 10000";
+
+    JsonNode baselineResponse = postQuery(query);
+    Assert.assertEquals(baselineResponse.get("exceptions").size(), 0);
+
+    JsonNode optimizedResponse = postQueryWithOptions(query, 
USE_INDEX_BASED_DISTINCT_OPERATOR + "=true");
+    Assert.assertEquals(optimizedResponse.get("exceptions").size(), 0);
+
+    List<String> baselineRows = extractOrderedDistinctValues(baselineResponse);
+    List<String> optimizedRows = 
extractOrderedDistinctValues(optimizedResponse);
+    Assert.assertEquals(optimizedRows, baselineRows,
+        "Same-path REGEXP_LIKE 4-arg fast path should match baseline");
+    // The default should NOT appear since the filter only matches docs that 
HAVE $.k1
+    Assert.assertFalse(optimizedRows.contains("fallback"),
+        "Same-path filter ensures all matching docs have the path, so no 
default should appear");
+
+    if (!useMultiStageQueryEngine) {
+      
Assert.assertEquals(optimizedResponse.get("numEntriesScannedPostFilter").asLong(),
 0L);
+    }
+  }
+
+  /**
+   * Same-path REGEXP_LIKE without ORDER BY: verify LIMIT is respected with 
fast path.
+   */
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testJsonIndexDistinctSamePathWithLimit(boolean 
useMultiStageQueryEngine)
+      throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+
+    String query = "SELECT DISTINCT jsonExtractIndex(" + MY_MAP_STR_FIELD_NAME 
+ ", '$.k1', 'STRING') FROM "
+        + getTableName() + " WHERE JSON_MATCH(" + MY_MAP_STR_FIELD_NAME
+        + ", '\"$.k1\" IS NOT NULL') LIMIT 5";
+
+    JsonNode baselineResponse = postQuery(query);
+    Assert.assertEquals(baselineResponse.get("exceptions").size(), 0);
+
+    JsonNode optimizedResponse = postQueryWithOptions(query, 
USE_INDEX_BASED_DISTINCT_OPERATOR + "=true");
+    Assert.assertEquals(optimizedResponse.get("exceptions").size(), 0);
+
+    List<String> baselineRows = extractOrderedDistinctValues(baselineResponse);
+    List<String> optimizedRows = 
extractOrderedDistinctValues(optimizedResponse);
+    Assert.assertEquals(optimizedRows.size(), 5, "LIMIT 5 should be respected 
by fast path");
+    Assert.assertEquals(baselineRows.size(), 5);
+
+    if (!useMultiStageQueryEngine) {
+      
Assert.assertEquals(optimizedResponse.get("numEntriesScannedPostFilter").asLong(),
 0L);
+    }
+  }
+
   private static List<String> extractOrderedDistinctValues(JsonNode response) {
     List<String> values = new ArrayList<>();
     JsonNode rows = response.get("resultTable").get("rows");
diff --git 
a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkJsonIndexDistinct.java
 
b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkJsonIndexDistinct.java
new file mode 100644
index 00000000000..cd1b5a9bdc7
--- /dev/null
+++ 
b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkJsonIndexDistinct.java
@@ -0,0 +1,414 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.perf;
+
+import com.google.common.base.Preconditions;
+import java.io.File;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
+import java.util.function.IntSupplier;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.request.PinotQuery;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.core.operator.ExecutionStatistics;
+import org.apache.pinot.core.operator.blocks.results.DistinctResultsBlock;
+import org.apache.pinot.core.operator.query.DistinctOperator;
+import org.apache.pinot.core.plan.maker.InstancePlanMakerImplV2;
+import org.apache.pinot.core.plan.maker.PlanMaker;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import 
org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
+import org.apache.pinot.core.util.GapfillUtils;
+import 
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
+import 
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.SegmentContext;
+import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordReader;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.apache.pinot.sql.parsers.CalciteSqlParser;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+
+/**
+ * JMH benchmark for DISTINCT over scalar {@code JSON_EXTRACT_INDEX(...)} on a 
table that is explicitly configured with
+ * a JSON index on {@code tags}. The benchmark state is intended for 
single-threaded JMH execution and is not
+ * thread-safe.
+ *
+ * <p>The base sample queries are:
+ * {@code SELECT DISTINCT JSON_EXTRACT_INDEX(tags, '$.instance', 'STRING') AS 
tag_value FROM myTable WHERE
+ * JSON_MATCH(tags, 'REGEXP_LIKE("$.instance", ''.*test.*'')')}
+ * and
+ * {@code SELECT DISTINCT JSON_EXTRACT_INDEX(tags, '$.instance', 'STRING', '') 
AS tag_value FROM myTable WHERE
+ * JSON_MATCH(tags, 'REGEXP_LIKE("$.instance", ''.*test.*'')')}.
+ *
+ * <p>Pinot assigns {@code SELECT DISTINCT} queries without an explicit limit 
a default limit of {@code 10}. The
+ * benchmark validates the exact sample query for planning, then executes the 
same query with an explicit large limit so
+ * the full matching distinct set is measured and compared across operators.
+ */
+@State(Scope.Benchmark)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+@Fork(1)
+@Warmup(iterations = 3, time = 1)
+@Measurement(iterations = 5, time = 1)
+public class BenchmarkJsonIndexDistinct {
+  private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), 
"BenchmarkJsonIndexDistinct");
+  private static final String TABLE_NAME = "myTable";
+  private static final String SEGMENT_NAME = "jsonIndexDistinctSegment";
+  private static final String ID_COLUMN = "id";
+  private static final String TAGS_COLUMN = "tags";
+  private static final String JSON_INDEX_DISTINCT_OPERATOR_NAME = 
"JsonIndexDistinctOperator";
+  private static final String SAME_PATH_FILTER =
+      "WHERE JSON_MATCH(tags, 'REGEXP_LIKE(\"$.instance\", ''.*test.*'')')";
+  private static final String EXTRA_FILTER =
+      "WHERE JSON_MATCH(tags, 'REGEXP_LIKE(\"$.instance\", ''.*test.*'')') "
+          + "AND JSON_MATCH(tags, '\"$.cluster\" = ''cluster-0''')";
+  private static final String THREE_ARG_SAMPLE_QUERY =
+      "SELECT DISTINCT JSON_EXTRACT_INDEX(tags, '$.instance', 'STRING') AS 
tag_value "
+          + "FROM myTable " + SAME_PATH_FILTER;
+  private static final String FOUR_ARG_SAMPLE_QUERY =
+      "SELECT DISTINCT JSON_EXTRACT_INDEX(tags, '$.instance', 'STRING', '') AS 
tag_value "
+          + "FROM myTable " + SAME_PATH_FILTER;
+  private static final String THREE_ARG_EXTRA_FILTER_SAMPLE_QUERY =
+      "SELECT DISTINCT JSON_EXTRACT_INDEX(tags, '$.instance', 'STRING') AS 
tag_value "
+          + "FROM myTable " + EXTRA_FILTER;
+  private static final String FOUR_ARG_EXTRA_FILTER_SAMPLE_QUERY =
+      "SELECT DISTINCT JSON_EXTRACT_INDEX(tags, '$.instance', 'STRING', '') AS 
tag_value "
+          + "FROM myTable " + EXTRA_FILTER;
+  private static final PlanMaker PLAN_MAKER = new InstancePlanMakerImplV2();
+  private static final boolean ASSERT_SPEEDUP =
+      
Boolean.parseBoolean(System.getProperty("pinot.perf.jsonIndexDistinct.assertSpeedup",
 "true"));
+  private static final int REGRESSION_WARMUP_ITERATIONS = 3;
+  private static final int REGRESSION_MEASURE_ITERATIONS = 7;
+  private static final int REGRESSION_BATCH_SIZE = 10;
+
+  private static final TableConfig TABLE_CONFIG = new 
TableConfigBuilder(TableType.OFFLINE)
+      .setTableName(TABLE_NAME)
+      .setJsonIndexColumns(Collections.singletonList(TAGS_COLUMN))
+      .build();
+
+  private static final Schema SCHEMA = new Schema.SchemaBuilder()
+      .setSchemaName(TABLE_NAME)
+      .addSingleValueDimension(ID_COLUMN, FieldSpec.DataType.INT)
+      .addSingleValueDimension(TAGS_COLUMN, FieldSpec.DataType.JSON)
+      .build();
+
+  @Param({"500000"})
+  int _numRows;
+
+  @Param({"10", "100", "1000", "10000", "100000"})
+  int _instanceCardinality;
+
+  @Param({"0.01", "0.05", "0.1", "0.3", "0.5", "0.75", "0.9", "1.0"})
+  double _testInstanceFraction;
+
+  @Param({"1000000"})
+  int _distinctLimit;
+
+  @Param({"THREE_ARG", "FOUR_ARG", "THREE_ARG_EXTRA_FILTER", 
"FOUR_ARG_EXTRA_FILTER"})
+  String _queryVariant;
+
+  private IndexSegment _indexSegment;
+  private QueryContext _baselineQueryContext;
+  private QueryContext _optimizedQueryContext;
+  private int _expectedDistinctCount;
+  private volatile int _regressionSink;
+
+  @Setup(Level.Trial)
+  public void setup()
+      throws Exception {
+    boolean extraFilter = _queryVariant.contains("EXTRA_FILTER");
+    Preconditions.checkState(_numRows >= _instanceCardinality,
+        "Benchmark requires numRows >= instanceCardinality but got numRows=%s 
instanceCardinality=%s",
+        _numRows, _instanceCardinality);
+    int matchingInstances = Math.max(1, Math.min(_instanceCardinality,
+        (int) Math.round(_instanceCardinality * _testInstanceFraction)));
+    // With the extra cluster-0 filter, only instances where instanceId % 32 
== 0 pass the cross-path filter.
+    // Among matching instances [0, matchingInstances), those with id % 32 == 
0 survive.
+    _expectedDistinctCount = extraFilter
+        ? (int) java.util.stream.IntStream.range(0, 
matchingInstances).filter(i -> i % 32 == 0).count()
+        : matchingInstances;
+    _expectedDistinctCount = Math.max(_expectedDistinctCount, extraFilter ? 0 
: 1);
+    Preconditions.checkState(_distinctLimit >= 
Math.max(_expectedDistinctCount, 1),
+        "Distinct limit must cover all matching values for deterministic 
validation. limit=%s expectedDistinct=%s",
+        _distinctLimit, _expectedDistinctCount);
+
+    FileUtils.deleteQuietly(INDEX_DIR);
+    buildSegment();
+
+    IndexLoadingConfig indexLoadingConfig = new 
IndexLoadingConfig(TABLE_CONFIG, SCHEMA);
+    _indexSegment = ImmutableSegmentLoader.load(new File(INDEX_DIR, 
SEGMENT_NAME), indexLoadingConfig);
+
+    
Preconditions.checkState(TABLE_CONFIG.getIndexingConfig().getJsonIndexColumns().contains(TAGS_COLUMN),
+        "Table config must include JSON index on %s", TAGS_COLUMN);
+    Preconditions.checkState(_indexSegment.getDataSource(TAGS_COLUMN, 
SCHEMA).getJsonIndex() != null,
+        "Loaded segment must expose JSON index on %s", TAGS_COLUMN);
+
+    String sampleQuery = getSampleQuery();
+    PinotQuery samplePinotQuery = 
CalciteSqlParser.compileToPinotQuery(sampleQuery);
+    Preconditions.checkState(samplePinotQuery.getLimit() == 10,
+        "Unexpected default DISTINCT limit for sample query: %s", 
samplePinotQuery.getLimit());
+
+    QueryContext sampleBaselineQueryContext = compileQueryContext(sampleQuery, 
Collections.emptyMap());
+    QueryContext sampleOptimizedQueryContext = compileQueryContext(sampleQuery,
+        
Collections.singletonMap(CommonConstants.Broker.Request.QueryOptionKey.USE_INDEX_BASED_DISTINCT_OPERATOR,
+            "true"));
+    Preconditions.checkState(planOperator(sampleBaselineQueryContext) 
instanceof DistinctOperator,
+        "Exact %s sample query must plan to DistinctOperator without %s", 
_queryVariant,
+        
CommonConstants.Broker.Request.QueryOptionKey.USE_INDEX_BASED_DISTINCT_OPERATOR);
+    assertJsonIndexDistinctOperator(
+        "Exact " + _queryVariant + " sample query", 
planOperator(sampleOptimizedQueryContext));
+
+    String benchmarkQuery = sampleQuery + " LIMIT " + _distinctLimit;
+    _baselineQueryContext = compileQueryContext(benchmarkQuery, 
Collections.emptyMap());
+    _optimizedQueryContext = compileQueryContext(benchmarkQuery,
+        
Collections.singletonMap(CommonConstants.Broker.Request.QueryOptionKey.USE_INDEX_BASED_DISTINCT_OPERATOR,
+            "true"));
+
+    Preconditions.checkState(planOperator(_baselineQueryContext) instanceof 
DistinctOperator,
+        "Benchmark query must plan to DistinctOperator without %s",
+        
CommonConstants.Broker.Request.QueryOptionKey.USE_INDEX_BASED_DISTINCT_OPERATOR);
+    assertJsonIndexDistinctOperator("Benchmark query", 
planOperator(_optimizedQueryContext));
+
+    QueryExecution baselineExecution = 
executeAndCollect(_baselineQueryContext);
+    QueryExecution optimizedExecution = 
executeAndCollect(_optimizedQueryContext);
+    
Preconditions.checkState(baselineExecution._values.equals(optimizedExecution._values),
+        "Result mismatch. baseline=%s optimized=%s variant=%s",
+        baselineExecution._values.size(), optimizedExecution._values.size(), 
_queryVariant);
+    if (!extraFilter) {
+      Preconditions.checkState(optimizedExecution._values.size() == 
_expectedDistinctCount,
+          "Unexpected distinct count. expected=%s actual=%s variant=%s",
+          _expectedDistinctCount, optimizedExecution._values.size(), 
_queryVariant);
+      // Fast path (fully pushed down): no docs scanned, no entries scanned 
post-filter
+      Preconditions.checkState(optimizedExecution._stats.getNumDocsScanned() 
== 0,
+          "JsonIndexDistinctOperator should not scan docs post-filter but 
scanned %s",
+          optimizedExecution._stats.getNumDocsScanned());
+      
Preconditions.checkState(optimizedExecution._stats.getNumEntriesScannedPostFilter()
 == 0,
+          "JsonIndexDistinctOperator should not scan post-filter entries but 
scanned %s",
+          optimizedExecution._stats.getNumEntriesScannedPostFilter());
+    }
+    
Preconditions.checkState(baselineExecution._stats.getNumEntriesScannedPostFilter()
 > 0,
+        "Baseline DistinctOperator should scan post-filter entries");
+  }
+
+  @Benchmark
+  public int distinctOperatorPath() {
+    return runQueryCount(_baselineQueryContext);
+  }
+
+  @Benchmark
+  public int jsonIndexDistinctOperatorPath() {
+    return runQueryCount(_optimizedQueryContext);
+  }
+
+  @TearDown(Level.Trial)
+  public void tearDown() {
+    try {
+      verifySpeedup();
+    } finally {
+      if (_indexSegment != null) {
+        _indexSegment.destroy();
+      }
+      FileUtils.deleteQuietly(INDEX_DIR);
+    }
+  }
+
+  private void verifySpeedup() {
+    if (!ASSERT_SPEEDUP || _queryVariant.contains("EXTRA_FILTER")) {
+      return;
+    }
+
+    long optimizedNs = measureMedianNs(() -> 
runQueryCount(_optimizedQueryContext));
+    long baselineNs = measureMedianNs(() -> 
runQueryCount(_baselineQueryContext));
+    Preconditions.checkState(optimizedNs < baselineNs,
+        "JsonIndexDistinctOperator must stay faster. rows=%s 
instanceCardinality=%s testFraction=%s optimized=%sns "
+            + "baseline=%sns expectedDistinct=%s",
+        _numRows, _instanceCardinality, _testInstanceFraction, optimizedNs, 
baselineNs, _expectedDistinctCount);
+  }
+
+  private void buildSegment()
+      throws Exception {
+    SegmentGeneratorConfig config = new SegmentGeneratorConfig(TABLE_CONFIG, 
SCHEMA);
+    config.setOutDir(INDEX_DIR.getPath());
+    config.setTableName(TABLE_NAME);
+    config.setSegmentName(SEGMENT_NAME);
+
+    SegmentIndexCreationDriverImpl driver = new 
SegmentIndexCreationDriverImpl();
+    try (RecordReader recordReader = new 
GeneratedDataRecordReader(createTestData())) {
+      driver.init(config, recordReader);
+      driver.build();
+    }
+  }
+
+  private LazyDataGenerator createTestData() {
+    int matchingInstances = _expectedDistinctCount;
+    String[] tagsValues = new String[_instanceCardinality];
+    for (int instanceId = 0; instanceId < _instanceCardinality; instanceId++) {
+      boolean matchesFilter = instanceId < matchingInstances;
+      String instanceValue = matchesFilter ? "service-test-" + instanceId : 
"service-prod-" + instanceId;
+      tagsValues[instanceId] =
+          "{\"instance\":\"" + instanceValue + "\",\"cluster\":\"cluster-" + 
(instanceId % 32) + "\"}";
+    }
+
+    return new LazyDataGenerator() {
+      @Override
+      public int size() {
+        return _numRows;
+      }
+
+      @Override
+      public GenericRow next(GenericRow row, int index) {
+        row.putValue(ID_COLUMN, index);
+        row.putValue(TAGS_COLUMN, tagsValues[index % tagsValues.length]);
+        return row;
+      }
+
+      @Override
+      public void rewind() {
+      }
+    };
+  }
+
+  private QueryContext compileQueryContext(String query, Map<String, String> 
queryOptions) {
+    PinotQuery pinotQuery = CalciteSqlParser.compileToPinotQuery(query);
+    if (!queryOptions.isEmpty()) {
+      pinotQuery.setQueryOptions(new HashMap<>(queryOptions));
+    }
+    PinotQuery serverPinotQuery = GapfillUtils.stripGapfill(pinotQuery);
+    if (serverPinotQuery != pinotQuery && !queryOptions.isEmpty()) {
+      serverPinotQuery.setQueryOptions(new HashMap<>(queryOptions));
+    }
+    QueryContext queryContext = 
QueryContextConverterUtils.getQueryContext(serverPinotQuery);
+    queryContext.setSchema(SCHEMA);
+    return queryContext;
+  }
+
+  private Operator planOperator(QueryContext queryContext) {
+    return PLAN_MAKER.makeSegmentPlanNode(new SegmentContext(_indexSegment), 
queryContext).run();
+  }
+
+  private static boolean isJsonIndexDistinctOperator(Operator operator) {
+    return 
JSON_INDEX_DISTINCT_OPERATOR_NAME.equals(operator.getClass().getSimpleName());
+  }
+
+  private static void assertJsonIndexDistinctOperator(String queryDescription, 
Operator operator) {
+    Preconditions.checkState(isJsonIndexDistinctOperator(operator),
+        "%s must plan to JsonIndexDistinctOperator with %s=true but planned 
%s",
+        queryDescription, 
CommonConstants.Broker.Request.QueryOptionKey.USE_INDEX_BASED_DISTINCT_OPERATOR,
+        operator.getClass().getName());
+  }
+
+  private int runQueryCount(QueryContext queryContext) {
+    Operator operator = planOperator(queryContext);
+    DistinctResultsBlock resultsBlock = (DistinctResultsBlock) 
operator.nextBlock();
+    return resultsBlock.getNumRows();
+  }
+
+  private QueryExecution executeAndCollect(QueryContext queryContext) {
+    Operator operator = planOperator(queryContext);
+    DistinctResultsBlock resultsBlock = (DistinctResultsBlock) 
operator.nextBlock();
+    ExecutionStatistics stats = operator.getExecutionStatistics();
+    Set<String> values = new TreeSet<>();
+    for (Object[] row : resultsBlock.getRows()) {
+      values.add((String) row[0]);
+    }
+    return new QueryExecution(values, stats);
+  }
+
+  private String getSampleQuery() {
+    switch (_queryVariant) {
+      case "THREE_ARG":
+        return THREE_ARG_SAMPLE_QUERY;
+      case "FOUR_ARG":
+        return FOUR_ARG_SAMPLE_QUERY;
+      case "THREE_ARG_EXTRA_FILTER":
+        return THREE_ARG_EXTRA_FILTER_SAMPLE_QUERY;
+      case "FOUR_ARG_EXTRA_FILTER":
+        return FOUR_ARG_EXTRA_FILTER_SAMPLE_QUERY;
+      default:
+        throw new IllegalStateException("Unsupported query variant: " + 
_queryVariant);
+    }
+  }
+
+  private long measureMedianNs(IntSupplier runner) {
+    for (int i = 0; i < REGRESSION_WARMUP_ITERATIONS; i++) {
+      _regressionSink = runRegressionBatch(runner);
+    }
+
+    long[] samples = new long[REGRESSION_MEASURE_ITERATIONS];
+    for (int i = 0; i < REGRESSION_MEASURE_ITERATIONS; i++) {
+      long startNs = System.nanoTime();
+      _regressionSink = runRegressionBatch(runner);
+      samples[i] = System.nanoTime() - startNs;
+    }
+    Arrays.sort(samples);
+    return samples[samples.length / 2];
+  }
+
+  private int runRegressionBatch(IntSupplier runner) {
+    int sum = 0;
+    for (int i = 0; i < REGRESSION_BATCH_SIZE; i++) {
+      sum += runner.getAsInt();
+    }
+    return sum;
+  }
+
+  public static void main(String[] args)
+      throws Exception {
+    new Runner(new OptionsBuilder()
+        .include(BenchmarkJsonIndexDistinct.class.getSimpleName())
+        .build()).run();
+  }
+
+  private static final class QueryExecution {
+    final Set<String> _values;
+    final ExecutionStatistics _stats;
+
+    QueryExecution(Set<String> values, ExecutionStatistics stats) {
+      _values = values;
+      _stats = stats;
+    }
+  }
+}
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/json/ImmutableJsonIndexReader.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/json/ImmutableJsonIndexReader.java
index 49655411821..fc03e03c614 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/json/ImmutableJsonIndexReader.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/json/ImmutableJsonIndexReader.java
@@ -29,9 +29,11 @@ import java.nio.ByteOrder;
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.PriorityQueue;
+import java.util.Set;
 import javax.annotation.Nullable;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pinot.common.request.context.ExpressionContext;
@@ -550,6 +552,198 @@ public class ImmutableJsonIndexReader implements 
JsonIndexReader {
     return result;
   }
 
+  @Override
+  public Set<String> getMatchingDistinctValues(String jsonPathKey, @Nullable 
String filterString) {
+    // Normalize the path key (same logic as getMatchingFlattenedDocsMap)
+    if (_version == BaseJsonIndexCreator.VERSION_2) {
+      if (jsonPathKey.startsWith("$")) {
+        jsonPathKey = jsonPathKey.substring(1);
+      } else {
+        jsonPathKey = JsonUtils.KEY_SEPARATOR + jsonPathKey;
+      }
+    } else {
+      if (jsonPathKey.startsWith("$.")) {
+        jsonPathKey = jsonPathKey.substring(2);
+      }
+    }
+
+    Pair<String, ImmutableRoaringBitmap> pathKey = 
getKeyAndFlattenedDocIds(jsonPathKey);
+    if (pathKey.getRight() != null && pathKey.getRight().isEmpty()) {
+      return new HashSet<>();
+    }
+    jsonPathKey = pathKey.getLeft();
+
+    // Array index paths need bitmap intersection — fall back to the default 
implementation
+    if (pathKey.getRight() != null) {
+      return collectValuesFromFlattenedDocsMap(jsonPathKey, filterString);
+    }
+
+    if (filterString == null) {
+      return collectAllValues(jsonPathKey);
+    }
+
+    // Parse the filter and attempt single-pass evaluation for same-path 
predicates
+    FilterContext filter;
+    try {
+      filter = 
RequestContextUtils.getFilter(CalciteSqlParser.compileToExpression(filterString));
+      Preconditions.checkArgument(!filter.isConstant());
+    } catch (Exception e) {
+      throw new BadQueryRequestException("Invalid json match filter: " + 
filterString);
+    }
+
+    // Only optimize simple single-predicate, non-exclusive, same-path filters
+    if (filter.getType() != FilterContext.Type.PREDICATE || 
isExclusive(filter.getPredicate().getType())) {
+      return collectValuesFromFlattenedDocsMap(jsonPathKey, filterString);
+    }
+
+    Predicate predicate = filter.getPredicate();
+    String predicateKey = normalizePredicateKey(predicate);
+    if (predicateKey == null || !predicateKey.equals(jsonPathKey)) {
+      return collectValuesFromFlattenedDocsMap(jsonPathKey, filterString);
+    }
+
+    return collectDistinctValuesByPredicate(predicate, jsonPathKey);
+  }
+
+  /// Collects all distinct values for the key without any filter — no posting 
list reads.
+  private Set<String> collectAllValues(String jsonPathKey) {
+    int[] dictIds = getDictIdRangeForKey(jsonPathKey);
+    if (dictIds[0] < 0) {
+      return new HashSet<>();
+    }
+    Set<String> result = new HashSet<>();
+    byte[] dictBuffer = _dictionary.getBuffer();
+    int valueStart = jsonPathKey.length() + 1;
+    for (int dictId = dictIds[0]; dictId < dictIds[1]; dictId++) {
+      String keyValue = _dictionary.getStringValue(dictId, dictBuffer);
+      result.add(keyValue.substring(valueStart));
+      
QueryThreadContext.checkTerminationAndSampleUsagePeriodically(result.size(),
+          "ImmutableJsonIndexReader.getMatchingDistinctValues");
+    }
+    return result;
+  }
+
+  /// Single-pass dictionary scan: evaluates the predicate on each value 
string directly, no posting list reads.
+  private Set<String> collectDistinctValuesByPredicate(Predicate predicate, 
String jsonPathKey) {
+    int[] dictIds = getDictIdRangeForKey(jsonPathKey);
+    if (dictIds[0] < 0) {
+      return new HashSet<>();
+    }
+    Set<String> result = new HashSet<>();
+    byte[] dictBuffer = _dictionary.getBuffer();
+    int valueStart = jsonPathKey.length() + 1;
+    java.util.function.Predicate<String> valueMatcher = 
buildValueMatcher(predicate, jsonPathKey);
+
+    for (int dictId = dictIds[0]; dictId < dictIds[1]; dictId++) {
+      String keyValue = _dictionary.getStringValue(dictId, dictBuffer);
+      String value = keyValue.substring(valueStart);
+      if (valueMatcher.test(value)) {
+        result.add(value);
+      }
+      QueryThreadContext.checkTerminationAndSampleUsagePeriodically(dictId - 
dictIds[0],
+          "ImmutableJsonIndexReader.getMatchingDistinctValues");
+    }
+    return result;
+  }
+
+  /// Builds a string predicate that evaluates the filter condition on a value 
string.
+  private static java.util.function.Predicate<String> 
buildValueMatcher(Predicate predicate, String key) {
+    switch (predicate.getType()) {
+      case EQ: {
+        String eqValue = ((EqPredicate) predicate).getValue();
+        return value -> value.equals(eqValue);
+      }
+      case NOT_EQ: {
+        String notEqValue = ((NotEqPredicate) predicate).getValue();
+        return value -> !value.equals(notEqValue);
+      }
+      case IN: {
+        java.util.Set<String> inValues = new HashSet<>(((InPredicate) 
predicate).getValues());
+        return inValues::contains;
+      }
+      case NOT_IN: {
+        java.util.Set<String> notInValues = new HashSet<>(((NotInPredicate) 
predicate).getValues());
+        return value -> !notInValues.contains(value);
+      }
+      case REGEXP_LIKE: {
+        Pattern pattern = ((RegexpLikePredicate) predicate).getPattern();
+        Matcher matcher = pattern.matcher("");
+        return value -> matcher.reset(value).matches();
+      }
+      case RANGE: {
+        RangePredicate rangePredicate = (RangePredicate) predicate;
+        FieldSpec.DataType rangeDataType = rangePredicate.getRangeDataType();
+        if (rangeDataType.isNumeric()) {
+          rangeDataType = FieldSpec.DataType.DOUBLE;
+        } else {
+          rangeDataType = FieldSpec.DataType.STRING;
+        }
+        boolean lowerUnbounded = 
rangePredicate.getLowerBound().equals(RangePredicate.UNBOUNDED);
+        boolean upperUnbounded = 
rangePredicate.getUpperBound().equals(RangePredicate.UNBOUNDED);
+        boolean lowerInclusive = lowerUnbounded || 
rangePredicate.isLowerInclusive();
+        boolean upperInclusive = upperUnbounded || 
rangePredicate.isUpperInclusive();
+        Object lowerBound = lowerUnbounded ? null : 
rangeDataType.convert(rangePredicate.getLowerBound());
+        Object upperBound = upperUnbounded ? null : 
rangeDataType.convert(rangePredicate.getUpperBound());
+        FieldSpec.DataType dt = rangeDataType;
+        return value -> {
+          Object valueObj = dt.convert(value);
+          boolean lowerOk = lowerUnbounded || (lowerInclusive ? 
dt.compare(valueObj, lowerBound) >= 0
+              : dt.compare(valueObj, lowerBound) > 0);
+          boolean upperOk = upperUnbounded || (upperInclusive ? 
dt.compare(valueObj, upperBound) <= 0
+              : dt.compare(valueObj, upperBound) < 0);
+          return lowerOk && upperOk;
+        };
+      }
+      case IS_NOT_NULL:
+        return value -> true;
+      default:
+        throw new IllegalStateException("Unsupported predicate type for 
distinct values: " + predicate.getType());
+    }
+  }
+
+  /// Normalizes the predicate's key path to match the internal dictionary 
format.
+  @Nullable
+  private String normalizePredicateKey(Predicate predicate) {
+    ExpressionContext lhs = predicate.getLhs();
+    if (lhs.getType() != ExpressionContext.Type.IDENTIFIER) {
+      return null;
+    }
+    String key = lhs.getIdentifier();
+    if (_version == BaseJsonIndexCreator.VERSION_2) {
+      if (key.startsWith("$")) {
+        key = key.substring(1);
+      } else {
+        key = JsonUtils.KEY_SEPARATOR + key;
+      }
+    } else {
+      if (key.startsWith("$.")) {
+        key = key.substring(2);
+      }
+    }
+    // Only handle simple paths without array indices
+    Pair<String, ImmutableRoaringBitmap> pair = getKeyAndFlattenedDocIds(key);
+    if (pair.getRight() != null) {
+      return null;
+    }
+    return pair.getLeft();
+  }
+
+  /// Falls back to the default approach: build full flattened docs map and 
return keys.
+  private Set<String> collectValuesFromFlattenedDocsMap(String normalizedKey, 
@Nullable String filterString) {
+    // Need to denormalize the key back to original format for 
getMatchingFlattenedDocsMap
+    // Instead, use the default interface method which handles normalization
+    return JsonIndexReader.super.getMatchingDistinctValues(
+        denormalizeKey(normalizedKey), filterString);
+  }
+
+  private String denormalizeKey(String normalizedKey) {
+    if (_version == BaseJsonIndexCreator.VERSION_2) {
+      return "$" + normalizedKey;
+    } else {
+      return "$." + normalizedKey;
+    }
+  }
+
   @Override
   public String[][] getValuesMV(int[] docIds, int length, Map<String, 
RoaringBitmap> valueToMatchingFlattenedDocs) {
     String[][] result = new String[length][];
diff --git 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/JsonIndexReader.java
 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/JsonIndexReader.java
index 489f2dd003c..06a095b4e20 100644
--- 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/JsonIndexReader.java
+++ 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/JsonIndexReader.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.segment.spi.index.reader;
 
 import java.util.Map;
+import java.util.Set;
 import javax.annotation.Nullable;
 import org.apache.pinot.segment.spi.index.IndexReader;
 import org.roaringbitmap.RoaringBitmap;
@@ -78,6 +79,19 @@ public interface JsonIndexReader extends IndexReader {
    */
   void convertFlattenedDocIdsToDocIds(Map<String, RoaringBitmap> 
flattenedDocIdsMap);
 
+  /**
+   * Returns only the distinct value strings for the given key that match the 
optional filter, without materializing
+   * per-value posting list bitmaps or converting flattened doc IDs. This is 
significantly faster than
+   * {@link #getMatchingFlattenedDocsMap} followed by {@link 
#convertFlattenedDocIdsToDocIds} when only the distinct
+   * values are needed (e.g. fully-pushed-down DISTINCT queries).
+   *
+   * <p>The default implementation delegates to {@link 
#getMatchingFlattenedDocsMap} and returns its key set.
+   * Implementations should override for better performance.
+   */
+  default Set<String> getMatchingDistinctValues(String key, @Nullable String 
filterJsonString) {
+    return getMatchingFlattenedDocsMap(key, filterJsonString).keySet();
+  }
+
   /**
    * Returns true if the given JSON path is indexed and can be used for 
index-based operations
    * (e.g. getMatchingFlattenedDocsMap, JsonIndexDistinctOperator).


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to