Copilot commented on code in PR #17820: URL: https://github.com/apache/pinot/pull/17820#discussion_r2955686179
########## pinot-core/src/main/java/org/apache/pinot/core/operator/query/JsonIndexDistinctOperator.java: ########## @@ -0,0 +1,534 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.operator.query; + +import com.google.common.base.CaseFormat; +import java.math.BigDecimal; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import javax.annotation.Nullable; +import org.apache.pinot.common.function.JsonPathCache; +import org.apache.pinot.common.request.context.ExpressionContext; +import org.apache.pinot.common.request.context.OrderByExpressionContext; +import org.apache.pinot.common.utils.DataSchema; +import org.apache.pinot.common.utils.DataSchema.ColumnDataType; +import org.apache.pinot.core.common.Operator; +import org.apache.pinot.core.operator.BaseOperator; +import org.apache.pinot.core.operator.ExecutionStatistics; +import org.apache.pinot.core.operator.ExplainAttributeBuilder; +import org.apache.pinot.core.operator.blocks.DocIdSetBlock; +import org.apache.pinot.core.operator.blocks.results.DistinctResultsBlock; +import org.apache.pinot.core.operator.filter.BaseFilterOperator; +import org.apache.pinot.core.plan.DocIdSetPlanNode; +import org.apache.pinot.core.query.distinct.table.BigDecimalDistinctTable; +import org.apache.pinot.core.query.distinct.table.DistinctTable; +import org.apache.pinot.core.query.distinct.table.DoubleDistinctTable; +import org.apache.pinot.core.query.distinct.table.FloatDistinctTable; +import org.apache.pinot.core.query.distinct.table.IntDistinctTable; +import org.apache.pinot.core.query.distinct.table.LongDistinctTable; +import org.apache.pinot.core.query.distinct.table.StringDistinctTable; +import org.apache.pinot.core.query.request.context.QueryContext; +import org.apache.pinot.segment.spi.IndexSegment; +import org.apache.pinot.segment.spi.SegmentContext; +import org.apache.pinot.segment.spi.datasource.DataSource; +import org.apache.pinot.segment.spi.index.IndexService; +import org.apache.pinot.segment.spi.index.IndexType; +import org.apache.pinot.segment.spi.index.reader.JsonIndexReader; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.query.QueryThreadContext; +import org.roaringbitmap.RoaringBitmap; + + +/** + * Distinct operator that uses the JSON index value→docId map directly instead of scanning documents. + * Avoids the projection/transform pipeline for SELECT DISTINCT jsonExtractIndex(...). + */ +public class JsonIndexDistinctOperator extends BaseOperator<DistinctResultsBlock> { + private static final String EXPLAIN_NAME = "DISTINCT_JSON_INDEX"; + private static final String FUNCTION_NAME = "jsonExtractIndex"; + + private final IndexSegment _indexSegment; + private final SegmentContext _segmentContext; + private final QueryContext _queryContext; + private final BaseFilterOperator _filterOperator; + + private int _numEntriesExamined = 0; + private long _numEntriesScannedInFilter = 0; + + public JsonIndexDistinctOperator(IndexSegment indexSegment, SegmentContext segmentContext, + QueryContext queryContext, BaseFilterOperator filterOperator) { + _indexSegment = indexSegment; + _segmentContext = segmentContext; + _queryContext = queryContext; + _filterOperator = filterOperator; + } + + @Override + protected DistinctResultsBlock getNextBlock() { + List<ExpressionContext> expressions = _queryContext.getSelectExpressions(); + if (expressions.size() != 1) { + throw new IllegalStateException("JsonIndexDistinctOperator supports single expression only"); + } + + ExpressionContext expr = expressions.get(0); + ParsedJsonExtractIndex parsed = parseJsonExtractIndex(expr); + if (parsed == null) { + throw new IllegalStateException("Expected jsonExtractIndex expression"); + } + + // Evaluate the filter first so we can skip the (potentially expensive) index map when no docs match + RoaringBitmap filteredDocIds = buildFilteredDocIds(); + if (filteredDocIds != null && filteredDocIds.isEmpty()) { + ColumnDataType earlyColumnDataType = ColumnDataType.fromDataTypeSV(parsed._dataType); + DataSchema earlyDataSchema = new DataSchema( + new String[]{expr.toString()}, + new ColumnDataType[]{earlyColumnDataType}); + OrderByExpressionContext earlyOrderBy = _queryContext.getOrderByExpressions() != null + ? _queryContext.getOrderByExpressions().get(0) : null; + return new DistinctResultsBlock( + createDistinctTable(earlyDataSchema, parsed._dataType, earlyOrderBy), _queryContext); + } + + DataSource dataSource = _indexSegment.getDataSource(parsed._columnName, _queryContext.getSchema()); + JsonIndexReader jsonIndexReader = getJsonIndexReader(dataSource); + if (jsonIndexReader == null) { + throw new IllegalStateException("Column " + parsed._columnName + " has no JSON index"); + } + + // Same logic as JsonExtractIndexTransformFunction.getValueToMatchingDocsMap() + Map<String, RoaringBitmap> valueToMatchingDocs = + jsonIndexReader.getMatchingFlattenedDocsMap(parsed._jsonPathString, parsed._filterExpression); + // Always single-value (MV _ARRAY is rejected in parseJsonExtractIndex) + jsonIndexReader.convertFlattenedDocIdsToDocIds(valueToMatchingDocs); + + ColumnDataType columnDataType = ColumnDataType.fromDataTypeSV(parsed._dataType); + DataSchema dataSchema = new DataSchema( + new String[]{expr.toString()}, + new ColumnDataType[]{columnDataType}); + OrderByExpressionContext orderByExpression = _queryContext.getOrderByExpressions() != null + ? _queryContext.getOrderByExpressions().get(0) : null; + DistinctTable distinctTable = createDistinctTable(dataSchema, parsed._dataType, orderByExpression); + + int limit = _queryContext.getLimit(); + int totalDocs = _indexSegment.getSegmentMetadata().getTotalDocs(); + // Track uncovered docs: for the no-filter case, build a union and compare against totalDocs. + // For the filtered case, use a "remaining" bitmap that shrinks in-place (no per-value allocation). + RoaringBitmap allMatchedDocs = filteredDocIds == null ? new RoaringBitmap() : null; + RoaringBitmap remainingDocs = filteredDocIds != null ? filteredDocIds.clone() : null; + boolean allDocsCovered = filteredDocIds == null ? (totalDocs == 0) : filteredDocIds.isEmpty(); + boolean earlyBreak = false; + + for (Map.Entry<String, RoaringBitmap> entry : valueToMatchingDocs.entrySet()) { + _numEntriesExamined++; + QueryThreadContext.checkTerminationAndSampleUsagePeriodically(_numEntriesExamined, EXPLAIN_NAME); + String value = entry.getKey(); + RoaringBitmap docIds = entry.getValue(); + + boolean includeValue; + if (filteredDocIds == null) { + includeValue = true; + // Build union for uncovered-docs detection; short-circuit once all segment docs are covered + if (!allDocsCovered) { + allMatchedDocs.or(docIds); + if (allMatchedDocs.getLongCardinality() >= totalDocs) { + allDocsCovered = true; + } + } + } else { + includeValue = RoaringBitmap.intersects(docIds, filteredDocIds); + // Remove matched docs from remaining set in-place (no allocation per value) + if (!allDocsCovered && includeValue) { + remainingDocs.andNot(docIds); + if (remainingDocs.isEmpty()) { + allDocsCovered = true; + } + } + } + + if (includeValue) { + boolean done = addValueToDistinctTable(distinctTable, value, parsed._dataType, orderByExpression); + if (done) { + earlyBreak = true; + break; + } + } + + if (orderByExpression == null && distinctTable.hasLimit() && distinctTable.size() >= limit) { + earlyBreak = true; + break; + } + } + + // Handle docs not covered by any value in the index. + // Baseline JsonExtractIndexTransformFunction throws when a doc is missing the path and no + // defaultValue is provided. Match that behavior here unless nullHandling is enabled. + // allDocsCovered tracks coverage precisely (against totalDocs or filteredDocIds cardinality). + if (!earlyBreak && !allDocsCovered) { + if (parsed._defaultValue != null) { + addValueToDistinctTable(distinctTable, parsed._defaultValue, parsed._dataType, orderByExpression); + } else if (_queryContext.isNullHandlingEnabled()) { + distinctTable.addNull(); + } else { + throw new RuntimeException( + String.format("Illegal Json Path: [%s], for some docIds in segment [%s]", + parsed._jsonPathString, _indexSegment.getSegmentName())); + } Review Comment: Null-handling behavior here doesn’t match the existing jsonExtractIndex implementation: JsonExtractIndexTransformFunction throws "Illegal Json Path" when the path is absent and no defaultValue is provided, regardless of enableNullHandling. This operator currently returns a NULL distinct value instead of throwing when null handling is enabled, which can change query results/exception behavior for the same SQL. Please either (a) keep behavior consistent and throw even when null handling is enabled, or (b) update JsonExtractIndexTransformFunction (and related operators) + add tests to intentionally change semantics under null handling. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
