Copilot commented on code in PR #17872: URL: https://github.com/apache/pinot/pull/17872#discussion_r2987150652
########## pinot-core/src/main/java/org/apache/pinot/core/operator/query/InvertedIndexDistinctOperator.java: ########## @@ -0,0 +1,507 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.operator.query; + +import com.google.common.base.CaseFormat; +import java.util.Collections; +import java.util.List; +import java.util.NavigableMap; +import java.util.TreeMap; +import java.util.stream.Collectors; +import javax.annotation.Nullable; +import org.apache.pinot.common.request.context.ExpressionContext; +import org.apache.pinot.common.request.context.OrderByExpressionContext; +import org.apache.pinot.common.utils.DataSchema; +import org.apache.pinot.common.utils.DataSchema.ColumnDataType; +import org.apache.pinot.common.utils.config.QueryOptionsUtils; +import org.apache.pinot.core.common.Operator; +import org.apache.pinot.core.operator.BaseOperator; +import org.apache.pinot.core.operator.BaseProjectOperator; +import org.apache.pinot.core.operator.ExecutionStatistics; +import org.apache.pinot.core.operator.ExplainAttributeBuilder; +import org.apache.pinot.core.operator.blocks.DocIdSetBlock; +import org.apache.pinot.core.operator.blocks.ValueBlock; +import org.apache.pinot.core.operator.blocks.results.DistinctResultsBlock; +import org.apache.pinot.core.operator.filter.BaseFilterOperator; +import org.apache.pinot.core.operator.filter.BitmapBasedFilterOperator; +import org.apache.pinot.core.plan.DocIdSetPlanNode; +import org.apache.pinot.core.plan.ProjectPlanNode; +import org.apache.pinot.core.query.distinct.DistinctExecutor; +import org.apache.pinot.core.query.distinct.DistinctExecutorFactory; +import org.apache.pinot.core.query.distinct.table.DictIdDistinctTable; +import org.apache.pinot.core.query.request.context.QueryContext; +import org.apache.pinot.segment.spi.IndexSegment; +import org.apache.pinot.segment.spi.SegmentContext; +import org.apache.pinot.segment.spi.datasource.DataSource; +import org.apache.pinot.segment.spi.datasource.DataSourceMetadata; +import org.apache.pinot.segment.spi.index.reader.Dictionary; +import org.apache.pinot.segment.spi.index.reader.InvertedIndexReader; +import org.apache.pinot.segment.spi.index.reader.NullValueVectorReader; +import org.apache.pinot.segment.spi.index.reader.SortedIndexReader; +import org.apache.pinot.spi.query.QueryThreadContext; +import org.apache.pinot.spi.utils.Pairs; +import org.roaringbitmap.PeekableIntIterator; +import org.roaringbitmap.buffer.ImmutableRoaringBitmap; +import org.roaringbitmap.buffer.MutableRoaringBitmap; + + +/** + * Inverted-index-based operator for single-column distinct queries on a single segment. + * + * <p>Supports three execution paths, chosen at runtime: + * <ul> + * <li><b>Sorted index path</b>: For sorted columns, merge-iterates filter bitmap against contiguous doc ranges. + * Cost ~ O(cardinality + filteredDocs). Always chosen when the column has a sorted forward index.</li> + * <li><b>Bitmap inverted index path</b>: Iterates dictionary entries and uses inverted index bitmap intersections + * to check filter membership. Avoids the projection pipeline entirely. Chosen by cost heuristic when dictionary + * cardinality is much smaller than the filtered doc count.</li> + * <li><b>Scan path (fallback)</b>: Uses ProjectOperator + DistinctExecutor to scan filtered docs. + * Used when the cost heuristic determines scanning is cheaper.</li> + * </ul> + * + * <p>Enabled via the {@code useIndexBasedDistinctOperator} query option. The cost ratio can be tuned + * via the {@code invertedIndexDistinctCostRatio} query option. + */ +public class InvertedIndexDistinctOperator extends BaseOperator<DistinctResultsBlock> { + private static final String EXPLAIN_NAME = "DISTINCT_INVERTED_INDEX"; + private static final String EXPLAIN_NAME_SORTED_INDEX = "DISTINCT_SORTED_INDEX"; + private static final String EXPLAIN_NAME_SCAN_FALLBACK = "DISTINCT"; + + private final IndexSegment _indexSegment; + private final SegmentContext _segmentContext; + private final QueryContext _queryContext; + private final BaseFilterOperator _filterOperator; + private final DataSource _dataSource; + private final Dictionary _dictionary; + private final InvertedIndexReader<?> _invertedIndexReader; + + // Scan path: created lazily when scan fallback is chosen + private BaseProjectOperator<?> _projectOperator; + + // Execution tracking + private boolean _usedInvertedIndexPath = false; + private int _numDocsScanned = 0; + private int _numEntriesExamined = 0; + private long _numEntriesScannedInFilter = 0; + + /** + * Creates an InvertedIndexDistinctOperator. The caller (DistinctPlanNode) must verify that the column + * has both a dictionary and an inverted index before constructing this operator. + */ + public InvertedIndexDistinctOperator(IndexSegment indexSegment, SegmentContext segmentContext, + QueryContext queryContext, BaseFilterOperator filterOperator, DataSource dataSource) { + _indexSegment = indexSegment; + _segmentContext = segmentContext; + _queryContext = queryContext; + _filterOperator = filterOperator; + _dataSource = dataSource; + _dictionary = dataSource.getDictionary(); + _invertedIndexReader = dataSource.getInvertedIndex(); + } + + @Override + protected DistinctResultsBlock getNextBlock() { + ImmutableRoaringBitmap filteredDocIds = buildFilteredDocIds(); + + // Sorted index: always use the sorted path — O(cardinality + filteredDocs) merge iteration + if (_invertedIndexReader instanceof SortedIndexReader) { + _usedInvertedIndexPath = true; + return executeSortedIndexPath((SortedIndexReader<?>) _invertedIndexReader, filteredDocIds); + } + // Bitmap inverted index: use cost heuristic to decide + if (shouldUseBitmapInvertedIndex(filteredDocIds)) { + _usedInvertedIndexPath = true; + return executeInvertedIndexPath(filteredDocIds); + } + return executeScanPath(filteredDocIds); + } + + // ==================== Cost Heuristic ==================== + + /** + * Default cost ratios for the inverted-index-based distinct heuristic, keyed by dictionary cardinality threshold. + * The inverted index path is chosen when {@code dictionaryCardinality * costRatio <= filteredDocCount}. + * + * <p>The cost ratio accounts for the per-entry bitmap intersection cost relative to the per-doc scan cost. + * For low-cardinality dictionaries, each bitmap is dense and {@code intersects()} is fast, but there are few + * entries so any unnecessary intersection is relatively expensive vs. scanning a small filtered doc set. + * For high-cardinality dictionaries, bitmaps are sparser and {@code intersects()} is slower per entry, + * but the scan path also becomes cheaper (fewer docs per value), so a lower ratio suffices. + * + * <p>Benchmarking (BenchmarkInvertedIndexDistinct, 1M docs) shows the crossover points: + * <ul> + * <li>dictCard ≤ 1K: costRatio=30 — inverted index wins when filteredDocs ≥ ~30x dictCard</li> + * <li>dictCard ≤ 10K: costRatio=10 — inverted index wins when filteredDocs ≥ ~10x dictCard</li> + * <li>dictCard > 10K: costRatio=6 — inverted index wins when filteredDocs ≥ ~6x dictCard</li> + * </ul> + * + * <p>Can be overridden at query time via the query option {@code invertedIndexDistinctCostRatio}. + */ + static final NavigableMap<Integer, Double> DEFAULT_COST_RATIO_BY_CARDINALITY; + + static { + TreeMap<Integer, Double> map = new TreeMap<>(); + map.put(0, 30.0); // dictCard <= 1000: costRatio = 30 + map.put(1_001, 10.0); // dictCard 1001..10000: costRatio = 10 + map.put(10_001, 6.0); // dictCard > 10000: costRatio = 6 + DEFAULT_COST_RATIO_BY_CARDINALITY = Collections.unmodifiableNavigableMap(map); + } + + static double getDefaultCostRatio(int dictionaryCardinality) { + return DEFAULT_COST_RATIO_BY_CARDINALITY.floorEntry(dictionaryCardinality).getValue(); + } + + private boolean shouldUseBitmapInvertedIndex(@Nullable ImmutableRoaringBitmap filteredDocIds) { + int dictionaryCardinality = _dictionary.length(); + int filteredDocCount = filteredDocIds == null + ? _indexSegment.getSegmentMetadata().getTotalDocs() + : filteredDocIds.getCardinality(); + if (filteredDocCount == 0) { + return false; + } + Double costRatioOverride = QueryOptionsUtils.getInvertedIndexDistinctCostRatio(_queryContext.getQueryOptions()); + double costRatio = costRatioOverride != null ? costRatioOverride : getDefaultCostRatio(dictionaryCardinality); + return (double) dictionaryCardinality * costRatio <= filteredDocCount; + } + + // ==================== Scan Path (Fallback) ==================== + + /** + * Scan fallback: uses ProjectOperator + DistinctExecutor. When the filter bitmap was already materialized + * by {@link #buildFilteredDocIds()}, wraps it in a {@link BitmapBasedFilterOperator} to avoid re-evaluating + * the filter through the projection pipeline. + */ + private DistinctResultsBlock executeScanPath(@Nullable ImmutableRoaringBitmap filteredDocIds) { + BaseFilterOperator filterOp; + if (filteredDocIds != null) { + filterOp = new BitmapBasedFilterOperator(filteredDocIds, false, + _indexSegment.getSegmentMetadata().getTotalDocs()); + } else { + filterOp = _filterOperator; + } + _projectOperator = new ProjectPlanNode(_segmentContext, _queryContext, + _queryContext.getSelectExpressions(), DocIdSetPlanNode.MAX_DOC_PER_CALL, filterOp).run(); + DistinctExecutor executor = DistinctExecutorFactory.getDistinctExecutor(_projectOperator, _queryContext); + ValueBlock valueBlock; + while ((valueBlock = _projectOperator.nextBlock()) != null) { + _numDocsScanned += valueBlock.getNumDocs(); + if (executor.process(valueBlock)) { + break; + } + } + return new DistinctResultsBlock(executor.getResult(), _queryContext); + } + + // ==================== Sorted Index Path ==================== + + /** + * Optimized path for sorted columns. Each dictId maps to a contiguous doc range [start, end]. + * We merge-iterate the filter bitmap with the sorted ranges in O(cardinality + filteredDocs). + */ + private DistinctResultsBlock executeSortedIndexPath(SortedIndexReader<?> sortedReader, + @Nullable ImmutableRoaringBitmap filteredDocIds) { + DictIdDistinctTable dictIdTable = createDictIdTable(); + OrderByExpressionContext orderByExpression = + _queryContext.getOrderByExpressions() != null ? _queryContext.getOrderByExpressions().get(0) : null; + int dictLength = _dictionary.length(); + // Process null handling: exclude null docs from filter and determine if nulls are present + NullFilterResult nullResult = processNullDocs(filteredDocIds); + ImmutableRoaringBitmap nonNullFilteredDocIds = nullResult._nonNullFilteredDocIds; + Review Comment: When null handling is enabled and the filtered docs contain nulls, this operator never reserves a slot for null while collecting dictIds (DictIdDistinctTable never calls addNull()). If the query has a LIMIT and no ORDER BY, this can fill the table to LIMIT dictIds and later cause the broker-side toResultTableWithoutOrderBy() logic to drop null (it only includes null when numValues < limit). Consider calling dictIdTable.addNull() as soon as nullResult indicates nulls are present (before addDictId loop), or otherwise ensure at most (limit-1) non-null values are collected when null is present. ```suggestion if (nullResult._hasNull) { // Reserve one slot in the distinct table for null so that at most (limit - 1) non-null values are collected dictIdTable.addNull(); } ``` ########## pinot-core/src/test/java/org/apache/pinot/queries/InvertedIndexDistinctOperatorTest.java: ########## @@ -0,0 +1,747 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.queries; + +import java.io.File; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import org.apache.commons.io.FileUtils; +import org.apache.pinot.core.operator.BaseOperator; +import org.apache.pinot.core.operator.blocks.results.DistinctResultsBlock; +import org.apache.pinot.core.query.distinct.table.DistinctTable; +import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader; +import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl; +import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig; +import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader; +import org.apache.pinot.segment.spi.ImmutableSegment; +import org.apache.pinot.segment.spi.IndexSegment; +import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.data.FieldSpec.DataType; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.data.readers.GenericRow; +import org.apache.pinot.spi.utils.builder.TableConfigBuilder; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertTrue; + + +/** + * Tests for {@link org.apache.pinot.core.operator.query.InvertedIndexDistinctOperator}. + * + * <p>Five segments exercise distinct operator features: + * <ul> + * <li><b>INT segment</b>: 10K records, 100 unique INT values (interleaved), inverted index. + * Tests cost heuristic path selection and inverted-vs-scan correctness.</li> + * <li><b>MV segment</b>: 1K records, MV INT column (50 unique values), inverted index. + * Tests multi-value column support.</li> + * <li><b>Sorted segment</b>: 10K records, sorted INT column (100 unique), sorted forward index. + * Tests sorted index path.</li> + * <li><b>STRING segment</b>: 5K records, STRING column (50 unique), inverted index. + * Tests STRING data type handling.</li> + * <li><b>Null segment</b>: 1K records, INT column with nulls, inverted index. + * Tests null handling.</li> + * </ul> + */ +public class InvertedIndexDistinctOperatorTest extends BaseQueriesTest { + private static final File INDEX_DIR = + new File(FileUtils.getTempDirectory(), "InvertedIndexDistinctOperatorTest"); + private static final String RAW_TABLE_NAME = "testTable"; + + // Active segment — swapped per test group + private IndexSegment _activeSegment; + private final List<IndexSegment> _allSegments = new ArrayList<>(); + + // --- INT segment (cost heuristic tests) --- + private static final String INT_COLUMN = "intColumn"; + private static final int INT_NUM_UNIQUE = 100; + private static final int INT_RECORDS_PER_VALUE = 100; + private static final int INT_NUM_RECORDS = INT_NUM_UNIQUE * INT_RECORDS_PER_VALUE; + private IndexSegment _intSegment; + + // --- MV segment --- + private static final String MV_INT_COLUMN = "mvIntColumn"; + private static final String SV_FILTER_COLUMN = "svFilterColumn"; + private static final int MV_NUM_RECORDS = 1000; + private static final int MV_CARDINALITY = 50; + private IndexSegment _mvSegment; + private Set<Integer> _allMvValues; + private Set<Integer> _filteredMvValues; + + // --- Sorted segment --- + private static final String SORTED_COLUMN = "sortedColumn"; + private static final String FILTER_COLUMN = "filterColumn"; + private static final int SORTED_NUM_UNIQUE = 100; + private static final int SORTED_RECORDS_PER_VALUE = 100; + private IndexSegment _sortedSegment; + + // --- STRING segment --- + private static final String STRING_COLUMN = "stringColumn"; + private static final int STRING_NUM_UNIQUE = 50; + private static final int STRING_RECORDS_PER_VALUE = 100; + private IndexSegment _stringSegment; + + // --- Null segment --- + private static final int NULL_NUM_RECORDS = 1000; + private static final int NULL_NUM_NON_NULL = 950; + private static final int NULL_NUM_UNIQUE = 50; + private IndexSegment _nullSegment; + + @Override + protected String getFilter() { + throw new UnsupportedOperationException(); + } + + @Override + protected IndexSegment getIndexSegment() { + return _activeSegment; + } + + @Override + protected List<IndexSegment> getIndexSegments() { + return List.of(_activeSegment); + } + + @BeforeClass + public void setUp() + throws Exception { + FileUtils.deleteQuietly(INDEX_DIR); + _intSegment = buildIntSegment(); + _mvSegment = buildMvSegment(); + _sortedSegment = buildSortedSegment(); + _stringSegment = buildStringSegment(); + _nullSegment = buildNullSegment(); + } + + @AfterClass + public void tearDown() { + for (IndexSegment segment : _allSegments) { + segment.destroy(); + } + FileUtils.deleteQuietly(INDEX_DIR); + } + + // ==================== Segment Builders ==================== + + private IndexSegment buildIntSegment() + throws Exception { + Schema schema = new Schema.SchemaBuilder().addSingleValueDimension(INT_COLUMN, DataType.INT).build(); + TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE) + .setTableName(RAW_TABLE_NAME).setInvertedIndexColumns(List.of(INT_COLUMN)).build(); + + List<GenericRow> records = new ArrayList<>(INT_NUM_RECORDS); + for (int j = 0; j < INT_RECORDS_PER_VALUE; j++) { + for (int i = 0; i < INT_NUM_UNIQUE; i++) { + GenericRow record = new GenericRow(); + record.putValue(INT_COLUMN, i); + records.add(record); + } + } + return buildSegment("intSegment", schema, tableConfig, records); + } + + private IndexSegment buildMvSegment() + throws Exception { + Schema schema = new Schema.SchemaBuilder() + .addMultiValueDimension(MV_INT_COLUMN, DataType.INT) + .addSingleValueDimension(SV_FILTER_COLUMN, DataType.INT).build(); + TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE) + .setTableName(RAW_TABLE_NAME) + .setInvertedIndexColumns(List.of(MV_INT_COLUMN, SV_FILTER_COLUMN)).build(); + + _allMvValues = new HashSet<>(); + _filteredMvValues = new HashSet<>(); + List<GenericRow> records = new ArrayList<>(MV_NUM_RECORDS); + for (int i = 0; i < MV_NUM_RECORDS; i++) { + GenericRow record = new GenericRow(); + List<Integer> mvValues = new ArrayList<>(); + mvValues.add(i % MV_CARDINALITY); + mvValues.add((i + 1) % MV_CARDINALITY); + if (i % 3 == 0) { + mvValues.add((i + 2) % MV_CARDINALITY); + } + record.putValue(MV_INT_COLUMN, mvValues.toArray(new Integer[0])); + record.putValue(SV_FILTER_COLUMN, i); + records.add(record); + for (int v : mvValues) { + _allMvValues.add(v); + if (i < 500) { + _filteredMvValues.add(v); + } + } + } + return buildSegment("mvSegment", schema, tableConfig, records); + } + + private IndexSegment buildSortedSegment() + throws Exception { + Schema schema = new Schema.SchemaBuilder() + .addSingleValueDimension(SORTED_COLUMN, DataType.INT) + .addSingleValueDimension(FILTER_COLUMN, DataType.INT).build(); + TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE) + .setTableName(RAW_TABLE_NAME).setSortedColumn(SORTED_COLUMN).build(); + + List<GenericRow> records = new ArrayList<>(); + for (int i = 0; i < SORTED_NUM_UNIQUE; i++) { + for (int j = 0; j < SORTED_RECORDS_PER_VALUE; j++) { + GenericRow record = new GenericRow(); + record.putValue(SORTED_COLUMN, i); + record.putValue(FILTER_COLUMN, i * SORTED_RECORDS_PER_VALUE + j); + records.add(record); + } + } + return buildSegment("sortedSegment", schema, tableConfig, records); + } + + private IndexSegment buildStringSegment() + throws Exception { + Schema schema = new Schema.SchemaBuilder() + .addSingleValueDimension(STRING_COLUMN, DataType.STRING) + .addSingleValueDimension(FILTER_COLUMN, DataType.INT).build(); + TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE) + .setTableName(RAW_TABLE_NAME) + .setInvertedIndexColumns(List.of(STRING_COLUMN, FILTER_COLUMN)).build(); + + List<GenericRow> records = new ArrayList<>(); + for (int j = 0; j < STRING_RECORDS_PER_VALUE; j++) { + for (int i = 0; i < STRING_NUM_UNIQUE; i++) { + GenericRow record = new GenericRow(); + record.putValue(STRING_COLUMN, String.format("val_%02d", i)); + record.putValue(FILTER_COLUMN, j * STRING_NUM_UNIQUE + i); + records.add(record); + } + } + return buildSegment("stringSegment", schema, tableConfig, records); + } + + private IndexSegment buildNullSegment() + throws Exception { + Schema schema = new Schema.SchemaBuilder() + .addSingleValueDimension(INT_COLUMN, DataType.INT) + .addSingleValueDimension(FILTER_COLUMN, DataType.INT).build(); + TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE) + .setTableName(RAW_TABLE_NAME) + .setInvertedIndexColumns(List.of(INT_COLUMN, FILTER_COLUMN)) + .setNullHandlingEnabled(true).build(); + + List<GenericRow> records = new ArrayList<>(NULL_NUM_RECORDS); + for (int i = 0; i < NULL_NUM_RECORDS; i++) { + GenericRow record = new GenericRow(); + record.putValue(INT_COLUMN, i >= NULL_NUM_NON_NULL ? null : i % NULL_NUM_UNIQUE); + record.putValue(FILTER_COLUMN, i); + records.add(record); + } + + SegmentGeneratorConfig config = new SegmentGeneratorConfig(tableConfig, schema); + config.setTableName(RAW_TABLE_NAME); + config.setSegmentName("nullSegment"); + config.setOutDir(new File(INDEX_DIR, "nullSegment_dir").getAbsolutePath()); + config.setDefaultNullHandlingEnabled(true); + + SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl(); + driver.init(config, new GenericRowRecordReader(records)); + driver.build(); + + ImmutableSegment segment = ImmutableSegmentLoader.load( + new File(new File(INDEX_DIR, "nullSegment_dir"), "nullSegment"), + new IndexLoadingConfig(tableConfig, schema)); + _allSegments.add(segment); + return segment; + } + + private IndexSegment buildSegment(String segmentName, Schema schema, TableConfig tableConfig, + List<GenericRow> records) + throws Exception { + File segmentDir = new File(INDEX_DIR, segmentName + "_dir"); + SegmentGeneratorConfig config = new SegmentGeneratorConfig(tableConfig, schema); + config.setTableName(RAW_TABLE_NAME); + config.setSegmentName(segmentName); + config.setOutDir(segmentDir.getAbsolutePath()); + + SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl(); + driver.init(config, new GenericRowRecordReader(records)); + driver.build(); + + ImmutableSegment segment = ImmutableSegmentLoader.load( + new File(segmentDir, segmentName), new IndexLoadingConfig(tableConfig, schema)); + _allSegments.add(segment); + return segment; + } + + // ==================== Helpers ==================== + + private BaseOperator<DistinctResultsBlock> runDistinct(String query) { + BaseOperator<DistinctResultsBlock> op = getOperator(query); + op.nextBlock(); + return op; + } + + private boolean usedInvertedIndex(BaseOperator<DistinctResultsBlock> op) { + return op.toExplainString().contains("DISTINCT_INVERTED_INDEX"); + } + + private boolean usedSortedIndex(BaseOperator<DistinctResultsBlock> op) { + return op.toExplainString().contains("DISTINCT_SORTED_INDEX"); + } + + private Set<Integer> extractIntValues(DistinctTable table) { + Set<Integer> values = new HashSet<>(); + for (Object[] row : table.getRows()) { + if (row[0] != null) { + values.add(((Number) row[0]).intValue()); + } + } + return values; + } + + private Set<String> extractStringValues(DistinctTable table) { + Set<String> values = new HashSet<>(); + for (Object[] row : table.getRows()) { + values.add((String) row[0]); + } + return values; + } + + private boolean containsNull(DistinctTable table) { + for (Object[] row : table.getRows()) { + if (row[0] == null) { + return true; + } + } + return false; + } + + private static final String OPT = "OPTION(useIndexBasedDistinctOperator=true"; + private static final String OPT_INV = OPT + ", invertedIndexDistinctCostRatio=1)"; + private static final String OPT_SCAN = OPT + ", invertedIndexDistinctCostRatio=100000)"; + + // ==================== Cost Heuristic Tests ==================== + + @Test + public void testCostRatioPathSelection() { + _activeSegment = _intSegment; + + // Without the query option → old DistinctOperator + assertFalse(usedInvertedIndex(runDistinct( + "SELECT DISTINCT intColumn FROM testTable WHERE intColumn >= 0"))); + + // costRatio=1, wide filter (10K docs): 100*1 <= 10000 → inverted + assertTrue(usedInvertedIndex(runDistinct( + "SELECT DISTINCT intColumn FROM testTable WHERE intColumn >= 0 " + + OPT + ", invertedIndexDistinctCostRatio=1)"))); + + // costRatio=200, wide filter: 100*200=20000 > 10000 → scan + assertFalse(usedInvertedIndex(runDistinct( + "SELECT DISTINCT intColumn FROM testTable WHERE intColumn >= 0 " + + OPT + ", invertedIndexDistinctCostRatio=200)"))); + + // costRatio=1, selective filter (100 docs): 100*1 <= 100 → inverted + assertTrue(usedInvertedIndex(runDistinct( + "SELECT DISTINCT intColumn FROM testTable WHERE intColumn = 0 " + + OPT + ", invertedIndexDistinctCostRatio=1)"))); + + // costRatio=2, selective filter: 100*2=200 > 100 → scan + assertFalse(usedInvertedIndex(runDistinct( + "SELECT DISTINCT intColumn FROM testTable WHERE intColumn = 0 " + + OPT + ", invertedIndexDistinctCostRatio=2)"))); + + // Default costRatio=30: 100*30=3000 <= 10K → inverted + assertTrue(usedInvertedIndex(runDistinct( + "SELECT DISTINCT intColumn FROM testTable WHERE intColumn >= 0 " + OPT + ")"))); + + // Boundary: costRatio=100: 100*100=10000 <= 10000 → inverted + assertTrue(usedInvertedIndex(runDistinct( + "SELECT DISTINCT intColumn FROM testTable WHERE intColumn >= 0 " + + OPT + ", invertedIndexDistinctCostRatio=100)"))); + + // Above boundary: costRatio=101: 100*101=10100 > 10000 → scan + assertFalse(usedInvertedIndex(runDistinct( + "SELECT DISTINCT intColumn FROM testTable WHERE intColumn >= 0 " + + OPT + ", invertedIndexDistinctCostRatio=101)"))); + } + + @Test + public void testInvertedIndexVsScanCorrectness() { + _activeSegment = _intSegment; + + // With ORDER BY + BaseOperator<DistinctResultsBlock> invertedOp = getOperator( + "SELECT DISTINCT intColumn FROM testTable WHERE intColumn IN " + + "(0,1,2,3,4,5,6,7,8,9) ORDER BY intColumn LIMIT 100 " + OPT_INV); + DistinctTable invertedTable = invertedOp.nextBlock().getDistinctTable(); + assertTrue(usedInvertedIndex(invertedOp)); + + BaseOperator<DistinctResultsBlock> scanOp = getOperator( + "SELECT DISTINCT intColumn FROM testTable WHERE intColumn IN " + + "(0,1,2,3,4,5,6,7,8,9) ORDER BY intColumn LIMIT 100 " + OPT_SCAN); + DistinctTable scanTable = scanOp.nextBlock().getDistinctTable(); + assertFalse(usedInvertedIndex(scanOp)); + + Set<Integer> expected = new HashSet<>(); + for (int i = 0; i < 10; i++) { + expected.add(i); + } + assertEquals(extractIntValues(invertedTable), expected); + assertEquals(extractIntValues(scanTable), expected); + + // Without ORDER BY — same count + BaseOperator<DistinctResultsBlock> inv2 = getOperator( + "SELECT DISTINCT intColumn FROM testTable WHERE intColumn >= 0 LIMIT 200 " + OPT_INV); + assertEquals(inv2.nextBlock().getDistinctTable().size(), INT_NUM_UNIQUE); + + BaseOperator<DistinctResultsBlock> scan2 = getOperator( + "SELECT DISTINCT intColumn FROM testTable WHERE intColumn >= 0 LIMIT 200 " + OPT_SCAN); + assertEquals(scan2.nextBlock().getDistinctTable().size(), INT_NUM_UNIQUE); + } + + // ==================== Multi-Value Tests ==================== + + @Test + public void testMvColumnWithFilter() { + _activeSegment = _mvSegment; + + BaseOperator<DistinctResultsBlock> op = getOperator( + "SELECT DISTINCT mvIntColumn FROM testTable WHERE svFilterColumn < 500 LIMIT 1000 " + OPT_INV); + DistinctTable table = op.nextBlock().getDistinctTable(); + assertTrue(usedInvertedIndex(op)); + assertEquals(extractIntValues(table), _filteredMvValues); + } + + @Test + public void testMvColumnInvertedVsScan() { + _activeSegment = _mvSegment; + + BaseOperator<DistinctResultsBlock> invertedOp = getOperator( + "SELECT DISTINCT mvIntColumn FROM testTable WHERE svFilterColumn < 500 " + + "ORDER BY mvIntColumn LIMIT 1000 " + OPT_INV); + DistinctTable invertedTable = invertedOp.nextBlock().getDistinctTable(); + assertTrue(usedInvertedIndex(invertedOp)); + + BaseOperator<DistinctResultsBlock> scanOp = getOperator( + "SELECT DISTINCT mvIntColumn FROM testTable WHERE svFilterColumn < 500 " + + "ORDER BY mvIntColumn LIMIT 1000 " + OPT_SCAN); + DistinctTable scanTable = scanOp.nextBlock().getDistinctTable(); + assertFalse(usedInvertedIndex(scanOp)); + + assertEquals(extractIntValues(invertedTable), extractIntValues(scanTable)); + } + + @Test + public void testMvColumnVariants() { + _activeSegment = _mvSegment; + + // Match all — should return all unique values + BaseOperator<DistinctResultsBlock> matchAllOp = getOperator( + "SELECT DISTINCT mvIntColumn FROM testTable WHERE svFilterColumn >= 0 LIMIT 1000 " + OPT_INV); + DistinctTable matchAllTable = matchAllOp.nextBlock().getDistinctTable(); + assertEquals(extractIntValues(matchAllTable), _allMvValues); + + // LIMIT + BaseOperator<DistinctResultsBlock> limitOp = getOperator( + "SELECT DISTINCT mvIntColumn FROM testTable WHERE svFilterColumn >= 0 LIMIT 10 " + OPT_INV); + assertEquals(limitOp.nextBlock().getDistinctTable().size(), 10); + + // ORDER BY DESC + BaseOperator<DistinctResultsBlock> descOp = getOperator( + "SELECT DISTINCT mvIntColumn FROM testTable WHERE svFilterColumn < 500 " + + "ORDER BY mvIntColumn DESC LIMIT 1000 " + OPT_INV); + BaseOperator<DistinctResultsBlock> descScanOp = getOperator( + "SELECT DISTINCT mvIntColumn FROM testTable WHERE svFilterColumn < 500 " + + "ORDER BY mvIntColumn DESC LIMIT 1000 " + OPT_SCAN); + assertEquals(extractIntValues(descOp.nextBlock().getDistinctTable()), + extractIntValues(descScanOp.nextBlock().getDistinctTable())); + + // Selective filter: docs 0,1,2 → values {0,1,2,3} + BaseOperator<DistinctResultsBlock> selectiveOp = getOperator( + "SELECT DISTINCT mvIntColumn FROM testTable WHERE svFilterColumn < 3 " + + "ORDER BY mvIntColumn LIMIT 100 " + OPT_INV); + assertEquals(extractIntValues(selectiveOp.nextBlock().getDistinctTable()), + new HashSet<>(Arrays.asList(0, 1, 2, 3))); + + // Empty filter + BaseOperator<DistinctResultsBlock> emptyOp = getOperator( + "SELECT DISTINCT mvIntColumn FROM testTable WHERE svFilterColumn > 99999 LIMIT 1000 " + OPT_INV); + assertEquals(emptyOp.nextBlock().getDistinctTable().size(), 0); + } + + // ==================== Sorted Index Tests ==================== + + @Test + public void testSortedColumnPath() { + _activeSegment = _sortedSegment; + + // Should use sorted index path + BaseOperator<DistinctResultsBlock> op = getOperator( + "SELECT DISTINCT sortedColumn FROM testTable WHERE filterColumn >= 0 LIMIT 1000 " + OPT + ")"); + DistinctTable table = op.nextBlock().getDistinctTable(); + assertTrue(usedSortedIndex(op)); + assertEquals(table.size(), SORTED_NUM_UNIQUE); + } + + @Test + public void testSortedColumnFilters() { + _activeSegment = _sortedSegment; + + // Selective filter: filterColumn < 500 → sorted values 0..4 + BaseOperator<DistinctResultsBlock> selOp = getOperator( + "SELECT DISTINCT sortedColumn FROM testTable WHERE filterColumn < 500 LIMIT 1000 " + OPT + ")"); + Set<Integer> expected = new HashSet<>(); + for (int i = 0; i < 5; i++) { + expected.add(i); + } + assertEquals(extractIntValues(selOp.nextBlock().getDistinctTable()), expected); + + // Sparse filter: filterColumn=50 (value 0) OR filterColumn=150 (value 1) + BaseOperator<DistinctResultsBlock> sparseOp = getOperator( + "SELECT DISTINCT sortedColumn FROM testTable WHERE filterColumn = 50 OR filterColumn = 150 " + + "LIMIT 1000 " + OPT + ")"); + assertEquals(extractIntValues(sparseOp.nextBlock().getDistinctTable()), Set.of(0, 1)); + + // Empty filter + BaseOperator<DistinctResultsBlock> emptyOp = getOperator( + "SELECT DISTINCT sortedColumn FROM testTable WHERE filterColumn > 99999 LIMIT 1000 " + OPT + ")"); + DistinctTable emptyTable = emptyOp.nextBlock().getDistinctTable(); + assertTrue(usedSortedIndex(emptyOp)); + assertEquals(emptyTable.size(), 0); + } + + @Test + public void testSortedColumnOrderByAndLimit() { + _activeSegment = _sortedSegment; + + // LIMIT + BaseOperator<DistinctResultsBlock> limitOp = getOperator( + "SELECT DISTINCT sortedColumn FROM testTable WHERE filterColumn >= 0 LIMIT 10 " + OPT + ")"); + assertEquals(limitOp.nextBlock().getDistinctTable().size(), 10); + + // Matches scan path results + BaseOperator<DistinctResultsBlock> sortedOp = getOperator( + "SELECT DISTINCT sortedColumn FROM testTable WHERE filterColumn < 500 LIMIT 1000 " + OPT + ")"); + BaseOperator<DistinctResultsBlock> scanOp = getOperator( + "SELECT DISTINCT sortedColumn FROM testTable WHERE filterColumn < 500 LIMIT 1000"); + assertEquals(extractIntValues(sortedOp.nextBlock().getDistinctTable()), + extractIntValues(scanOp.nextBlock().getDistinctTable())); + + // ORDER BY DESC + BaseOperator<DistinctResultsBlock> descOp = getOperator( + "SELECT DISTINCT sortedColumn FROM testTable WHERE filterColumn < 500 " + + "ORDER BY sortedColumn DESC LIMIT 1000 " + OPT + ")"); + BaseOperator<DistinctResultsBlock> descScanOp = getOperator( + "SELECT DISTINCT sortedColumn FROM testTable WHERE filterColumn < 500 " + + "ORDER BY sortedColumn DESC LIMIT 1000"); + assertEquals(extractIntValues(descOp.nextBlock().getDistinctTable()), + extractIntValues(descScanOp.nextBlock().getDistinctTable())); + + // ORDER BY DESC with LIMIT + int limit = 5; + BaseOperator<DistinctResultsBlock> descLimitOp = getOperator( + "SELECT DISTINCT sortedColumn FROM testTable WHERE filterColumn >= 0 " + + "ORDER BY sortedColumn DESC LIMIT " + limit + " " + OPT + ")"); + BaseOperator<DistinctResultsBlock> descLimitScanOp = getOperator( + "SELECT DISTINCT sortedColumn FROM testTable WHERE filterColumn >= 0 " + + "ORDER BY sortedColumn DESC LIMIT " + limit); + DistinctTable descLimitTable = descLimitOp.nextBlock().getDistinctTable(); + assertEquals(extractIntValues(descLimitTable), + extractIntValues(descLimitScanOp.nextBlock().getDistinctTable())); + assertEquals(descLimitTable.size(), limit); + } + + // ==================== STRING Tests ==================== + + @Test + public void testStringColumnWithFilter() { + _activeSegment = _stringSegment; + + BaseOperator<DistinctResultsBlock> op = getOperator( + "SELECT DISTINCT stringColumn FROM testTable WHERE filterColumn < 500 LIMIT 1000 " + OPT_INV); + DistinctTable table = op.nextBlock().getDistinctTable(); + assertTrue(usedInvertedIndex(op)); + assertEquals(table.size(), STRING_NUM_UNIQUE); + } + + @Test + public void testStringColumnInvertedVsScan() { + _activeSegment = _stringSegment; + + BaseOperator<DistinctResultsBlock> invertedOp = getOperator( + "SELECT DISTINCT stringColumn FROM testTable WHERE filterColumn < 200 " + + "ORDER BY stringColumn LIMIT 1000 " + OPT_INV); + DistinctTable invertedTable = invertedOp.nextBlock().getDistinctTable(); + assertTrue(usedInvertedIndex(invertedOp)); + + BaseOperator<DistinctResultsBlock> scanOp = getOperator( + "SELECT DISTINCT stringColumn FROM testTable WHERE filterColumn < 200 " + + "ORDER BY stringColumn LIMIT 1000 " + OPT_SCAN); + assertEquals(extractStringValues(invertedTable), extractStringValues(scanOp.nextBlock().getDistinctTable())); + } + + @Test + public void testStringColumnVariants() { + _activeSegment = _stringSegment; + + // ORDER BY DESC + BaseOperator<DistinctResultsBlock> descOp = getOperator( + "SELECT DISTINCT stringColumn FROM testTable WHERE filterColumn >= 0 " + + "ORDER BY stringColumn DESC LIMIT 1000 " + OPT_INV); + BaseOperator<DistinctResultsBlock> descScanOp = getOperator( + "SELECT DISTINCT stringColumn FROM testTable WHERE filterColumn >= 0 " + + "ORDER BY stringColumn DESC LIMIT 1000 " + OPT_SCAN); + assertEquals(extractStringValues(descOp.nextBlock().getDistinctTable()), + extractStringValues(descScanOp.nextBlock().getDistinctTable())); + + // ORDER BY DESC with LIMIT + BaseOperator<DistinctResultsBlock> descLimitOp = getOperator( + "SELECT DISTINCT stringColumn FROM testTable WHERE filterColumn >= 0 " + + "ORDER BY stringColumn DESC LIMIT 5 " + OPT_INV); + BaseOperator<DistinctResultsBlock> descLimitScanOp = getOperator( + "SELECT DISTINCT stringColumn FROM testTable WHERE filterColumn >= 0 " + + "ORDER BY stringColumn DESC LIMIT 5"); + DistinctTable descLimitStrTable = descLimitOp.nextBlock().getDistinctTable(); + assertEquals(extractStringValues(descLimitStrTable), + extractStringValues(descLimitScanOp.nextBlock().getDistinctTable())); + assertEquals(descLimitStrTable.size(), 5); + + // Empty filter + BaseOperator<DistinctResultsBlock> emptyOp = getOperator( + "SELECT DISTINCT stringColumn FROM testTable WHERE filterColumn > 99999 LIMIT 1000 " + OPT_INV); + assertEquals(emptyOp.nextBlock().getDistinctTable().size(), 0); + + // Selective filter + Set<String> allExpected = new HashSet<>(); + for (int i = 0; i < STRING_NUM_UNIQUE; i++) { + allExpected.add(String.format("val_%02d", i)); + } + BaseOperator<DistinctResultsBlock> selectiveOp = getOperator( + "SELECT DISTINCT stringColumn FROM testTable WHERE filterColumn < 100 " + + "ORDER BY stringColumn LIMIT 100 " + OPT_INV); + assertEquals(extractStringValues(selectiveOp.nextBlock().getDistinctTable()), allExpected); + } + + // ==================== Null Handling Tests ==================== + + @Test + public void testNullIncludedWithWideFilter() { + _activeSegment = _nullSegment; + + BaseOperator<DistinctResultsBlock> op = getOperator( + "SELECT DISTINCT intColumn FROM testTable WHERE filterColumn >= 0 LIMIT 1000 " + + OPT + ", invertedIndexDistinctCostRatio=1, enableNullHandling=true)"); + DistinctTable table = op.nextBlock().getDistinctTable(); Review Comment: The null-handling tests validate server-side DistinctTable contents, but they don’t exercise broker-side limiting behavior (DistinctTable#toResultTableWithoutOrderBy only includes null when numValues < limit). Please add a regression test that calls toResultTable() (or otherwise verifies final results) for a query with nulls, no ORDER BY, and LIMIT equal to the number of non-null distinct values so that null must be preserved under the limit. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
