This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 0c6521a5ccd Add JsonIndexDistinct benchmark and optimize same-path
JSON_MATCH (#17921)
0c6521a5ccd is described below
commit 0c6521a5ccdb26443e042c4bc2a6f4ec6a5e1a5c
Author: Xiang Fu <[email protected]>
AuthorDate: Fri Apr 3 00:00:36 2026 -0700
Add JsonIndexDistinct benchmark and optimize same-path JSON_MATCH (#17921)
* Add JsonIndexDistinct benchmark and optimize same-path JSON_MATCH
* Optimize JsonIndexDistinctOperator with single-pass dictionary scan for
same-path filters
For fully-pushed-down same-path JSON_MATCH predicates, the previous implementation
scanned the dictionary twice (once for filter evaluation, once for value-map building)
and materialized per-value posting list bitmaps + doc ID conversions that were never
used. This adds getMatchingDistinctValues() to the JsonIndexReader SPI with a fused
single-pass implementation in ImmutableJsonIndexReader that evaluates the predicate
directly on dictionary value strings — zero posting list reads, zero bitmap operations,
zero doc ID mapping lookups.
Benchmark shows 12x-337x speedup across all cardinality/selectivity combinations,
eliminating the previous regression at high cardinality + low selectivity.
Co-Authored-By: Claude Opus 4.6 (1M context) <[email protected]>
* Add extra-filter benchmark variants for cross-path filter (non-fast-path)
comparison
Adds THREE_ARG_EXTRA_FILTER and FOUR_ARG_EXTRA_FILTER query variants that combine
the same-path REGEXP_LIKE with a cross-path JSON_MATCH on $.cluster, preventing the
fully-pushed-down fast path. This exercises the bitmap-based code path and confirms
it still outperforms baseline in most configurations (1.2x-4.2x).
Co-Authored-By: Claude Opus 4.6 (1M context) <[email protected]>
---------
Co-authored-by: Claude Opus 4.6 (1M context) <[email protected]>
---
.../operator/query/JsonIndexDistinctOperator.java | 235 +++++++++---
.../query/JsonIndexDistinctOperatorTest.java | 306 +++++++++++++++
.../integration/tests/custom/JsonPathTest.java | 245 ++++++++++++
.../pinot/perf/BenchmarkJsonIndexDistinct.java | 414 +++++++++++++++++++++
.../readers/json/ImmutableJsonIndexReader.java | 194 ++++++++++
.../segment/spi/index/reader/JsonIndexReader.java | 14 +
6 files changed, 1346 insertions(+), 62 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/JsonIndexDistinctOperator.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/JsonIndexDistinctOperator.java
index 875ca471edb..b5d5e60c400 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/JsonIndexDistinctOperator.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/JsonIndexDistinctOperator.java
@@ -24,10 +24,15 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
import javax.annotation.Nullable;
import org.apache.pinot.common.function.JsonPathCache;
import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.request.context.FilterContext;
import org.apache.pinot.common.request.context.OrderByExpressionContext;
+import org.apache.pinot.common.request.context.RequestContextUtils;
+import org.apache.pinot.common.request.context.predicate.JsonMatchPredicate;
+import org.apache.pinot.common.request.context.predicate.Predicate;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.apache.pinot.core.common.Operator;
@@ -54,12 +59,17 @@ import org.apache.pinot.segment.spi.index.IndexType;
import org.apache.pinot.segment.spi.index.reader.JsonIndexReader;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.query.QueryThreadContext;
+import org.apache.pinot.sql.parsers.CalciteSqlParser;
import org.roaringbitmap.RoaringBitmap;
/**
- * Distinct operator that uses the JSON index value→docId map directly instead
of scanning documents.
- * Avoids the projection/transform pipeline for SELECT DISTINCT
jsonExtractIndex(...).
+ * Distinct operator for the scalar {@code jsonExtractIndex(column, path,
type[, defaultValue])} form.
+ *
+ * <p>Execution flow:
+ * 1. Push a same-path {@code JSON_MATCH} predicate into the JSON-index lookup
when it cannot match missing paths.
+ * 2. Convert matching flattened doc ids back to segment doc ids.
+ * 3. Apply any remaining row-level filter and materialize DISTINCT results,
including missing-path handling.
*/
public class JsonIndexDistinctOperator extends
BaseOperator<DistinctResultsBlock> {
private static final String EXPLAIN_NAME = "DISTINCT_JSON_INDEX";
@@ -91,10 +101,29 @@ public class JsonIndexDistinctOperator extends
BaseOperator<DistinctResultsBlock
ExpressionContext expr = expressions.get(0);
ParsedJsonExtractIndex parsed = parseJsonExtractIndex(expr);
if (parsed == null) {
- throw new IllegalStateException("Expected jsonExtractIndex expression");
+ throw new IllegalStateException("Expected 3/4-arg scalar
jsonExtractIndex expression");
}
- // Evaluate the filter first so we can skip the (potentially expensive)
index map when no docs match
+ DataSource dataSource = _indexSegment.getDataSource(parsed._columnName,
_queryContext.getSchema());
+ JsonIndexReader jsonIndexReader = getJsonIndexReader(dataSource);
+ if (jsonIndexReader == null) {
+ throw new IllegalStateException("Column " + parsed._columnName + " has
no JSON index");
+ }
+
+ String pushedDownFilterJson = extractSamePathJsonMatchFilter(parsed,
_queryContext.getFilter());
+ boolean filterFullyPushedDown = pushedDownFilterJson != null
+ && isOnlySamePathJsonMatchFilter(parsed, _queryContext.getFilter())
+ && !jsonMatchFilterCanMatchMissingPath(pushedDownFilterJson);
+
+ // Fast path: when the filter is fully pushed down into the JSON index, we
only need the distinct value strings.
+ // This avoids reading posting lists, building per-value bitmaps, and
converting flattened doc IDs.
+ if (filterFullyPushedDown) {
+ Set<String> distinctValues = jsonIndexReader.getMatchingDistinctValues(
+ parsed._jsonPathString, pushedDownFilterJson);
+ return buildDistinctResultsFromValues(expr, parsed, distinctValues);
+ }
+
+ // Evaluate the filter first so we can skip the (potentially expensive)
index map when no docs match.
RoaringBitmap filteredDocIds = buildFilteredDocIds();
if (filteredDocIds != null && filteredDocIds.isEmpty()) {
ColumnDataType earlyColumnDataType =
ColumnDataType.fromDataTypeSV(parsed._dataType);
@@ -107,18 +136,46 @@ public class JsonIndexDistinctOperator extends
BaseOperator<DistinctResultsBlock
createDistinctTable(earlyDataSchema, parsed._dataType,
earlyOrderBy), _queryContext);
}
- DataSource dataSource = _indexSegment.getDataSource(parsed._columnName,
_queryContext.getSchema());
- JsonIndexReader jsonIndexReader = getJsonIndexReader(dataSource);
- if (jsonIndexReader == null) {
- throw new IllegalStateException("Column " + parsed._columnName + " has
no JSON index");
- }
-
- // Same logic as
JsonExtractIndexTransformFunction.getValueToMatchingDocsMap()
+ // All other WHERE filters remain row-level and are applied after
converting flattened doc IDs to real doc IDs.
Map<String, RoaringBitmap> valueToMatchingDocs =
- jsonIndexReader.getMatchingFlattenedDocsMap(parsed._jsonPathString,
parsed._filterExpression);
+ jsonIndexReader.getMatchingFlattenedDocsMap(parsed._jsonPathString,
pushedDownFilterJson);
+
// Always single-value (MV _ARRAY is rejected in parseJsonExtractIndex)
jsonIndexReader.convertFlattenedDocIdsToDocIds(valueToMatchingDocs);
+ return buildDistinctResultsBlock(expr, parsed, valueToMatchingDocs,
filteredDocIds,
+ filteredDocIds == null);
+ }
+ private DistinctResultsBlock
buildDistinctResultsFromValues(ExpressionContext expr, ParsedJsonExtractIndex
parsed,
+ Set<String> distinctValues) {
+ ColumnDataType columnDataType =
ColumnDataType.fromDataTypeSV(parsed._dataType);
+ DataSchema dataSchema = new DataSchema(
+ new String[]{expr.toString()},
+ new ColumnDataType[]{columnDataType});
+ OrderByExpressionContext orderByExpression =
_queryContext.getOrderByExpressions() != null
+ ? _queryContext.getOrderByExpressions().get(0) : null;
+ DistinctTable distinctTable = createDistinctTable(dataSchema,
parsed._dataType, orderByExpression);
+ int limit = _queryContext.getLimit();
+
+ for (String value : distinctValues) {
+ _numEntriesExamined++;
+
QueryThreadContext.checkTerminationAndSampleUsagePeriodically(_numEntriesExamined,
EXPLAIN_NAME);
+
+ boolean done = addValueToDistinctTable(distinctTable, value,
parsed._dataType, orderByExpression);
+ if (done) {
+ break;
+ }
+ if (orderByExpression == null && distinctTable.hasLimit() &&
distinctTable.size() >= limit) {
+ break;
+ }
+ }
+
+ return new DistinctResultsBlock(distinctTable, _queryContext);
+ }
+
+ private DistinctResultsBlock buildDistinctResultsBlock(ExpressionContext
expr, ParsedJsonExtractIndex parsed,
+ Map<String, RoaringBitmap> valueToMatchingDocs, @Nullable RoaringBitmap
filteredDocIds,
+ boolean allDocsSelected) {
ColumnDataType columnDataType =
ColumnDataType.fromDataTypeSV(parsed._dataType);
DataSchema dataSchema = new DataSchema(
new String[]{expr.toString()},
@@ -129,32 +186,30 @@ public class JsonIndexDistinctOperator extends
BaseOperator<DistinctResultsBlock
int limit = _queryContext.getLimit();
int totalDocs = _indexSegment.getSegmentMetadata().getTotalDocs();
- // Track uncovered docs: for the no-filter case, build a union and compare
against totalDocs.
- // For the filtered case, use a "remaining" bitmap that shrinks in-place
(no per-value allocation).
- RoaringBitmap allMatchedDocs = filteredDocIds == null ? new
RoaringBitmap() : null;
+ RoaringBitmap coveredDocs = allDocsSelected ? new RoaringBitmap() : null;
RoaringBitmap remainingDocs = filteredDocIds != null ?
filteredDocIds.clone() : null;
- boolean allDocsCovered = filteredDocIds == null ? (totalDocs == 0) :
filteredDocIds.isEmpty();
+ boolean allDocsCovered = filteredDocIds == null ? !allDocsSelected ||
totalDocs == 0 : filteredDocIds.isEmpty();
boolean earlyBreak = false;
for (Map.Entry<String, RoaringBitmap> entry :
valueToMatchingDocs.entrySet()) {
_numEntriesExamined++;
QueryThreadContext.checkTerminationAndSampleUsagePeriodically(_numEntriesExamined,
EXPLAIN_NAME);
+
String value = entry.getKey();
RoaringBitmap docIds = entry.getValue();
boolean includeValue;
if (filteredDocIds == null) {
includeValue = true;
- // Build union for uncovered-docs detection; short-circuit once all
segment docs are covered
- if (!allDocsCovered) {
- allMatchedDocs.or(docIds);
- if (allMatchedDocs.getLongCardinality() >= totalDocs) {
+ if (!allDocsCovered && allDocsSelected) {
+ coveredDocs.or(docIds);
+ if (coveredDocs.getLongCardinality() >= totalDocs) {
allDocsCovered = true;
}
}
} else {
includeValue = RoaringBitmap.intersects(docIds, filteredDocIds);
- // Remove matched docs from remaining set in-place (no allocation per
value)
+ // Remove matched docs from remaining set in-place (no allocation per
value).
if (!allDocsCovered && includeValue) {
remainingDocs.andNot(docIds);
if (remainingDocs.isEmpty()) {
@@ -177,25 +232,92 @@ public class JsonIndexDistinctOperator extends
BaseOperator<DistinctResultsBlock
}
}
- // Handle docs not covered by any value in the index.
- // Baseline JsonExtractIndexTransformFunction throws when a doc is missing
the path and no
- // defaultValue is provided. Match that behavior here unless nullHandling
is enabled.
- // allDocsCovered tracks coverage precisely (against totalDocs or
filteredDocIds cardinality).
if (!earlyBreak && !allDocsCovered) {
- if (parsed._defaultValue != null) {
- addValueToDistinctTable(distinctTable, parsed._defaultValue,
parsed._dataType, orderByExpression);
- } else if (_queryContext.isNullHandlingEnabled()) {
- distinctTable.addNull();
- } else {
- throw new RuntimeException(
- String.format("Illegal Json Path: [%s], for some docIds in segment
[%s]",
- parsed._jsonPathString, _indexSegment.getSegmentName()));
- }
+ handleMissingDocs(distinctTable, parsed, orderByExpression);
}
return new DistinctResultsBlock(distinctTable, _queryContext);
}
+ private void handleMissingDocs(DistinctTable distinctTable,
ParsedJsonExtractIndex parsed,
+ @Nullable OrderByExpressionContext orderByExpression) {
+ if (parsed._defaultValueLiteral != null) {
+ addValueToDistinctTable(distinctTable, parsed._defaultValueLiteral,
parsed._dataType, orderByExpression);
+ } else if (_queryContext.isNullHandlingEnabled()) {
+ distinctTable.addNull();
+ } else {
+ throw new RuntimeException(
+ String.format("Illegal Json Path: [%s], for some docIds in segment
[%s]",
+ parsed._jsonPathString, _indexSegment.getSegmentName()));
+ }
+ }
+
+ @Nullable
+ private static String extractSamePathJsonMatchFilter(ParsedJsonExtractIndex
parsed, @Nullable FilterContext filter) {
+ if (filter == null) {
+ return null;
+ }
+ switch (filter.getType()) {
+ case PREDICATE:
+ return extractSamePathJsonMatchFilter(parsed, filter.getPredicate());
+ case AND:
+ String matchingFilter = null;
+ for (FilterContext child : filter.getChildren()) {
+ String childFilter = extractSamePathJsonMatchFilter(parsed, child);
+ if (childFilter == null) {
+ continue;
+ }
+ if (matchingFilter != null) {
+ return null;
+ }
+ matchingFilter = childFilter;
+ }
+ return matchingFilter;
+ default:
+ return null;
+ }
+ }
+
+ private static boolean isOnlySamePathJsonMatchFilter(ParsedJsonExtractIndex
parsed, @Nullable FilterContext filter) {
+ if (filter == null || filter.getType() != FilterContext.Type.PREDICATE) {
+ return false;
+ }
+ return extractSamePathJsonMatchFilter(parsed, filter.getPredicate()) !=
null;
+ }
+
+ private static boolean jsonMatchFilterCanMatchMissingPath(String
filterJsonString) {
+ try {
+ FilterContext filter =
RequestContextUtils.getFilter(CalciteSqlParser.compileToExpression(filterJsonString));
+ return filter.getType() == FilterContext.Type.PREDICATE
+ && filter.getPredicate().getType() == Predicate.Type.IS_NULL;
+ } catch (Exception e) {
+ return false;
+ }
+ }
+
+ @Nullable
+ private static String extractSamePathJsonMatchFilter(ParsedJsonExtractIndex
parsed, Predicate predicate) {
+ if (!(predicate instanceof JsonMatchPredicate)) {
+ return null;
+ }
+ ExpressionContext lhs = predicate.getLhs();
+ if (lhs.getType() != ExpressionContext.Type.IDENTIFIER
+ || !parsed._columnName.equals(lhs.getIdentifier())) {
+ return null;
+ }
+ String filterJsonString = ((JsonMatchPredicate) predicate).getValue();
+ int start = filterJsonString.indexOf('"');
+ if (start < 0) {
+ return null;
+ }
+ int end = filterJsonString.indexOf('"', start + 1);
+ if (end < 0) {
+ return null;
+ }
+ String filterPath = filterJsonString.substring(start + 1, end);
+ return parsed._jsonPathString.equals(filterPath) ? filterJsonString : null;
+ }
+
private DistinctTable createDistinctTable(DataSchema dataSchema,
FieldSpec.DataType dataType,
@Nullable OrderByExpressionContext orderByExpression) {
int limit = _queryContext.getLimit();
@@ -380,14 +502,15 @@ public class JsonIndexDistinctOperator extends
BaseOperator<DistinctResultsBlock
return null;
}
List<ExpressionContext> args = expr.getFunction().getArguments();
- if (args.size() < 3 || args.size() > 5) {
+ if (args.size() != 3 && args.size() != 4) {
return null;
}
if (args.get(0).getType() != ExpressionContext.Type.IDENTIFIER) {
return null;
}
if (args.get(1).getType() != ExpressionContext.Type.LITERAL
- || args.get(2).getType() != ExpressionContext.Type.LITERAL) {
+ || args.get(2).getType() != ExpressionContext.Type.LITERAL
+ || (args.size() == 4 && args.get(3).getType() !=
ExpressionContext.Type.LITERAL)) {
return null;
}
@@ -428,23 +551,17 @@ public class JsonIndexDistinctOperator extends
BaseOperator<DistinctResultsBlock
return null;
}
- String defaultValue = null;
- if (args.size() >= 4) {
- if (args.get(3).getType() != ExpressionContext.Type.LITERAL) {
+ String defaultValueLiteral = null;
+ if (args.size() == 4) {
+ defaultValueLiteral = args.get(3).getLiteral().getStringValue();
+ try {
+ dataType.convert(defaultValueLiteral);
+ } catch (Exception e) {
return null;
}
- defaultValue = args.get(3).getLiteral().getStringValue();
}
- String filterExpression = null;
- if (args.size() == 5) {
- if (args.get(4).getType() != ExpressionContext.Type.LITERAL) {
- return null;
- }
- filterExpression = args.get(4).getLiteral().getStringValue();
- }
-
- return new ParsedJsonExtractIndex(columnName, jsonPathString, dataType,
defaultValue, filterExpression);
+ return new ParsedJsonExtractIndex(columnName, jsonPathString, dataType,
defaultValueLiteral);
}
private static final class ParsedJsonExtractIndex {
@@ -452,17 +569,14 @@ public class JsonIndexDistinctOperator extends
BaseOperator<DistinctResultsBlock
final String _jsonPathString;
final FieldSpec.DataType _dataType;
@Nullable
- final String _defaultValue;
- @Nullable
- final String _filterExpression;
+ final String _defaultValueLiteral;
- ParsedJsonExtractIndex(String columnName, String jsonPathString,
- FieldSpec.DataType dataType, @Nullable String defaultValue, @Nullable
String filterExpression) {
+ ParsedJsonExtractIndex(String columnName, String jsonPathString,
FieldSpec.DataType dataType,
+ @Nullable String defaultValueLiteral) {
_columnName = columnName;
_jsonPathString = jsonPathString;
_dataType = dataType;
- _defaultValue = defaultValue;
- _filterExpression = filterExpression;
+ _defaultValueLiteral = defaultValueLiteral;
}
}
@@ -506,9 +620,9 @@ public class JsonIndexDistinctOperator extends
BaseOperator<DistinctResultsBlock
}
/**
- * Returns true if the expression is jsonExtractIndex on a column with JSON
index and the path is indexed.
- * For OSS JSON index all paths are indexed. For composite JSON index, only
paths in invertedIndexConfigs
- * are indexed per key.
+ * Returns true if the expression is the 3/4-arg scalar jsonExtractIndex
form on a column with JSON index and the
+ * path is indexed. For OSS JSON index all paths are indexed. For composite
JSON index, only paths in
+ * invertedIndexConfigs are indexed per key.
*/
public static boolean canUseJsonIndexDistinct(IndexSegment indexSegment,
ExpressionContext expr) {
ParsedJsonExtractIndex parsed = parseJsonExtractIndex(expr);
@@ -526,9 +640,6 @@ public class JsonIndexDistinctOperator extends
BaseOperator<DistinctResultsBlock
if (!reader.isPathIndexed(parsed._jsonPathString)) {
return false;
}
- // The 5th arg (_filterExpression) is a JSON filter expression, not a
plain JSON path,
- // so isPathIndexed() is not appropriate for it. The reader's
getMatchingFlattenedDocsMap()
- // handles filter expressions internally.
return true;
}
}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/operator/query/JsonIndexDistinctOperatorTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/operator/query/JsonIndexDistinctOperatorTest.java
new file mode 100644
index 00000000000..bb8431d0707
--- /dev/null
+++
b/pinot-core/src/test/java/org/apache/pinot/core/operator/query/JsonIndexDistinctOperatorTest.java
@@ -0,0 +1,306 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.query;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.core.operator.blocks.results.DistinctResultsBlock;
+import org.apache.pinot.core.operator.filter.BaseFilterOperator;
+import org.apache.pinot.core.operator.filter.BitmapCollection;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import
org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.SegmentContext;
+import org.apache.pinot.segment.spi.SegmentMetadata;
+import org.apache.pinot.segment.spi.datasource.DataSource;
+import org.apache.pinot.segment.spi.index.reader.JsonIndexReader;
+import org.roaringbitmap.RoaringBitmap;
+import org.roaringbitmap.buffer.MutableRoaringBitmap;
+import org.testng.annotations.Test;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.expectThrows;
+
+
+/**
+ * Unit tests for {@link JsonIndexDistinctOperator}.
+ */
+public class JsonIndexDistinctOperatorTest {
+ private static final String STRING_EXTRACT = "JSON_EXTRACT_INDEX(tags,
'$.instance', 'STRING')";
+ private static final String STRING_EXTRACT_WITH_EMPTY_DEFAULT =
+ "JSON_EXTRACT_INDEX(tags, '$.instance', 'STRING', '')";
+ private static final String STRING_EXTRACT_WITH_DEFAULT =
+ "JSON_EXTRACT_INDEX(tags, '$.instance', 'STRING', 'missing')";
+ private static final String INVALID_INT_DEFAULT_EXTRACT =
+ "JSON_EXTRACT_INDEX(tags, '$.instance', 'INT', 'abc')";
+ private static final String SAME_PATH_FILTER = "REGEXP_LIKE(\"$.instance\",
'.*test.*')";
+ private static final String CROSS_PATH_FILTER = "REGEXP_LIKE(\"$.env\",
'prod.*')";
+ private static final String SAME_PATH_IS_NULL_FILTER = "\"$.instance\" IS
NULL";
+
+ @Test
+ public void
testSamePathJsonMatchUsesDistinctValuesFastPathForFourArgScalarForm() {
+ QueryContext queryContext =
distinctQuery(STRING_EXTRACT_WITH_EMPTY_DEFAULT, SAME_PATH_FILTER);
+
+ JsonIndexReader jsonIndexReader = mock(JsonIndexReader.class);
+ when(jsonIndexReader.getMatchingDistinctValues("$.instance",
SAME_PATH_FILTER))
+ .thenReturn(Set.of("test-east", "test-west"));
+
+ DistinctResultsBlock resultsBlock =
+ buildOperator(queryContext, jsonIndexReader, bufferBitmap(0, 1),
2).nextBlock();
+
+ assertEquals(extractValues(resultsBlock), Set.of("test-east",
"test-west"));
+ verify(jsonIndexReader).getMatchingDistinctValues("$.instance",
SAME_PATH_FILTER);
+ verify(jsonIndexReader, never()).getMatchingFlattenedDocsMap(any(), any());
+ verify(jsonIndexReader, never()).convertFlattenedDocIdsToDocIds(any());
+ }
+
+ @Test
+ public void
testSamePathJsonMatchUsesDistinctValuesFastPathForThreeArgScalarForm() {
+ QueryContext queryContext = distinctQuery(STRING_EXTRACT,
SAME_PATH_FILTER);
+
+ JsonIndexReader jsonIndexReader = mock(JsonIndexReader.class);
+ when(jsonIndexReader.getMatchingDistinctValues("$.instance",
SAME_PATH_FILTER))
+ .thenReturn(Set.of("test-east", "test-west"));
+
+ DistinctResultsBlock resultsBlock =
+ buildOperator(queryContext, jsonIndexReader, bufferBitmap(0, 1),
2).nextBlock();
+
+ assertEquals(extractValues(resultsBlock), Set.of("test-east",
"test-west"));
+ verify(jsonIndexReader).getMatchingDistinctValues("$.instance",
SAME_PATH_FILTER);
+ verify(jsonIndexReader, never()).getMatchingFlattenedDocsMap(any(), any());
+ verify(jsonIndexReader, never()).convertFlattenedDocIdsToDocIds(any());
+ }
+
+ @Test
+ public void testDifferentPathJsonMatchIsAppliedAtDocLevel() {
+ QueryContext queryContext = distinctQuery(STRING_EXTRACT,
CROSS_PATH_FILTER);
+
+ JsonIndexReader jsonIndexReader = mock(JsonIndexReader.class);
+ Map<String, RoaringBitmap> flattenedDocsByValue = new HashMap<>();
+ flattenedDocsByValue.put("prod-a", bitmap(100));
+ flattenedDocsByValue.put("prod-b", bitmap(200));
+ flattenedDocsByValue.put("other-doc", bitmap(300));
+ when(jsonIndexReader.getMatchingFlattenedDocsMap("$.instance",
null)).thenReturn(flattenedDocsByValue);
+ stubConvertedDocIds(jsonIndexReader, Map.of("prod-a", bitmap(0), "prod-b",
bitmap(1), "other-doc", bitmap(2)));
+
+ DistinctResultsBlock resultsBlock =
+ buildOperator(queryContext, jsonIndexReader, bufferBitmap(0, 1),
3).nextBlock();
+
+ assertEquals(extractValues(resultsBlock), Set.of("prod-a", "prod-b"));
+ verify(jsonIndexReader).getMatchingFlattenedDocsMap("$.instance", null);
+ verify(jsonIndexReader, never()).getMatchingFlattenedDocsMap("$.instance",
+ "REGEXP_LIKE(\"$.env\", ''prod.*'')");
+ verify(jsonIndexReader).convertFlattenedDocIdsToDocIds(any());
+ }
+
+ @Test
+ public void testCanUseJsonIndexDistinctAllowsThreeArgScalarForm() {
+ QueryContext queryContext = distinctQuery(STRING_EXTRACT,
CROSS_PATH_FILTER);
+
+ JsonIndexReader jsonIndexReader = mock(JsonIndexReader.class);
+ when(jsonIndexReader.isPathIndexed("$.instance")).thenReturn(true);
+ IndexSegment indexSegment = buildCanUseIndexSegment(jsonIndexReader);
+
+ assertTrue(JsonIndexDistinctOperator.canUseJsonIndexDistinct(indexSegment,
+ queryContext.getSelectExpressions().get(0)));
+ }
+
+ @Test
+ public void testCanUseJsonIndexDistinctAllowsFourArgScalarForm() {
+ QueryContext queryContext =
distinctQuery(STRING_EXTRACT_WITH_EMPTY_DEFAULT, CROSS_PATH_FILTER);
+
+ JsonIndexReader jsonIndexReader = mock(JsonIndexReader.class);
+ when(jsonIndexReader.isPathIndexed("$.instance")).thenReturn(true);
+ IndexSegment indexSegment = buildCanUseIndexSegment(jsonIndexReader);
+
+ assertTrue(JsonIndexDistinctOperator.canUseJsonIndexDistinct(indexSegment,
+ queryContext.getSelectExpressions().get(0)));
+ }
+
+ @Test
+ public void testCanUseJsonIndexDistinctRejectsInvalidDefaultArgument() {
+ QueryContext queryContext = distinctQuery(INVALID_INT_DEFAULT_EXTRACT,
CROSS_PATH_FILTER);
+
+ JsonIndexReader jsonIndexReader = mock(JsonIndexReader.class);
+ when(jsonIndexReader.isPathIndexed("$.instance")).thenReturn(true);
+ IndexSegment indexSegment = buildCanUseIndexSegment(jsonIndexReader);
+
+ assertFalse(JsonIndexDistinctOperator.canUseJsonIndexDistinct(indexSegment,
+ queryContext.getSelectExpressions().get(0)));
+ }
+
+ @Test
+ public void testFourArgAddsDefaultForDocsWithoutJsonPath() {
+ QueryContext queryContext = distinctQuery(STRING_EXTRACT_WITH_DEFAULT,
CROSS_PATH_FILTER);
+
+ JsonIndexReader jsonIndexReader = mock(JsonIndexReader.class);
+ Map<String, RoaringBitmap> flattenedDocsByValue = new HashMap<>();
+ flattenedDocsByValue.put("prod-a", bitmap(100));
+ flattenedDocsByValue.put("prod-b", bitmap(200));
+ when(jsonIndexReader.getMatchingFlattenedDocsMap("$.instance",
null)).thenReturn(flattenedDocsByValue);
+ stubConvertedDocIds(jsonIndexReader, Map.of("prod-a", bitmap(0), "prod-b",
bitmap(1)));
+
+ DistinctResultsBlock resultsBlock =
+ buildOperator(queryContext, jsonIndexReader, bufferBitmap(0, 1, 2),
3).nextBlock();
+
+ assertEquals(extractValues(resultsBlock), Set.of("prod-a", "prod-b",
"missing"));
+ }
+
+ @Test
+ public void testSamePathIsNullStillAddsDefaultForMissingPath() {
+ QueryContext queryContext = distinctQuery(STRING_EXTRACT_WITH_DEFAULT,
SAME_PATH_IS_NULL_FILTER);
+
+ JsonIndexReader jsonIndexReader = mock(JsonIndexReader.class);
+ when(jsonIndexReader.getMatchingFlattenedDocsMap("$.instance",
SAME_PATH_IS_NULL_FILTER)).thenReturn(
+ new HashMap<>());
+
+ DistinctResultsBlock resultsBlock =
+ buildOperator(queryContext, jsonIndexReader, bufferBitmap(2),
3).nextBlock();
+
+ assertEquals(extractValues(resultsBlock), Set.of("missing"));
+ verify(jsonIndexReader).getMatchingFlattenedDocsMap("$.instance",
SAME_PATH_IS_NULL_FILTER);
+ verify(jsonIndexReader).convertFlattenedDocIdsToDocIds(any());
+ }
+
+ @Test
+ public void testMissingPathWithoutDefaultThrows() {
+ QueryContext queryContext = distinctQuery(STRING_EXTRACT,
SAME_PATH_IS_NULL_FILTER);
+
+ JsonIndexReader jsonIndexReader = mock(JsonIndexReader.class);
+ when(jsonIndexReader.getMatchingFlattenedDocsMap("$.instance",
SAME_PATH_IS_NULL_FILTER)).thenReturn(
+ new HashMap<>());
+
+ RuntimeException exception = expectThrows(RuntimeException.class,
+ () -> buildOperator(queryContext, jsonIndexReader, bufferBitmap(2),
3).nextBlock());
+
+ assertTrue(exception.getMessage().contains("Illegal Json Path"));
+ }
+
+ private static QueryContext distinctQuery(String expression, String
filterJsonString) {
+ return QueryContextConverterUtils.getQueryContext(
+ "SELECT DISTINCT " + expression + " AS tag_value FROM myTable WHERE
JSON_MATCH(tags, '"
+ + filterJsonString.replace("'", "''") + "')");
+ }
+
+ private static void stubConvertedDocIds(JsonIndexReader jsonIndexReader,
+ Map<String, RoaringBitmap> convertedDocIds) {
+ doAnswer(invocation -> {
+ @SuppressWarnings("unchecked")
+ Map<String, RoaringBitmap> docsByValue = (Map<String, RoaringBitmap>)
invocation.getArgument(0);
+ docsByValue.clear();
+ docsByValue.putAll(convertedDocIds);
+ return null;
+ }).when(jsonIndexReader).convertFlattenedDocIdsToDocIds(any());
+ }
+
+ private static IndexSegment buildCanUseIndexSegment(JsonIndexReader
jsonIndexReader) {
+ DataSource dataSource = mock(DataSource.class);
+ when(dataSource.getJsonIndex()).thenReturn(jsonIndexReader);
+
+ IndexSegment indexSegment = mock(IndexSegment.class);
+ when(indexSegment.getDataSourceNullable("tags")).thenReturn(dataSource);
+ return indexSegment;
+ }
+
+ private static JsonIndexDistinctOperator buildOperator(QueryContext
queryContext, JsonIndexReader jsonIndexReader,
+ MutableRoaringBitmap filterBitmap, int numDocs) {
+ SegmentMetadata segmentMetadata = mock(SegmentMetadata.class);
+ when(segmentMetadata.getTotalDocs()).thenReturn(numDocs);
+
+ DataSource dataSource = mock(DataSource.class);
+ when(dataSource.getJsonIndex()).thenReturn(jsonIndexReader);
+
+ IndexSegment indexSegment = mock(IndexSegment.class);
+ when(indexSegment.getSegmentMetadata()).thenReturn(segmentMetadata);
+ when(indexSegment.getSegmentName()).thenReturn("testSegment");
+ when(indexSegment.getDataSource(eq("tags"), any())).thenReturn(dataSource);
+ when(indexSegment.getDataSourceNullable("tags")).thenReturn(dataSource);
+
+ return new JsonIndexDistinctOperator(indexSegment, new
SegmentContext(indexSegment), queryContext,
+ new StaticBitmapFilterOperator(numDocs, filterBitmap));
+ }
+
+ private static RoaringBitmap bitmap(int... docIds) {
+ RoaringBitmap bitmap = new RoaringBitmap();
+ for (int docId : docIds) {
+ bitmap.add(docId);
+ }
+ return bitmap;
+ }
+
+ private static MutableRoaringBitmap bufferBitmap(int... docIds) {
+ MutableRoaringBitmap bitmap = new MutableRoaringBitmap();
+ for (int docId : docIds) {
+ bitmap.add(docId);
+ }
+ return bitmap;
+ }
+
+ private static Set<String> extractValues(DistinctResultsBlock resultsBlock) {
+ List<Object[]> rows = resultsBlock.getRows();
+ return rows.stream().map(row -> (String)
row[0]).collect(Collectors.toSet());
+ }
+
+ private static final class StaticBitmapFilterOperator extends
BaseFilterOperator {
+ private final MutableRoaringBitmap _bitmap;
+
+ StaticBitmapFilterOperator(int numDocs, MutableRoaringBitmap bitmap) {
+ super(numDocs, false);
+ _bitmap = bitmap;
+ }
+
+ @Override
+ public boolean canProduceBitmaps() {
+ return true;
+ }
+
+ @Override
+ public BitmapCollection getBitmaps() {
+ return new BitmapCollection(_numDocs, false, _bitmap);
+ }
+
+ @Override
+ public List<Operator> getChildOperators() {
+ return List.of();
+ }
+
+ @Override
+ protected org.apache.pinot.core.common.BlockDocIdSet getTrues() {
+ throw new UnsupportedOperationException("Bitmap path only");
+ }
+
+ @Override
+ public String toExplainString() {
+ return "STATIC_BITMAP_FILTER";
+ }
+ }
+}
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/JsonPathTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/JsonPathTest.java
index 267410e9e5e..520f55126ab 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/JsonPathTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/JsonPathTest.java
@@ -733,6 +733,251 @@ public class JsonPathTest extends
CustomDataQueryClusterIntegrationTest {
"JsonIndexDistinctOperator should throw for missing JSON path without
defaultValue");
}
+ // --- Same-path JSON_MATCH predicate tests (trigger
getMatchingDistinctValues fast path) ---
+
+  /**
+   * DISTINCT on {@code $.k1} filtered by a same-path {@code REGEXP_LIKE} inside {@code JSON_MATCH}.
+   * The query is run with and without the index-based distinct option; the baseline must be non-empty,
+   * both runs must return the same ordered rows, and on the single-stage engine the optimized run must
+   * report zero entries scanned post-filter.
+   */
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testJsonIndexDistinctSamePathRegexpLike(boolean useMultiStageQueryEngine)
+      throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+
+    // REGEXP_LIKE on the same path ($.k1) that the DISTINCT projects
+    String k1Expression = "jsonExtractIndex(" + MY_MAP_STR_FIELD_NAME + ", '$.k1', 'STRING')";
+    String query = "SELECT DISTINCT " + k1Expression + " FROM " + getTableName()
+        + " WHERE JSON_MATCH(" + MY_MAP_STR_FIELD_NAME + ", 'REGEXP_LIKE(\"$.k1\", ''value-k1-[0-9]'')')"
+        + " ORDER BY " + k1Expression + " LIMIT 10000";
+
+    JsonNode scanResponse = postQuery(query);
+    Assert.assertEquals(scanResponse.get("exceptions").size(), 0);
+    JsonNode indexResponse = postQueryWithOptions(query, USE_INDEX_BASED_DISTINCT_OPERATOR + "=true");
+    Assert.assertEquals(indexResponse.get("exceptions").size(), 0);
+
+    List<String> expectedRows = extractOrderedDistinctValues(scanResponse);
+    List<String> actualRows = extractOrderedDistinctValues(indexResponse);
+    Assert.assertFalse(expectedRows.isEmpty(), "REGEXP_LIKE should match single-digit k1 values");
+    Assert.assertEquals(actualRows, expectedRows, "Same-path REGEXP_LIKE fast path should match baseline");
+
+    if (useMultiStageQueryEngine) {
+      return;
+    }
+    Assert.assertEquals(indexResponse.get("numEntriesScannedPostFilter").asLong(), 0L);
+  }
+
+  /**
+   * DISTINCT on {@code $.k1} filtered by a same-path equality predicate inside {@code JSON_MATCH}.
+   * The index-based run must return the same ordered rows as the default run and contain
+   * {@code value-k1-0}; on the single-stage engine it must report zero entries scanned post-filter.
+   */
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testJsonIndexDistinctSamePathEq(boolean useMultiStageQueryEngine)
+      throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+
+    String k1Expression = "jsonExtractIndex(" + MY_MAP_STR_FIELD_NAME + ", '$.k1', 'STRING')";
+    String query = "SELECT DISTINCT " + k1Expression + " FROM " + getTableName()
+        + " WHERE JSON_MATCH(" + MY_MAP_STR_FIELD_NAME + ", '\"$.k1\" = ''value-k1-0''')"
+        + " ORDER BY " + k1Expression + " LIMIT 10000";
+
+    JsonNode scanResponse = postQuery(query);
+    Assert.assertEquals(scanResponse.get("exceptions").size(), 0);
+    JsonNode indexResponse = postQueryWithOptions(query, USE_INDEX_BASED_DISTINCT_OPERATOR + "=true");
+    Assert.assertEquals(indexResponse.get("exceptions").size(), 0);
+
+    List<String> expectedRows = extractOrderedDistinctValues(scanResponse);
+    List<String> actualRows = extractOrderedDistinctValues(indexResponse);
+    Assert.assertEquals(actualRows, expectedRows, "Same-path EQ fast path should match baseline");
+    Assert.assertTrue(actualRows.contains("value-k1-0"));
+
+    if (useMultiStageQueryEngine) {
+      return;
+    }
+    Assert.assertEquals(indexResponse.get("numEntriesScannedPostFilter").asLong(), 0L);
+  }
+
+  /**
+   * DISTINCT on {@code $.k1} filtered by a same-path inequality predicate inside {@code JSON_MATCH}.
+   * The index-based run must return the same ordered rows as the default run and must not contain
+   * {@code value-k1-0}; on the single-stage engine it must report zero entries scanned post-filter.
+   */
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testJsonIndexDistinctSamePathNotEq(boolean useMultiStageQueryEngine)
+      throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+
+    String k1Expression = "jsonExtractIndex(" + MY_MAP_STR_FIELD_NAME + ", '$.k1', 'STRING')";
+    String query = "SELECT DISTINCT " + k1Expression + " FROM " + getTableName()
+        + " WHERE JSON_MATCH(" + MY_MAP_STR_FIELD_NAME + ", '\"$.k1\" != ''value-k1-0''')"
+        + " ORDER BY " + k1Expression + " LIMIT 10000";
+
+    JsonNode scanResponse = postQuery(query);
+    Assert.assertEquals(scanResponse.get("exceptions").size(), 0);
+    JsonNode indexResponse = postQueryWithOptions(query, USE_INDEX_BASED_DISTINCT_OPERATOR + "=true");
+    Assert.assertEquals(indexResponse.get("exceptions").size(), 0);
+
+    List<String> expectedRows = extractOrderedDistinctValues(scanResponse);
+    List<String> actualRows = extractOrderedDistinctValues(indexResponse);
+    Assert.assertEquals(actualRows, expectedRows, "Same-path NOT_EQ fast path should match baseline");
+    Assert.assertFalse(actualRows.contains("value-k1-0"));
+
+    if (useMultiStageQueryEngine) {
+      return;
+    }
+    Assert.assertEquals(indexResponse.get("numEntriesScannedPostFilter").asLong(), 0L);
+  }
+
+  /**
+   * DISTINCT on {@code $.k1} filtered by a same-path {@code IN} list of three values inside
+   * {@code JSON_MATCH}. The index-based run must return the same ordered rows as the default run and
+   * exactly three rows; on the single-stage engine it must report zero entries scanned post-filter.
+   */
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testJsonIndexDistinctSamePathIn(boolean useMultiStageQueryEngine)
+      throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+
+    String k1Expression = "jsonExtractIndex(" + MY_MAP_STR_FIELD_NAME + ", '$.k1', 'STRING')";
+    String query = "SELECT DISTINCT " + k1Expression + " FROM " + getTableName()
+        + " WHERE JSON_MATCH(" + MY_MAP_STR_FIELD_NAME
+        + ", '\"$.k1\" IN (''value-k1-0'', ''value-k1-1'', ''value-k1-2'')')"
+        + " ORDER BY " + k1Expression + " LIMIT 10000";
+
+    JsonNode scanResponse = postQuery(query);
+    Assert.assertEquals(scanResponse.get("exceptions").size(), 0);
+    JsonNode indexResponse = postQueryWithOptions(query, USE_INDEX_BASED_DISTINCT_OPERATOR + "=true");
+    Assert.assertEquals(indexResponse.get("exceptions").size(), 0);
+
+    List<String> expectedRows = extractOrderedDistinctValues(scanResponse);
+    List<String> actualRows = extractOrderedDistinctValues(indexResponse);
+    Assert.assertEquals(actualRows, expectedRows, "Same-path IN fast path should match baseline");
+    Assert.assertEquals(actualRows.size(), 3);
+
+    if (useMultiStageQueryEngine) {
+      return;
+    }
+    Assert.assertEquals(indexResponse.get("numEntriesScannedPostFilter").asLong(), 0L);
+  }
+
+  /**
+   * DISTINCT on {@code $.k1} filtered by a same-path {@code NOT IN} list inside {@code JSON_MATCH}.
+   * The index-based run must return the same ordered rows as the default run and contain neither of
+   * the excluded values; on the single-stage engine it must report zero entries scanned post-filter.
+   */
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testJsonIndexDistinctSamePathNotIn(boolean useMultiStageQueryEngine)
+      throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+
+    String k1Expression = "jsonExtractIndex(" + MY_MAP_STR_FIELD_NAME + ", '$.k1', 'STRING')";
+    String query = "SELECT DISTINCT " + k1Expression + " FROM " + getTableName()
+        + " WHERE JSON_MATCH(" + MY_MAP_STR_FIELD_NAME
+        + ", '\"$.k1\" NOT IN (''value-k1-0'', ''value-k1-1'')')"
+        + " ORDER BY " + k1Expression + " LIMIT 10000";
+
+    JsonNode scanResponse = postQuery(query);
+    Assert.assertEquals(scanResponse.get("exceptions").size(), 0);
+    JsonNode indexResponse = postQueryWithOptions(query, USE_INDEX_BASED_DISTINCT_OPERATOR + "=true");
+    Assert.assertEquals(indexResponse.get("exceptions").size(), 0);
+
+    List<String> expectedRows = extractOrderedDistinctValues(scanResponse);
+    List<String> actualRows = extractOrderedDistinctValues(indexResponse);
+    Assert.assertEquals(actualRows, expectedRows, "Same-path NOT_IN fast path should match baseline");
+    Assert.assertFalse(actualRows.contains("value-k1-0"));
+    Assert.assertFalse(actualRows.contains("value-k1-1"));
+
+    if (useMultiStageQueryEngine) {
+      return;
+    }
+    Assert.assertEquals(indexResponse.get("numEntriesScannedPostFilter").asLong(), 0L);
+  }
+
+  /**
+   * DISTINCT on {@code $.k1} filtered by a same-path {@code IS NOT NULL} inside {@code JSON_MATCH}.
+   * The index-based run must return the same ordered rows as the default run and exactly
+   * {@code NUM_DOCS_PER_SEGMENT} rows; on the single-stage engine it must report zero entries scanned
+   * post-filter.
+   */
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testJsonIndexDistinctSamePathIsNotNull(boolean useMultiStageQueryEngine)
+      throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+
+    String k1Expression = "jsonExtractIndex(" + MY_MAP_STR_FIELD_NAME + ", '$.k1', 'STRING')";
+    String query = "SELECT DISTINCT " + k1Expression + " FROM " + getTableName()
+        + " WHERE JSON_MATCH(" + MY_MAP_STR_FIELD_NAME + ", '\"$.k1\" IS NOT NULL')"
+        + " ORDER BY " + k1Expression + " LIMIT 10000";
+
+    JsonNode scanResponse = postQuery(query);
+    Assert.assertEquals(scanResponse.get("exceptions").size(), 0);
+    JsonNode indexResponse = postQueryWithOptions(query, USE_INDEX_BASED_DISTINCT_OPERATOR + "=true");
+    Assert.assertEquals(indexResponse.get("exceptions").size(), 0);
+
+    List<String> expectedRows = extractOrderedDistinctValues(scanResponse);
+    List<String> actualRows = extractOrderedDistinctValues(indexResponse);
+    Assert.assertEquals(actualRows, expectedRows, "Same-path IS NOT NULL fast path should match baseline");
+    Assert.assertEquals(actualRows.size(), NUM_DOCS_PER_SEGMENT,
+        "IS NOT NULL should return all values since every doc has $.k1");
+
+    if (useMultiStageQueryEngine) {
+      return;
+    }
+    Assert.assertEquals(indexResponse.get("numEntriesScannedPostFilter").asLong(), 0L);
+  }
+
+  /**
+   * Same-path {@code REGEXP_LIKE} combined with the 4-argument {@code jsonExtractIndex} form whose
+   * default value is {@code 'fallback'}. The index-based run must return the same ordered rows as the
+   * default run and must not contain the default; on the single-stage engine it must report zero
+   * entries scanned post-filter.
+   */
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testJsonIndexDistinctSamePathRegexpLikeWithDefault(boolean useMultiStageQueryEngine)
+      throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+
+    String k1Expression = "jsonExtractIndex(" + MY_MAP_STR_FIELD_NAME + ", '$.k1', 'STRING', 'fallback')";
+    String query = "SELECT DISTINCT " + k1Expression + " FROM " + getTableName()
+        + " WHERE JSON_MATCH(" + MY_MAP_STR_FIELD_NAME + ", 'REGEXP_LIKE(\"$.k1\", ''value-k1-[0-9]'')')"
+        + " ORDER BY " + k1Expression + " LIMIT 10000";
+
+    JsonNode scanResponse = postQuery(query);
+    Assert.assertEquals(scanResponse.get("exceptions").size(), 0);
+    JsonNode indexResponse = postQueryWithOptions(query, USE_INDEX_BASED_DISTINCT_OPERATOR + "=true");
+    Assert.assertEquals(indexResponse.get("exceptions").size(), 0);
+
+    List<String> expectedRows = extractOrderedDistinctValues(scanResponse);
+    List<String> actualRows = extractOrderedDistinctValues(indexResponse);
+    Assert.assertEquals(actualRows, expectedRows, "Same-path REGEXP_LIKE 4-arg fast path should match baseline");
+    // The filter only matches docs that have $.k1, so the default value must never be emitted
+    Assert.assertFalse(actualRows.contains("fallback"),
+        "Same-path filter ensures all matching docs have the path, so no default should appear");
+
+    if (useMultiStageQueryEngine) {
+      return;
+    }
+    Assert.assertEquals(indexResponse.get("numEntriesScannedPostFilter").asLong(), 0L);
+  }
+
+  /**
+   * Same-path {@code IS NOT NULL} without ORDER BY and with {@code LIMIT 5}: both the default and the
+   * index-based run must return exactly five rows (the rows themselves are not compared because no
+   * ordering is requested). On the single-stage engine the optimized run must report zero entries
+   * scanned post-filter.
+   */
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testJsonIndexDistinctSamePathWithLimit(boolean useMultiStageQueryEngine)
+      throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+
+    String k1Expression = "jsonExtractIndex(" + MY_MAP_STR_FIELD_NAME + ", '$.k1', 'STRING')";
+    String query = "SELECT DISTINCT " + k1Expression + " FROM " + getTableName()
+        + " WHERE JSON_MATCH(" + MY_MAP_STR_FIELD_NAME + ", '\"$.k1\" IS NOT NULL') LIMIT 5";
+
+    JsonNode scanResponse = postQuery(query);
+    Assert.assertEquals(scanResponse.get("exceptions").size(), 0);
+    JsonNode indexResponse = postQueryWithOptions(query, USE_INDEX_BASED_DISTINCT_OPERATOR + "=true");
+    Assert.assertEquals(indexResponse.get("exceptions").size(), 0);
+
+    List<String> scanRows = extractOrderedDistinctValues(scanResponse);
+    List<String> indexRows = extractOrderedDistinctValues(indexResponse);
+    Assert.assertEquals(indexRows.size(), 5, "LIMIT 5 should be respected by fast path");
+    Assert.assertEquals(scanRows.size(), 5);
+
+    if (useMultiStageQueryEngine) {
+      return;
+    }
+    Assert.assertEquals(indexResponse.get("numEntriesScannedPostFilter").asLong(), 0L);
+  }
+
private static List<String> extractOrderedDistinctValues(JsonNode response) {
List<String> values = new ArrayList<>();
JsonNode rows = response.get("resultTable").get("rows");
diff --git
a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkJsonIndexDistinct.java
b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkJsonIndexDistinct.java
new file mode 100644
index 00000000000..cd1b5a9bdc7
--- /dev/null
+++
b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkJsonIndexDistinct.java
@@ -0,0 +1,414 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.perf;
+
+import com.google.common.base.Preconditions;
+import java.io.File;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
+import java.util.function.IntSupplier;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.request.PinotQuery;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.core.operator.ExecutionStatistics;
+import org.apache.pinot.core.operator.blocks.results.DistinctResultsBlock;
+import org.apache.pinot.core.operator.query.DistinctOperator;
+import org.apache.pinot.core.plan.maker.InstancePlanMakerImplV2;
+import org.apache.pinot.core.plan.maker.PlanMaker;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import
org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
+import org.apache.pinot.core.util.GapfillUtils;
+import
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
+import
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.SegmentContext;
+import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordReader;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.apache.pinot.sql.parsers.CalciteSqlParser;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+
+/**
+ * JMH benchmark for DISTINCT over scalar {@code JSON_EXTRACT_INDEX(...)} on a
table that is explicitly configured with
+ * a JSON index on {@code tags}. The benchmark state is intended for
single-threaded JMH execution and is not
+ * thread-safe.
+ *
+ * <p>The base sample queries are:
+ * {@code SELECT DISTINCT JSON_EXTRACT_INDEX(tags, '$.instance', 'STRING') AS
tag_value FROM myTable WHERE
+ * JSON_MATCH(tags, 'REGEXP_LIKE("$.instance", ''.*test.*'')')}
+ * and
+ * {@code SELECT DISTINCT JSON_EXTRACT_INDEX(tags, '$.instance', 'STRING', '')
AS tag_value FROM myTable WHERE
+ * JSON_MATCH(tags, 'REGEXP_LIKE("$.instance", ''.*test.*'')')}.
+ *
+ * <p>Pinot assigns {@code SELECT DISTINCT} queries without an explicit limit
a default limit of {@code 10}. The
+ * benchmark validates the exact sample query for planning, then executes the
same query with an explicit large limit so
+ * the full matching distinct set is measured and compared across operators.
+ */
+@State(Scope.Benchmark)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+@Fork(1)
+@Warmup(iterations = 3, time = 1)
+@Measurement(iterations = 5, time = 1)
+public class BenchmarkJsonIndexDistinct {
+ private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(),
"BenchmarkJsonIndexDistinct"); // scratch directory; deleted in setup() and again in tearDown()
+ private static final String TABLE_NAME = "myTable";
+ private static final String SEGMENT_NAME = "jsonIndexDistinctSegment";
+ private static final String ID_COLUMN = "id";
+ private static final String TAGS_COLUMN = "tags";
+ private static final String JSON_INDEX_DISTINCT_OPERATOR_NAME =
"JsonIndexDistinctOperator"; // compared against the planned operator's simple class name
+ private static final String SAME_PATH_FILTER =
+ "WHERE JSON_MATCH(tags, 'REGEXP_LIKE(\"$.instance\", ''.*test.*'')')"; // filters on the projected path
+ private static final String EXTRA_FILTER =
+ "WHERE JSON_MATCH(tags, 'REGEXP_LIKE(\"$.instance\", ''.*test.*'')') "
+ + "AND JSON_MATCH(tags, '\"$.cluster\" = ''cluster-0''')"; // adds a second JSON_MATCH on $.cluster
+ private static final String THREE_ARG_SAMPLE_QUERY =
+ "SELECT DISTINCT JSON_EXTRACT_INDEX(tags, '$.instance', 'STRING') AS
tag_value "
+ + "FROM myTable " + SAME_PATH_FILTER;
+ private static final String FOUR_ARG_SAMPLE_QUERY =
+ "SELECT DISTINCT JSON_EXTRACT_INDEX(tags, '$.instance', 'STRING', '') AS
tag_value "
+ + "FROM myTable " + SAME_PATH_FILTER;
+ private static final String THREE_ARG_EXTRA_FILTER_SAMPLE_QUERY =
+ "SELECT DISTINCT JSON_EXTRACT_INDEX(tags, '$.instance', 'STRING') AS
tag_value "
+ + "FROM myTable " + EXTRA_FILTER;
+ private static final String FOUR_ARG_EXTRA_FILTER_SAMPLE_QUERY =
+ "SELECT DISTINCT JSON_EXTRACT_INDEX(tags, '$.instance', 'STRING', '') AS
tag_value "
+ + "FROM myTable " + EXTRA_FILTER;
+ private static final PlanMaker PLAN_MAKER = new InstancePlanMakerImplV2();
+ private static final boolean ASSERT_SPEEDUP =
+
Boolean.parseBoolean(System.getProperty("pinot.perf.jsonIndexDistinct.assertSpeedup",
"true")); // system property, defaults to "true"; gates verifySpeedup()
+ private static final int REGRESSION_WARMUP_ITERATIONS = 3; // untimed batches run by measureMedianNs()
+ private static final int REGRESSION_MEASURE_ITERATIONS = 7; // timed batches sampled by measureMedianNs()
+ private static final int REGRESSION_BATCH_SIZE = 10; // runner invocations per batch
+
+ private static final TableConfig TABLE_CONFIG = new
TableConfigBuilder(TableType.OFFLINE)
+ .setTableName(TABLE_NAME)
+ .setJsonIndexColumns(Collections.singletonList(TAGS_COLUMN))
+ .build();
+
+ private static final Schema SCHEMA = new Schema.SchemaBuilder()
+ .setSchemaName(TABLE_NAME)
+ .addSingleValueDimension(ID_COLUMN, FieldSpec.DataType.INT)
+ .addSingleValueDimension(TAGS_COLUMN, FieldSpec.DataType.JSON)
+ .build();
+
+ @Param({"500000"})
+ int _numRows; // rows generated into the segment
+
+ @Param({"10", "100", "1000", "10000", "100000"})
+ int _instanceCardinality; // number of distinct JSON documents cycled through by the generator
+
+ @Param({"0.01", "0.05", "0.1", "0.3", "0.5", "0.75", "0.9", "1.0"})
+ double _testInstanceFraction; // share of instances that setup() expects to match the regex filter
+
+ @Param({"1000000"})
+ int _distinctLimit; // LIMIT appended to the sample query for the measured runs
+
+ @Param({"THREE_ARG", "FOUR_ARG", "THREE_ARG_EXTRA_FILTER",
"FOUR_ARG_EXTRA_FILTER"})
+ String _queryVariant; // resolved to a sample query by getSampleQuery()
+
+ private IndexSegment _indexSegment;
+ private QueryContext _baselineQueryContext;
+ private QueryContext _optimizedQueryContext;
+ private int _expectedDistinctCount;
+ private volatile int _regressionSink; // written by measureMedianNs(); never read in this class
+
+  /**
+   * Per-trial setup. Validates the benchmark parameters, rebuilds and loads the segment, verifies that
+   * the sample query (default limit) and the benchmark query (explicit limit) plan to
+   * {@code DistinctOperator} without the query option and to {@code JsonIndexDistinctOperator} with it,
+   * then executes both plans once and cross-checks their results and execution statistics.
+   *
+   * @throws Exception if segment creation or loading fails
+   */
+  @Setup(Level.Trial)
+  public void setup()
+      throws Exception {
+    final String indexDistinctOption =
+        CommonConstants.Broker.Request.QueryOptionKey.USE_INDEX_BASED_DISTINCT_OPERATOR;
+    final Map<String, String> noOptions = Collections.emptyMap();
+    final Map<String, String> indexDistinctOptions = Collections.singletonMap(indexDistinctOption, "true");
+
+    final boolean hasExtraFilter = _queryVariant.contains("EXTRA_FILTER");
+    Preconditions.checkState(_numRows >= _instanceCardinality,
+        "Benchmark requires numRows >= instanceCardinality but got numRows=%s instanceCardinality=%s",
+        _numRows, _instanceCardinality);
+
+    int fractionInstances = (int) Math.round(_instanceCardinality * _testInstanceFraction);
+    int matchingInstances = Math.max(1, Math.min(_instanceCardinality, fractionInstances));
+    // The extra filter keeps only cluster-0, i.e. instance ids with id % 32 == 0, so among the matching
+    // ids [0, matchingInstances) only those multiples of 32 are expected to survive.
+    int expectedDistinct;
+    if (hasExtraFilter) {
+      expectedDistinct =
+          (int) java.util.stream.IntStream.range(0, matchingInstances).filter(id -> id % 32 == 0).count();
+    } else {
+      expectedDistinct = matchingInstances;
+    }
+    _expectedDistinctCount = Math.max(expectedDistinct, hasExtraFilter ? 0 : 1);
+    Preconditions.checkState(_distinctLimit >= Math.max(_expectedDistinctCount, 1),
+        "Distinct limit must cover all matching values for deterministic validation. limit=%s expectedDistinct=%s",
+        _distinctLimit, _expectedDistinctCount);
+
+    // Always start from a clean index directory
+    FileUtils.deleteQuietly(INDEX_DIR);
+    buildSegment();
+
+    File segmentDir = new File(INDEX_DIR, SEGMENT_NAME);
+    _indexSegment = ImmutableSegmentLoader.load(segmentDir, new IndexLoadingConfig(TABLE_CONFIG, SCHEMA));
+
+    Preconditions.checkState(TABLE_CONFIG.getIndexingConfig().getJsonIndexColumns().contains(TAGS_COLUMN),
+        "Table config must include JSON index on %s", TAGS_COLUMN);
+    Preconditions.checkState(_indexSegment.getDataSource(TAGS_COLUMN, SCHEMA).getJsonIndex() != null,
+        "Loaded segment must expose JSON index on %s", TAGS_COLUMN);
+
+    // 1) The exact sample query: default DISTINCT limit and expected operators
+    String sampleQuery = getSampleQuery();
+    PinotQuery samplePinotQuery = CalciteSqlParser.compileToPinotQuery(sampleQuery);
+    Preconditions.checkState(samplePinotQuery.getLimit() == 10,
+        "Unexpected default DISTINCT limit for sample query: %s", samplePinotQuery.getLimit());
+
+    QueryContext sampleBaseline = compileQueryContext(sampleQuery, noOptions);
+    QueryContext sampleOptimized = compileQueryContext(sampleQuery, indexDistinctOptions);
+    Preconditions.checkState(planOperator(sampleBaseline) instanceof DistinctOperator,
+        "Exact %s sample query must plan to DistinctOperator without %s", _queryVariant, indexDistinctOption);
+    assertJsonIndexDistinctOperator("Exact " + _queryVariant + " sample query", planOperator(sampleOptimized));
+
+    // 2) The measured query: same text with an explicit limit
+    String benchmarkQuery = sampleQuery + " LIMIT " + _distinctLimit;
+    _baselineQueryContext = compileQueryContext(benchmarkQuery, noOptions);
+    _optimizedQueryContext = compileQueryContext(benchmarkQuery, indexDistinctOptions);
+
+    Preconditions.checkState(planOperator(_baselineQueryContext) instanceof DistinctOperator,
+        "Benchmark query must plan to DistinctOperator without %s", indexDistinctOption);
+    assertJsonIndexDistinctOperator("Benchmark query", planOperator(_optimizedQueryContext));
+
+    // 3) Both operators must agree on the result before anything is measured
+    QueryExecution baselineRun = executeAndCollect(_baselineQueryContext);
+    QueryExecution optimizedRun = executeAndCollect(_optimizedQueryContext);
+    Set<String> baselineValues = baselineRun._values;
+    Set<String> optimizedValues = optimizedRun._values;
+    Preconditions.checkState(baselineValues.equals(optimizedValues),
+        "Result mismatch. baseline=%s optimized=%s variant=%s",
+        baselineValues.size(), optimizedValues.size(), _queryVariant);
+
+    if (!hasExtraFilter) {
+      ExecutionStatistics optimizedStats = optimizedRun._stats;
+      Preconditions.checkState(optimizedValues.size() == _expectedDistinctCount,
+          "Unexpected distinct count. expected=%s actual=%s variant=%s",
+          _expectedDistinctCount, optimizedValues.size(), _queryVariant);
+      // Fast path (fully pushed down): no docs scanned, no entries scanned post-filter
+      Preconditions.checkState(optimizedStats.getNumDocsScanned() == 0,
+          "JsonIndexDistinctOperator should not scan docs post-filter but scanned %s",
+          optimizedStats.getNumDocsScanned());
+      Preconditions.checkState(optimizedStats.getNumEntriesScannedPostFilter() == 0,
+          "JsonIndexDistinctOperator should not scan post-filter entries but scanned %s",
+          optimizedStats.getNumEntriesScannedPostFilter());
+    }
+    Preconditions.checkState(baselineRun._stats.getNumEntriesScannedPostFilter() > 0,
+        "Baseline DistinctOperator should scan post-filter entries");
+  }
+
+  /** Measures the baseline query context; returns the number of distinct rows produced. */
+  @Benchmark
+  public int distinctOperatorPath() {
+    int numDistinctRows = runQueryCount(_baselineQueryContext);
+    return numDistinctRows;
+  }
+
+  /** Measures the index-based query context; returns the number of distinct rows produced. */
+  @Benchmark
+  public int jsonIndexDistinctOperatorPath() {
+    int numDistinctRows = runQueryCount(_optimizedQueryContext);
+    return numDistinctRows;
+  }
+
+  /**
+   * Per-trial teardown: runs the speedup check, then destroys the loaded segment (if any) and deletes
+   * the index directory. The cleanup sits in {@code finally} so it still runs when the check throws.
+   */
+  @TearDown(Level.Trial)
+  public void tearDown() {
+    try {
+      verifySpeedup();
+    } finally {
+      IndexSegment loadedSegment = _indexSegment;
+      if (loadedSegment != null) {
+        loadedSegment.destroy();
+      }
+      FileUtils.deleteQuietly(INDEX_DIR);
+    }
+  }
+
+  /**
+   * Fails with {@link IllegalStateException} unless the index-based query is faster than the baseline,
+   * comparing the middle sample of each timing run. Skipped when {@code ASSERT_SPEEDUP} is false or the
+   * query variant is an EXTRA_FILTER one.
+   */
+  private void verifySpeedup() {
+    if (!ASSERT_SPEEDUP) {
+      return;
+    }
+    if (_queryVariant.contains("EXTRA_FILTER")) {
+      return;
+    }
+
+    // The optimized path is timed first, then the baseline
+    long optimizedNs = measureMedianNs(() -> runQueryCount(_optimizedQueryContext));
+    long baselineNs = measureMedianNs(() -> runQueryCount(_baselineQueryContext));
+    boolean optimizedIsFaster = optimizedNs < baselineNs;
+    Preconditions.checkState(optimizedIsFaster,
+        "JsonIndexDistinctOperator must stay faster. rows=%s instanceCardinality=%s testFraction=%s optimized=%sns "
+            + "baseline=%sns expectedDistinct=%s",
+        _numRows, _instanceCardinality, _testInstanceFraction, optimizedNs, baselineNs, _expectedDistinctCount);
+  }
+
+  /**
+   * Builds the benchmark segment under {@code INDEX_DIR} from the generated test data.
+   *
+   * @throws Exception if the segment creation driver or the record reader fails
+   */
+  private void buildSegment()
+      throws Exception {
+    SegmentGeneratorConfig generatorConfig = new SegmentGeneratorConfig(TABLE_CONFIG, SCHEMA);
+    generatorConfig.setOutDir(INDEX_DIR.getPath());
+    generatorConfig.setTableName(TABLE_NAME);
+    generatorConfig.setSegmentName(SEGMENT_NAME);
+
+    SegmentIndexCreationDriverImpl creationDriver = new SegmentIndexCreationDriverImpl();
+    // try-with-resources closes the reader once the segment is built
+    try (RecordReader generatedRows = new GeneratedDataRecordReader(createTestData())) {
+      creationDriver.init(generatorConfig, generatedRows);
+      creationDriver.build();
+    }
+  }
+
+  /**
+   * Creates the generator for the benchmark rows. Instance ids below the matching threshold get a
+   * {@code service-test-<id>} instance value (matched by the {@code .*test.*} regex filter); all other
+   * ids get {@code service-prod-<id>}. Every instance is tagged {@code cluster-<id % 32>}, and row
+   * {@code i} carries the JSON document of instance {@code i % _instanceCardinality}.
+   *
+   * @return a generator producing {@code _numRows} rows with the id and tags columns populated
+   */
+  private LazyDataGenerator createTestData() {
+    // Derive the threshold from the configured fraction, using the same formula as setup(), instead of
+    // reading _expectedDistinctCount. For EXTRA_FILTER variants that field has already been reduced to
+    // the ids with id % 32 == 0, and reusing it here would shrink the "service-test-" population far
+    // below _testInstanceFraction and make the expected count inconsistent with the generated data.
+    int matchingInstances = Math.max(1, Math.min(_instanceCardinality,
+        (int) Math.round(_instanceCardinality * _testInstanceFraction)));
+    String[] tagsValues = new String[_instanceCardinality];
+    for (int instanceId = 0; instanceId < _instanceCardinality; instanceId++) {
+      boolean matchesFilter = instanceId < matchingInstances;
+      String instanceValue = matchesFilter ? "service-test-" + instanceId : "service-prod-" + instanceId;
+      tagsValues[instanceId] =
+          "{\"instance\":\"" + instanceValue + "\",\"cluster\":\"cluster-" + (instanceId % 32) + "\"}";
+    }
+
+    return new LazyDataGenerator() {
+      @Override
+      public int size() {
+        return _numRows;
+      }
+
+      @Override
+      public GenericRow next(GenericRow row, int index) {
+        row.putValue(ID_COLUMN, index);
+        // Cycle through the pre-built JSON documents
+        row.putValue(TAGS_COLUMN, tagsValues[index % tagsValues.length]);
+        return row;
+      }
+
+      @Override
+      public void rewind() {
+        // Nothing to reset: rows are derived purely from the index
+      }
+    };
+  }
+
+  /**
+   * Compiles {@code query} into a {@link QueryContext} bound to the benchmark schema.
+   *
+   * @param query SQL text to compile
+   * @param queryOptions options to attach; an empty map leaves the compiled query's options untouched
+   * @return the query context built from the gapfill-stripped query
+   */
+  private QueryContext compileQueryContext(String query, Map<String, String> queryOptions) {
+    boolean hasOptions = !queryOptions.isEmpty();
+
+    PinotQuery compiledQuery = CalciteSqlParser.compileToPinotQuery(query);
+    if (hasOptions) {
+      compiledQuery.setQueryOptions(new HashMap<>(queryOptions));
+    }
+
+    // If stripping gapfill produced a different query object, attach the options to that one as well
+    PinotQuery strippedQuery = GapfillUtils.stripGapfill(compiledQuery);
+    if (strippedQuery != compiledQuery && hasOptions) {
+      strippedQuery.setQueryOptions(new HashMap<>(queryOptions));
+    }
+
+    QueryContext context = QueryContextConverterUtils.getQueryContext(strippedQuery);
+    context.setSchema(SCHEMA);
+    return context;
+  }
+
+  /** Plans {@code queryContext} against the loaded segment and returns the root operator. */
+  private Operator planOperator(QueryContext queryContext) {
+    SegmentContext segmentContext = new SegmentContext(_indexSegment);
+    return PLAN_MAKER.makeSegmentPlanNode(segmentContext, queryContext).run();
+  }
+
+  /** Returns true when the operator's simple class name equals {@code JSON_INDEX_DISTINCT_OPERATOR_NAME}. */
+  private static boolean isJsonIndexDistinctOperator(Operator operator) {
+    String simpleName = operator.getClass().getSimpleName();
+    return JSON_INDEX_DISTINCT_OPERATOR_NAME.equals(simpleName);
+  }
+
+  /**
+   * Fails with {@link IllegalStateException} unless {@code operator} is the JSON index distinct operator.
+   *
+   * @param queryDescription label for the query, used as the first part of the failure message
+   * @param operator the planned root operator to check
+   */
+  private static void assertJsonIndexDistinctOperator(String queryDescription, Operator operator) {
+    boolean plannedToIndexOperator = isJsonIndexDistinctOperator(operator);
+    String plannedClassName = operator.getClass().getName();
+    Preconditions.checkState(plannedToIndexOperator,
+        "%s must plan to JsonIndexDistinctOperator with %s=true but planned %s",
+        queryDescription, CommonConstants.Broker.Request.QueryOptionKey.USE_INDEX_BASED_DISTINCT_OPERATOR,
+        plannedClassName);
+  }
+
+  /** Plans and executes the query once and returns the row count of its distinct results block. */
+  private int runQueryCount(QueryContext queryContext) {
+    DistinctResultsBlock distinctBlock = (DistinctResultsBlock) planOperator(queryContext).nextBlock();
+    return distinctBlock.getNumRows();
+  }
+
+  /**
+   * Plans and executes the query once, returning the sorted set of first-column values together with
+   * the operator's execution statistics.
+   */
+  private QueryExecution executeAndCollect(QueryContext queryContext) {
+    Operator rootOperator = planOperator(queryContext);
+    DistinctResultsBlock distinctBlock = (DistinctResultsBlock) rootOperator.nextBlock();
+    // Statistics are read after the block has been produced
+    ExecutionStatistics executionStats = rootOperator.getExecutionStatistics();
+
+    Set<String> distinctValues = new TreeSet<>();
+    distinctBlock.getRows().forEach(row -> distinctValues.add((String) row[0]));
+    return new QueryExecution(distinctValues, executionStats);
+  }
+
+  /**
+   * Maps the {@code _queryVariant} parameter to its sample query text.
+   *
+   * @throws IllegalStateException if the variant is not one of the four supported names
+   */
+  private String getSampleQuery() {
+    if (_queryVariant.equals("THREE_ARG")) {
+      return THREE_ARG_SAMPLE_QUERY;
+    }
+    if (_queryVariant.equals("FOUR_ARG")) {
+      return FOUR_ARG_SAMPLE_QUERY;
+    }
+    if (_queryVariant.equals("THREE_ARG_EXTRA_FILTER")) {
+      return THREE_ARG_EXTRA_FILTER_SAMPLE_QUERY;
+    }
+    if (_queryVariant.equals("FOUR_ARG_EXTRA_FILTER")) {
+      return FOUR_ARG_EXTRA_FILTER_SAMPLE_QUERY;
+    }
+    throw new IllegalStateException("Unsupported query variant: " + _queryVariant);
+  }
+
+  /**
+   * Runs {@code REGRESSION_WARMUP_ITERATIONS} untimed batches, then times
+   * {@code REGRESSION_MEASURE_ITERATIONS} batches with {@link System#nanoTime()} and returns the middle
+   * element of the sorted batch durations, in nanoseconds. Each batch result is stored in
+   * {@code _regressionSink}.
+   */
+  private long measureMedianNs(IntSupplier runner) {
+    for (int warmup = 0; warmup < REGRESSION_WARMUP_ITERATIONS; warmup++) {
+      _regressionSink = runRegressionBatch(runner);
+    }
+
+    long[] batchNanos = new long[REGRESSION_MEASURE_ITERATIONS];
+    for (int sample = 0; sample < batchNanos.length; sample++) {
+      long begin = System.nanoTime();
+      _regressionSink = runRegressionBatch(runner);
+      long end = System.nanoTime();
+      batchNanos[sample] = end - begin;
+    }
+    Arrays.sort(batchNanos);
+    return batchNanos[batchNanos.length / 2];
+  }
+
+  /** Invokes {@code runner} {@code REGRESSION_BATCH_SIZE} times and returns the sum of its results. */
+  private int runRegressionBatch(IntSupplier runner) {
+    int total = 0;
+    int remaining = REGRESSION_BATCH_SIZE;
+    while (remaining > 0) {
+      total += runner.getAsInt();
+      remaining--;
+    }
+    return total;
+  }
+
+  /** Runs this benchmark class through the JMH {@link Runner}, selecting it by simple class name. */
+  public static void main(String[] args)
+      throws Exception {
+    String benchmarkPattern = BenchmarkJsonIndexDistinct.class.getSimpleName();
+    new Runner(new OptionsBuilder().include(benchmarkPattern).build()).run();
+  }
+
+  /** Result of one query execution: the collected distinct values plus the operator's statistics. */
+  private static final class QueryExecution {
+    final Set<String> _values;
+    final ExecutionStatistics _stats;
+
+    QueryExecution(Set<String> distinctValues, ExecutionStatistics executionStats) {
+      _values = distinctValues;
+      _stats = executionStats;
+    }
+  }
+}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/json/ImmutableJsonIndexReader.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/json/ImmutableJsonIndexReader.java
index 49655411821..fc03e03c614 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/json/ImmutableJsonIndexReader.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/json/ImmutableJsonIndexReader.java
@@ -29,9 +29,11 @@ import java.nio.ByteOrder;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
+import java.util.Set;
import javax.annotation.Nullable;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pinot.common.request.context.ExpressionContext;
@@ -550,6 +552,198 @@ public class ImmutableJsonIndexReader implements
JsonIndexReader {
return result;
}
+ /// Returns the distinct values stored under `jsonPathKey`, optionally restricted by `filterString`.
+ ///
+ /// Resolution order:
+ /// 1. The path is normalized and resolved through `getKeyAndFlattenedDocIds`. A non-null but empty bitmap means
+ ///    nothing can match, so an empty set is returned.
+ /// 2. A non-null bitmap means the path carries an array-index constraint; the default `JsonIndexReader`
+ ///    implementation is used with the path exactly as the caller supplied it.
+ /// 3. Without a filter, every value of the key is collected from the dictionary.
+ /// 4. A single, non-exclusive predicate on the same path is evaluated directly on the dictionary values.
+ /// 5. Any other filter falls back to the flattened-docs-map based implementation.
+ ///
+ /// @param jsonPathKey JSON path whose values are requested
+ /// @param filterString optional JSON_MATCH filter expression; `null` means "no filter"
+ /// @return a set of the matching value strings
+ /// @throws BadQueryRequestException if `filterString` cannot be parsed or is a constant filter
+ @Override
+ public Set<String> getMatchingDistinctValues(String jsonPathKey, @Nullable String filterString) {
+   // Remember the path exactly as requested. The key returned by getKeyAndFlattenedDocIds is in dictionary form
+   // and reports array-index constraints separately as a bitmap, so a path rebuilt from that key would silently
+   // drop the constraint.
+   String requestedJsonPath = jsonPathKey;
+
+   // Normalize the path key (same logic as getMatchingFlattenedDocsMap)
+   if (_version == BaseJsonIndexCreator.VERSION_2) {
+     if (jsonPathKey.startsWith("$")) {
+       jsonPathKey = jsonPathKey.substring(1);
+     } else {
+       jsonPathKey = JsonUtils.KEY_SEPARATOR + jsonPathKey;
+     }
+   } else {
+     if (jsonPathKey.startsWith("$.")) {
+       jsonPathKey = jsonPathKey.substring(2);
+     }
+   }
+
+   Pair<String, ImmutableRoaringBitmap> pathKey = getKeyAndFlattenedDocIds(jsonPathKey);
+   if (pathKey.getRight() != null && pathKey.getRight().isEmpty()) {
+     return new HashSet<>();
+   }
+   jsonPathKey = pathKey.getLeft();
+
+   // Array index paths need bitmap intersection: delegate to the default implementation, passing the ORIGINAL
+   // path so that the array-index constraint is re-derived there instead of being lost.
+   if (pathKey.getRight() != null) {
+     return JsonIndexReader.super.getMatchingDistinctValues(requestedJsonPath, filterString);
+   }
+
+   if (filterString == null) {
+     return collectAllValues(jsonPathKey);
+   }
+
+   // Parse the filter and attempt single-pass evaluation for same-path predicates
+   FilterContext filter;
+   try {
+     filter = RequestContextUtils.getFilter(CalciteSqlParser.compileToExpression(filterString));
+     Preconditions.checkArgument(!filter.isConstant());
+   } catch (Exception e) {
+     throw new BadQueryRequestException("Invalid json match filter: " + filterString);
+   }
+
+   // Only optimize simple single-predicate, non-exclusive, same-path filters
+   if (filter.getType() != FilterContext.Type.PREDICATE || isExclusive(filter.getPredicate().getType())) {
+     return collectValuesFromFlattenedDocsMap(jsonPathKey, filterString);
+   }
+
+   Predicate predicate = filter.getPredicate();
+   String predicateKey = normalizePredicateKey(predicate);
+   if (predicateKey == null || !predicateKey.equals(jsonPathKey)) {
+     return collectValuesFromFlattenedDocsMap(jsonPathKey, filterString);
+   }
+
+   return collectDistinctValuesByPredicate(predicate, jsonPathKey);
+ }
+
+ /// Returns every value stored under `jsonPathKey` by walking the dictionary id range of that key. No filter is
+ /// applied and no posting list is read.
+ ///
+ /// An empty set is returned when `getDictIdRangeForKey` reports a negative start id.
+ private Set<String> collectAllValues(String jsonPathKey) {
+   int[] dictIdRange = getDictIdRangeForKey(jsonPathKey);
+   Set<String> values = new HashSet<>();
+   int firstDictId = dictIdRange[0];
+   if (firstDictId < 0) {
+     return values;
+   }
+
+   int endDictId = dictIdRange[1];
+   // Each dictionary entry starts with the key plus one extra character; the value is everything after that
+   int valueOffset = jsonPathKey.length() + 1;
+   byte[] buffer = _dictionary.getBuffer();
+   for (int dictId = firstDictId; dictId < endDictId; dictId++) {
+     String entry = _dictionary.getStringValue(dictId, buffer);
+     values.add(entry.substring(valueOffset));
+     QueryThreadContext.checkTerminationAndSampleUsagePeriodically(values.size(),
+         "ImmutableJsonIndexReader.getMatchingDistinctValues");
+   }
+   return values;
+ }
+
+ /// Walks the dictionary id range of `jsonPathKey` once and keeps each value accepted by the matcher built from
+ /// `predicate`. The predicate is tested against the value string itself; no posting list is read.
+ ///
+ /// An empty set is returned when `getDictIdRangeForKey` reports a negative start id.
+ private Set<String> collectDistinctValuesByPredicate(Predicate predicate, String jsonPathKey) {
+   int[] dictIdRange = getDictIdRangeForKey(jsonPathKey);
+   Set<String> matches = new HashSet<>();
+   int firstDictId = dictIdRange[0];
+   if (firstDictId < 0) {
+     return matches;
+   }
+
+   int endDictId = dictIdRange[1];
+   byte[] buffer = _dictionary.getBuffer();
+   // Each dictionary entry starts with the key plus one extra character; the value is everything after that
+   int valueOffset = jsonPathKey.length() + 1;
+   java.util.function.Predicate<String> accepts = buildValueMatcher(predicate, jsonPathKey);
+
+   for (int dictId = firstDictId; dictId < endDictId; dictId++) {
+     String candidate = _dictionary.getStringValue(dictId, buffer).substring(valueOffset);
+     if (accepts.test(candidate)) {
+       matches.add(candidate);
+     }
+     // Progress is counted in scanned entries, not in matches, so the check also advances when nothing matches
+     int scanned = dictId - firstDictId;
+     QueryThreadContext.checkTerminationAndSampleUsagePeriodically(scanned,
+         "ImmutableJsonIndexReader.getMatchingDistinctValues");
+   }
+   return matches;
+ }
+
+ /// Translates a filter predicate into a matcher that is applied to a raw value string.
+ ///
+ /// Supported predicate types: EQ, NOT_EQ, IN, NOT_IN, REGEXP_LIKE, RANGE and IS_NOT_NULL. RANGE bounds and
+ /// candidate values are compared as DOUBLE when the range data type is numeric, otherwise as STRING.
+ ///
+ /// The REGEXP_LIKE matcher reuses a single `Matcher` instance across calls.
+ ///
+ /// @param predicate the predicate to translate
+ /// @param key not read by this method
+ /// @throws IllegalStateException for any other predicate type
+ private static java.util.function.Predicate<String> buildValueMatcher(Predicate predicate, String key) {
+   switch (predicate.getType()) {
+     case EQ: {
+       String expected = ((EqPredicate) predicate).getValue();
+       return candidate -> candidate.equals(expected);
+     }
+     case NOT_EQ: {
+       String rejected = ((NotEqPredicate) predicate).getValue();
+       return candidate -> !candidate.equals(rejected);
+     }
+     case IN: {
+       Set<String> allowed = new HashSet<>(((InPredicate) predicate).getValues());
+       return candidate -> allowed.contains(candidate);
+     }
+     case NOT_IN: {
+       Set<String> excluded = new HashSet<>(((NotInPredicate) predicate).getValues());
+       return candidate -> !excluded.contains(candidate);
+     }
+     case REGEXP_LIKE: {
+       Matcher reusableMatcher = ((RegexpLikePredicate) predicate).getPattern().matcher("");
+       return candidate -> {
+         reusableMatcher.reset(candidate);
+         return reusableMatcher.matches();
+       };
+     }
+     case RANGE: {
+       RangePredicate range = (RangePredicate) predicate;
+       FieldSpec.DataType comparisonType =
+           range.getRangeDataType().isNumeric() ? FieldSpec.DataType.DOUBLE : FieldSpec.DataType.STRING;
+       boolean noLower = range.getLowerBound().equals(RangePredicate.UNBOUNDED);
+       boolean noUpper = range.getUpperBound().equals(RangePredicate.UNBOUNDED);
+       boolean includeLower = noLower || range.isLowerInclusive();
+       boolean includeUpper = noUpper || range.isUpperInclusive();
+       Object lower = noLower ? null : comparisonType.convert(range.getLowerBound());
+       Object upper = noUpper ? null : comparisonType.convert(range.getUpperBound());
+       return candidate -> {
+         Object converted = comparisonType.convert(candidate);
+         boolean aboveLower;
+         if (noLower) {
+           aboveLower = true;
+         } else {
+           int cmp = comparisonType.compare(converted, lower);
+           aboveLower = includeLower ? cmp >= 0 : cmp > 0;
+         }
+         boolean belowUpper;
+         if (noUpper) {
+           belowUpper = true;
+         } else {
+           int cmp = comparisonType.compare(converted, upper);
+           belowUpper = includeUpper ? cmp <= 0 : cmp < 0;
+         }
+         return aboveLower && belowUpper;
+       };
+     }
+     case IS_NOT_NULL:
+       return candidate -> true;
+     default:
+       throw new IllegalStateException("Unsupported predicate type for distinct values: " + predicate.getType());
+   }
+ }
+
+ /// Converts the left-hand side of `predicate` into the key form returned by `getKeyAndFlattenedDocIds`.
+ ///
+ /// Returns `null` when the left-hand side is not an identifier, or when `getKeyAndFlattenedDocIds` reports a
+ /// flattened-doc-id bitmap for the key (only simple paths without array indices are handled here).
+ @Nullable
+ private String normalizePredicateKey(Predicate predicate) {
+   ExpressionContext lhs = predicate.getLhs();
+   if (lhs.getType() != ExpressionContext.Type.IDENTIFIER) {
+     return null;
+   }
+
+   String rawKey = lhs.getIdentifier();
+   String normalizedKey;
+   if (_version == BaseJsonIndexCreator.VERSION_2) {
+     normalizedKey = rawKey.startsWith("$") ? rawKey.substring(1) : JsonUtils.KEY_SEPARATOR + rawKey;
+   } else {
+     normalizedKey = rawKey.startsWith("$.") ? rawKey.substring(2) : rawKey;
+   }
+
+   Pair<String, ImmutableRoaringBitmap> keyAndDocIds = getKeyAndFlattenedDocIds(normalizedKey);
+   return keyAndDocIds.getRight() == null ? keyAndDocIds.getLeft() : null;
+ }
+
+ /// Falls back to the default approach: build full flattened docs map and
return keys.
+ /// Slow path: delegates to the default `JsonIndexReader#getMatchingDistinctValues` implementation.
+ ///
+ /// `normalizedKey` is first prefixed back into a `$`-style path by `denormalizeKey`, and that path is what the
+ /// default implementation receives.
+ /// NOTE(review): the rebuilt path contains only what `normalizedKey` contains - confirm callers never rely on
+ /// information (such as array indices) that was removed during normalization.
+ private Set<String> collectValuesFromFlattenedDocsMap(String normalizedKey, @Nullable String filterString) {
+   String jsonPath = denormalizeKey(normalizedKey);
+   return JsonIndexReader.super.getMatchingDistinctValues(jsonPath, filterString);
+ }
+
+ /// Prefixes a normalized key so that it reads as a JSON path again: `$` for version 2 indexes, `$.` otherwise.
+ private String denormalizeKey(String normalizedKey) {
+   String prefix = _version == BaseJsonIndexCreator.VERSION_2 ? "$" : "$.";
+   return prefix + normalizedKey;
+ }
+
@Override
public String[][] getValuesMV(int[] docIds, int length, Map<String,
RoaringBitmap> valueToMatchingFlattenedDocs) {
String[][] result = new String[length][];
diff --git
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/JsonIndexReader.java
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/JsonIndexReader.java
index 489f2dd003c..06a095b4e20 100644
---
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/JsonIndexReader.java
+++
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/JsonIndexReader.java
@@ -19,6 +19,7 @@
package org.apache.pinot.segment.spi.index.reader;
import java.util.Map;
+import java.util.Set;
import javax.annotation.Nullable;
import org.apache.pinot.segment.spi.index.IndexReader;
import org.roaringbitmap.RoaringBitmap;
@@ -78,6 +79,19 @@ public interface JsonIndexReader extends IndexReader {
*/
void convertFlattenedDocIdsToDocIds(Map<String, RoaringBitmap>
flattenedDocIdsMap);
+ /**
+  * Returns the distinct value strings stored under {@code key} that satisfy the optional filter.
+  *
+  * <p>Intended for callers that only need the values themselves (e.g. fully-pushed-down DISTINCT queries) and
+  * would otherwise call {@link #getMatchingFlattenedDocsMap} followed by {@link #convertFlattenedDocIdsToDocIds}
+  * just to read the map keys. Implementations should override this method when they can produce the values
+  * without materializing per-value posting list bitmaps or converting flattened doc IDs.
+  *
+  * <p>The default implementation calls {@link #getMatchingFlattenedDocsMap} and returns the key set of the
+  * resulting map.
+  *
+  * @param key the JSON path whose values are requested
+  * @param filterJsonString optional filter expression, or {@code null} for no filter
+  * @return the matching value strings
+  */
+ default Set<String> getMatchingDistinctValues(String key, @Nullable String filterJsonString) {
+   Map<String, RoaringBitmap> valueToFlattenedDocs = getMatchingFlattenedDocsMap(key, filterJsonString);
+   return valueToFlattenedDocs.keySet();
+ }
+
/**
* Returns true if the given JSON path is indexed and can be used for
index-based operations
* (e.g. getMatchingFlattenedDocsMap, JsonIndexDistinctOperator).
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]