Copilot commented on code in PR #17247:
URL: https://github.com/apache/pinot/pull/17247#discussion_r2659338310


##########
pinot-core/src/main/java/org/apache/pinot/core/query/distinct/DistinctEarlyTerminationContext.java:
##########
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.distinct;
+
+import java.util.function.LongSupplier;
+
+/**
+ * Tracks per-block early-termination budgets for distinct executors (row 
limits and no-change limits).
+ * <p>All distinct executors should delegate to this class so that query 
options such as
+ * {@code maxRowsInDistinct} and {@code numRowsWithoutChangeInDistinct} are 
enforced consistently
+ * while processing each {@link 
org.apache.pinot.core.operator.blocks.ValueBlock}.</p>
+ */
+public class DistinctEarlyTerminationContext {
+  private static final int UNLIMITED_ROWS = Integer.MAX_VALUE;
+  private static final long UNLIMITED_TIME_NANOS = Long.MAX_VALUE;
+
+  private int _rowsRemaining = UNLIMITED_ROWS;
+  private int _numRowsProcessed = 0;
+  private int _numRowsWithoutChangeLimit = UNLIMITED_ROWS;
+  private int _numRowsWithoutChange = 0;
+  private boolean _numRowsWithoutChangeLimitReached = false;
+  // Absolute deadline (in nanos from the configured time supplier). Using a 
deadline instead of a fixed remaining
+  // value allows executors to stop within a block as time elapses.
+  private long _deadlineTimeNanos = UNLIMITED_TIME_NANOS;
+  private LongSupplier _timeSupplier = System::nanoTime;
+
+  public void setTimeSupplier(LongSupplier timeSupplier) {
+    if (timeSupplier == null || timeSupplier == _timeSupplier) {
+      return;
+    }
+    if (_deadlineTimeNanos != UNLIMITED_TIME_NANOS) {
+      // Preserve the already computed remaining budget when switching time 
sources (primarily for tests).
+      long remainingTimeNanos = getRemainingTimeNanos();
+      _timeSupplier = timeSupplier;
+      setRemainingTimeNanos(remainingTimeNanos);
+    } else {
+      _timeSupplier = timeSupplier;
+    }
+  }
+
+  public void setMaxRowsToProcess(int maxRows) {
+    _rowsRemaining = maxRows;
+  }
+
+  public int getRemainingRowsToProcess() {
+    return _rowsRemaining;
+  }
+
+  public void setNumRowsWithoutChangeInDistinct(int 
numRowsWithoutChangeInDistinct) {
+    _numRowsWithoutChangeLimit = numRowsWithoutChangeInDistinct;
+  }
+
+  public boolean isNumRowsWithoutChangeLimitReached() {
+    return _numRowsWithoutChangeLimitReached;
+  }
+
+  public int getNumRowsProcessed() {
+    return _numRowsProcessed;
+  }
+
+  public int clampToRemaining(int numDocs) {
+    if (_rowsRemaining == UNLIMITED_ROWS) {
+      return numDocs;
+    }
+    if (_rowsRemaining <= 0) {
+      return 0;
+    }
+    return Math.min(numDocs, _rowsRemaining);
+  }
+
+  public void recordRowProcessed(boolean distinctChanged) {
+    _numRowsProcessed++;
+    if (_rowsRemaining != UNLIMITED_ROWS) {
+      _rowsRemaining--;
+    }
+    if (_numRowsWithoutChangeLimit != UNLIMITED_ROWS) {
+      if (distinctChanged) {
+        _numRowsWithoutChange = 0;
+      } else {
+        _numRowsWithoutChange++;
+        if (_numRowsWithoutChange >= _numRowsWithoutChangeLimit) {
+          _numRowsWithoutChangeLimitReached = true;
+        }
+      }
+    }
+  }
+
+  public boolean shouldStopProcessing() {
+    return _rowsRemaining <= 0 || _numRowsWithoutChangeLimitReached || 
getRemainingTimeNanos() <= 0;
+  }
+
+  public long getRemainingTimeNanos() {
+    if (_deadlineTimeNanos == UNLIMITED_TIME_NANOS) {
+      return UNLIMITED_TIME_NANOS;
+    }
+    return _deadlineTimeNanos - _timeSupplier.getAsLong();
+  }
+
+  public void setRemainingTimeNanos(long remainingTimeNanos) {
+    if (remainingTimeNanos == UNLIMITED_TIME_NANOS) {
+      _deadlineTimeNanos = UNLIMITED_TIME_NANOS;
+      return;
+    }
+    long now = _timeSupplier.getAsLong();
+    if (remainingTimeNanos <= 0) {
+      _deadlineTimeNanos = now;
+      return;
+    }
+    try {
+      _deadlineTimeNanos = Math.addExact(now, remainingTimeNanos);
+    } catch (ArithmeticException e) {
+      // Saturate to "unlimited" if the deadline overflows.
+      _deadlineTimeNanos = UNLIMITED_TIME_NANOS;
+    }

Review Comment:
   The overflow handling saturates to unlimited. This may be unexpected if a 
caller passes a very large positive value intending a real deadline. Consider 
documenting this behavior in the method's Javadoc or adding a warning comment 
explaining why saturation to unlimited is safe.



##########
pinot-core/src/main/java/org/apache/pinot/core/operator/query/DistinctOperator.java:
##########
@@ -40,31 +45,119 @@
  */
 public class DistinctOperator extends BaseOperator<DistinctResultsBlock> {
   private static final String EXPLAIN_NAME = "DISTINCT";
+  private static final int UNLIMITED_ROWS = Integer.MAX_VALUE;
 
   private final IndexSegment _indexSegment;
   private final QueryContext _queryContext;
   private final BaseProjectOperator<?> _projectOperator;
 
   private int _numDocsScanned = 0;
+  private final int _maxRowsInDistinct;
+  private final int _numRowsWithoutChangeInDistinct;
+  private int _numRowsWithoutNewDistinct = 0;
+  private boolean _hitMaxRowsLimit = false;
+  private boolean _hitNoChangeLimit = false;
+  private final long _maxExecutionTimeNs;
+  private boolean _hitTimeLimit = false;
+  private final LongSupplier _timeSupplier;
 
   public DistinctOperator(IndexSegment indexSegment, QueryContext queryContext,
       BaseProjectOperator<?> projectOperator) {
+    this(indexSegment, queryContext, projectOperator, System::nanoTime);
+  }
+
+  DistinctOperator(IndexSegment indexSegment, QueryContext queryContext, 
BaseProjectOperator<?> projectOperator,
+      LongSupplier timeSupplier) {
     _indexSegment = indexSegment;
     _queryContext = queryContext;
     _projectOperator = projectOperator;
+    _timeSupplier = timeSupplier;
+    Map<String, String> queryOptions = queryContext.getQueryOptions();
+    if (queryOptions != null) {
+      Integer maxRowsInDistinct = 
QueryOptionsUtils.getMaxRowsInDistinct(queryOptions);
+      _maxRowsInDistinct = maxRowsInDistinct != null ? maxRowsInDistinct : 
UNLIMITED_ROWS;
+      Integer numRowsWithoutChange = 
QueryOptionsUtils.getNumRowsWithoutChangeInDistinct(queryOptions);
+      _numRowsWithoutChangeInDistinct =
+          numRowsWithoutChange != null ? numRowsWithoutChange : UNLIMITED_ROWS;
+      Long maxExecutionTimeMs = 
QueryOptionsUtils.getMaxExecutionTimeMsInDistinct(queryOptions);
+      _maxExecutionTimeNs =
+          maxExecutionTimeMs != null ? 
TimeUnit.MILLISECONDS.toNanos(maxExecutionTimeMs) : Long.MAX_VALUE;
+    } else {
+      _maxRowsInDistinct = UNLIMITED_ROWS;
+      _numRowsWithoutChangeInDistinct = UNLIMITED_ROWS;
+      _maxExecutionTimeNs = Long.MAX_VALUE;
+    }
   }
 
   @Override
   protected DistinctResultsBlock getNextBlock() {
     DistinctExecutor executor = 
DistinctExecutorFactory.getDistinctExecutor(_projectOperator, _queryContext);
+    executor.setTimeSupplier(_timeSupplier);
+    executor.setMaxRowsToProcess(_maxRowsInDistinct);
+    
executor.setNumRowsWithoutChangeInDistinct(_numRowsWithoutChangeInDistinct);
     ValueBlock valueBlock;
+    boolean enforceRowLimit = _maxRowsInDistinct != UNLIMITED_ROWS;
+    boolean enforceNoChangeLimit = _numRowsWithoutChangeInDistinct != 
UNLIMITED_ROWS;
+    boolean enforceTimeLimit = _maxExecutionTimeNs != Long.MAX_VALUE;
+    final long startTimeNs = _timeSupplier.getAsLong();
     while ((valueBlock = _projectOperator.nextBlock()) != null) {
-      _numDocsScanned += valueBlock.getNumDocs();
-      if (executor.process(valueBlock)) {
+      if (enforceTimeLimit) {
+        long elapsed = _timeSupplier.getAsLong() - startTimeNs;
+        long remaining = _maxExecutionTimeNs - elapsed;
+        executor.setRemainingTimeNanos(remaining);
+      }
+      if (enforceTimeLimit && hasExceededTimeLimit(startTimeNs)) {
+        _hitTimeLimit = true;
+        break;
+      }
+      if (enforceRowLimit && executor.getRemainingRowsToProcess() <= 0) {
+        _hitMaxRowsLimit = true;
+        break;
+      }
+      if (enforceNoChangeLimit && 
executor.isNumRowsWithoutChangeLimitReached()) {
+        _hitNoChangeLimit = true;
         break;
       }
+      int rowsProcessedBefore = executor.getNumRowsProcessed();
+      int distinctCountBeforeBlock = enforceNoChangeLimit ? 
executor.getNumDistinctRowsCollected() : -1;
+      boolean satisfied = executor.process(valueBlock);
+      int rowsProcessedForBlock = executor.getNumRowsProcessed() - 
rowsProcessedBefore;
+      _numDocsScanned += rowsProcessedForBlock;
+      if (enforceRowLimit && _numDocsScanned >= _maxRowsInDistinct) {
+        _hitMaxRowsLimit = true;
+      }
+      if (enforceNoChangeLimit) {
+        if (executor.isNumRowsWithoutChangeLimitReached()) {
+          _hitNoChangeLimit = true;
+        } else {
+          int distinctCountAfterBlock = executor.getNumDistinctRowsCollected();
+          if (distinctCountAfterBlock > distinctCountBeforeBlock) {
+            _numRowsWithoutNewDistinct = 0;
+          } else {
+            _numRowsWithoutNewDistinct += rowsProcessedForBlock;
+            if (_numRowsWithoutNewDistinct >= _numRowsWithoutChangeInDistinct) 
{
+              _hitNoChangeLimit = true;
+            }
+          }
+        }
+      }

Review Comment:
   The no-change tracking logic in the operator duplicates the executor's 
internal tracking. If the executor already tracks and signals when the limit is 
reached, the operator could rely solely on 
`executor.isNumRowsWithoutChangeLimitReached()` rather than maintaining 
separate state. This would reduce duplication and potential for drift.



##########
pinot-core/src/main/java/org/apache/pinot/core/operator/query/DistinctOperator.java:
##########
@@ -40,31 +45,119 @@
  */
 public class DistinctOperator extends BaseOperator<DistinctResultsBlock> {
   private static final String EXPLAIN_NAME = "DISTINCT";
+  private static final int UNLIMITED_ROWS = Integer.MAX_VALUE;
 
   private final IndexSegment _indexSegment;
   private final QueryContext _queryContext;
   private final BaseProjectOperator<?> _projectOperator;
 
   private int _numDocsScanned = 0;
+  private final int _maxRowsInDistinct;
+  private final int _numRowsWithoutChangeInDistinct;
+  private int _numRowsWithoutNewDistinct = 0;
+  private boolean _hitMaxRowsLimit = false;
+  private boolean _hitNoChangeLimit = false;
+  private final long _maxExecutionTimeNs;
+  private boolean _hitTimeLimit = false;
+  private final LongSupplier _timeSupplier;
 
   public DistinctOperator(IndexSegment indexSegment, QueryContext queryContext,
       BaseProjectOperator<?> projectOperator) {
+    this(indexSegment, queryContext, projectOperator, System::nanoTime);
+  }
+
+  DistinctOperator(IndexSegment indexSegment, QueryContext queryContext, 
BaseProjectOperator<?> projectOperator,
+      LongSupplier timeSupplier) {
     _indexSegment = indexSegment;
     _queryContext = queryContext;
     _projectOperator = projectOperator;
+    _timeSupplier = timeSupplier;
+    Map<String, String> queryOptions = queryContext.getQueryOptions();
+    if (queryOptions != null) {
+      Integer maxRowsInDistinct = 
QueryOptionsUtils.getMaxRowsInDistinct(queryOptions);
+      _maxRowsInDistinct = maxRowsInDistinct != null ? maxRowsInDistinct : 
UNLIMITED_ROWS;
+      Integer numRowsWithoutChange = 
QueryOptionsUtils.getNumRowsWithoutChangeInDistinct(queryOptions);
+      _numRowsWithoutChangeInDistinct =
+          numRowsWithoutChange != null ? numRowsWithoutChange : UNLIMITED_ROWS;
+      Long maxExecutionTimeMs = 
QueryOptionsUtils.getMaxExecutionTimeMsInDistinct(queryOptions);
+      _maxExecutionTimeNs =
+          maxExecutionTimeMs != null ? 
TimeUnit.MILLISECONDS.toNanos(maxExecutionTimeMs) : Long.MAX_VALUE;
+    } else {
+      _maxRowsInDistinct = UNLIMITED_ROWS;
+      _numRowsWithoutChangeInDistinct = UNLIMITED_ROWS;
+      _maxExecutionTimeNs = Long.MAX_VALUE;
+    }
   }
 
   @Override
   protected DistinctResultsBlock getNextBlock() {
     DistinctExecutor executor = 
DistinctExecutorFactory.getDistinctExecutor(_projectOperator, _queryContext);
+    executor.setTimeSupplier(_timeSupplier);
+    executor.setMaxRowsToProcess(_maxRowsInDistinct);
+    
executor.setNumRowsWithoutChangeInDistinct(_numRowsWithoutChangeInDistinct);
     ValueBlock valueBlock;

Review Comment:
   These boolean flags are computed before the loop but checked repeatedly 
inside. For clarity and maintainability, consider extracting them into a small 
helper method or adding a comment explaining why they are precomputed.
   ```suggestion
       ValueBlock valueBlock;
       // Precompute enforcement flags once before the loop to avoid repeated 
comparisons in each iteration.
   ```



##########
pinot-core/src/main/java/org/apache/pinot/core/query/distinct/DistinctEarlyTerminationContext.java:
##########
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.distinct;
+
+import java.util.function.LongSupplier;
+
+/**
+ * Tracks per-block early-termination budgets for distinct executors (row 
limits and no-change limits).
+ * <p>All distinct executors should delegate to this class so that query 
options such as
+ * {@code maxRowsInDistinct} and {@code numRowsWithoutChangeInDistinct} are 
enforced consistently
+ * while processing each {@link 
org.apache.pinot.core.operator.blocks.ValueBlock}.</p>
+ */
+public class DistinctEarlyTerminationContext {
+  private static final int UNLIMITED_ROWS = Integer.MAX_VALUE;
+  private static final long UNLIMITED_TIME_NANOS = Long.MAX_VALUE;
+
+  private int _rowsRemaining = UNLIMITED_ROWS;
+  private int _numRowsProcessed = 0;
+  private int _numRowsWithoutChangeLimit = UNLIMITED_ROWS;
+  private int _numRowsWithoutChange = 0;
+  private boolean _numRowsWithoutChangeLimitReached = false;
+  // Absolute deadline (in nanos from the configured time supplier). Using a 
deadline instead of a fixed remaining
+  // value allows executors to stop within a block as time elapses.

Review Comment:
   Comment references "allows executors to stop within a block" but does not 
explain the tradeoff. Consider adding a brief note about why a deadline 
approach (rather than decrementing a counter) is chosen—e.g., for monotonic 
accuracy when time sources switch.
   ```suggestion
     // Absolute deadline (in nanos from the configured time supplier). Using a 
deadline instead of decrementing a
     // remaining-time counter allows executors to stop within a block as time 
elapses, and preserves a monotonic,
     // accurate budget even when the time source is switched in 
setTimeSupplier().
   ```



##########
pinot-core/src/main/java/org/apache/pinot/core/operator/combine/merger/DistinctResultsBlockMerger.java:
##########
@@ -18,18 +18,69 @@
  */
 package org.apache.pinot.core.operator.combine.merger;
 
+import org.apache.pinot.common.utils.config.QueryOptionsUtils;
+import org.apache.pinot.core.operator.blocks.results.BaseResultsBlock;
 import org.apache.pinot.core.operator.blocks.results.DistinctResultsBlock;
+import org.apache.pinot.core.query.request.context.QueryContext;
 
 
 public class DistinctResultsBlockMerger implements 
ResultsBlockMerger<DistinctResultsBlock> {
+  private final int _maxRowsAcrossSegments;
+  private boolean _rowBudgetReached;
+  private boolean _timeLimitReached;
+
+  public DistinctResultsBlockMerger(QueryContext queryContext) {
+    if (queryContext.getQueryOptions() != null) {
+      Integer maxRows = 
QueryOptionsUtils.getMaxRowsInDistinct(queryContext.getQueryOptions());
+      _maxRowsAcrossSegments = maxRows != null ? maxRows : Integer.MAX_VALUE;
+    } else {
+      _maxRowsAcrossSegments = Integer.MAX_VALUE;
+    }
+  }

Review Comment:
   The constructor logic can be simplified by extracting the fallback to 
`Integer.MAX_VALUE` into a helper or using `Objects.requireNonNullElse` 
pattern. This reduces nested conditionals and improves readability.



##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/DistinctQueriesTest.java:
##########
@@ -0,0 +1,279 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests.custom;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.pinot.integration.tests.ClusterIntegrationTestUtils;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import 
org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+
+@Test(suiteName = "CustomClusterIntegrationTest")
+public class DistinctQueriesTest extends CustomDataQueryClusterIntegrationTest 
{
+  private static final String TABLE_NAME = "DistinctQueriesCustomTest";
+
+  private static final String INT_COL = "intCol";
+  private static final String LONG_COL = "longCol";
+  private static final String DOUBLE_COL = "doubleCol";
+  private static final String STRING_COL = "stringCol";
+  private static final String MV_INT_COL = "intArrayCol";
+  private static final String MV_STRING_COL = "stringArrayCol";
+
+  // Keep the dataset modest to avoid slowing down the suite while still 
exercising early termination.
+  private static final int NUM_ROWS_PER_SEGMENT = 50_000;
+  private static final int NUM_INT_VALUES = 5;
+  private static final int NUM_LONG_VALUES = 5;
+  private static final int NUM_DOUBLE_VALUES = 3;
+  private static final int NUM_STRING_VALUES = 4;
+  private static final int NUM_MV_INT_VALUES = 3;
+  private static final long LONG_BASE_VALUE = 1_000L;
+  private static final double DOUBLE_OFFSET = 0.25d;
+  private static final int MV_OFFSET = 50;
+
+  @Override
+  protected long getCountStarResult() {
+    return NUM_ROWS_PER_SEGMENT * 2;
+  }
+
+  @Override
+  public String getTableName() {
+    return TABLE_NAME;
+  }
+
+  @Override
+  public Schema createSchema() {
+    return new Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
+        .addSingleValueDimension(INT_COL, FieldSpec.DataType.INT)
+        .addSingleValueDimension(LONG_COL, FieldSpec.DataType.LONG)
+        .addSingleValueDimension(DOUBLE_COL, FieldSpec.DataType.DOUBLE)
+        .addSingleValueDimension(STRING_COL, FieldSpec.DataType.STRING)
+        .addMultiValueDimension(MV_INT_COL, FieldSpec.DataType.INT)
+        .addMultiValueDimension(MV_STRING_COL, FieldSpec.DataType.STRING)
+        .build();
+  }
+
+  @Override
+  public List<File> createAvroFiles()
+      throws Exception {
+    org.apache.avro.Schema avroSchema =
+        SchemaBuilder.record("DistinctRecord").fields()
+            .requiredInt(INT_COL)
+            .requiredLong(LONG_COL)
+            .requiredDouble(DOUBLE_COL)
+            .requiredString(STRING_COL)
+            .name(MV_INT_COL).type().array().items().intType().noDefault()
+            
.name(MV_STRING_COL).type().array().items().stringType().noDefault()
+            .endRecord();
+
+    File avroFile = new File(_tempDir, "distinct-data.avro");
+    try (DataFileWriter<GenericData.Record> writer = new DataFileWriter<>(new 
GenericDatumWriter<>(avroSchema))) {
+      writer.create(avroSchema, avroFile);
+      for (int i = 0; i < NUM_ROWS_PER_SEGMENT; i++) {
+        GenericData.Record record = new GenericData.Record(avroSchema);
+        record.put(INT_COL, getIntValue(i));
+        record.put(LONG_COL, getLongValue(i));
+        record.put(DOUBLE_COL, getDoubleValue(i));
+        record.put(STRING_COL, getStringValue(i));
+        record.put(MV_INT_COL, List.of(getMultiValueBase(i), 
getMultiValueBase(i) + MV_OFFSET));
+        record.put(MV_STRING_COL, List.of(getStringValue(i), getStringValue(i 
+ MV_OFFSET)));
+        writer.append(record);
+      }
+    }
+    File avroFile1 = new File(_tempDir, "distinct-data-1.avro");
+    try (DataFileWriter<GenericData.Record> writer = new DataFileWriter<>(new 
GenericDatumWriter<>(avroSchema))) {
+      writer.create(avroSchema, avroFile1);
+      for (int i = 0; i < NUM_ROWS_PER_SEGMENT; i++) {
+        GenericData.Record record = new GenericData.Record(avroSchema);
+        record.put(INT_COL, getIntValue(i));
+        record.put(LONG_COL, getLongValue(i));
+        record.put(DOUBLE_COL, getDoubleValue(i));
+        record.put(STRING_COL, getStringValue(i));
+        record.put(MV_INT_COL, List.of(getMultiValueBase(i), 
getMultiValueBase(i) + MV_OFFSET));
+        record.put(MV_STRING_COL, List.of(getStringValue(i), getStringValue(i 
+ MV_OFFSET)));
+        writer.append(record);
+      }
+    }
+    return List.of(avroFile, avroFile1);
+  }
+
+  @Override
+  protected String getSortedColumn() {
+    return null;
+  }
+
+  private int getIntValue(int recordId) {
+    return recordId % NUM_INT_VALUES;
+  }
+
+  private long getLongValue(int recordId) {
+    return LONG_BASE_VALUE + (recordId % NUM_LONG_VALUES);
+  }
+
+  private double getDoubleValue(int recordId) {
+    return (recordId % NUM_DOUBLE_VALUES) + DOUBLE_OFFSET;
+  }
+
+  private String getStringValue(int recordId) {
+    return "type_" + (recordId % NUM_STRING_VALUES);
+  }
+
+  private int getMultiValueBase(int recordId) {
+    return recordId % NUM_MV_INT_VALUES;
+  }
+
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testDistinctSingleValuedColumns(boolean useMultiStageQueryEngine)
+      throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+    assertDistinctColumnValues(INT_COL, getExpectedIntValues(), 
JsonNode::asInt);
+    assertDistinctColumnValues(LONG_COL, getExpectedLongValues(), 
JsonNode::asLong);
+    assertDistinctColumnValues(DOUBLE_COL, getExpectedDoubleValues(), 
JsonNode::asDouble);
+    assertDistinctColumnValues(STRING_COL, getExpectedStringValues(), 
JsonNode::textValue);
+  }
+
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testDistinctMultiValueColumn(boolean useMultiStageQueryEngine)
+      throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+    String query = useMultiStageQueryEngine ? String.format("SELECT 
ARRAY_TO_MV(%s) FROM %s GROUP BY 1", MV_INT_COL,
+        getTableName()) : String.format("SELECT DISTINCT %s FROM %s", 
MV_INT_COL, getTableName());
+    JsonNode result = postQuery(query);
+    JsonNode rows = result.get("resultTable").get("rows");
+    Set<Integer> actual = new HashSet<>();
+    for (JsonNode row : rows) {
+      actual.add(row.get(0).asInt());
+    }
+    assertEquals(actual, getExpectedMultiValueEntries());
+  }
+
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testDistinctMultipleColumns(boolean useMultiStageQueryEngine)
+      throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+    String query =
+        String.format("SELECT DISTINCT %s, %s FROM %s ORDER BY %s, %s LIMIT 
1000", INT_COL, STRING_COL, getTableName(),
+            INT_COL, STRING_COL);
+    JsonNode rows = postQuery(query).get("resultTable").get("rows");
+    List<String> actual = new ArrayList<>(rows.size());
+    for (JsonNode row : rows) {
+      actual.add(row.get(0).asInt() + "|" + row.get(1).textValue());
+    }
+    assertEquals(actual, getExpectedIntStringPairs());
+  }
+
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testMaxRowsInDistinctEarlyTermination(boolean 
useMultiStageQueryEngine)
+      throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+    String sql = String.format("SELECT DISTINCT %s FROM %s WHERE rand() < 1.0 
LIMIT 100", STRING_COL, getTableName());
+    JsonNode response =
+        postQueryWithOptions(sql, useMultiStageQueryEngine, 
Map.of(QueryOptionKey.MAX_ROWS_IN_DISTINCT, "3"));
+    assertTrue(response.path("maxRowsInDistinctReached").asBoolean(false),
+        "expected maxRowsInDistinctReached flag. Response: " + response);
+    assertTrue(response.path("partialResult").asBoolean(false), "partialResult 
should be true. Response: " + response);
+    assertEquals(response.get("resultTable").get("rows").size(), 3, "row count 
should honor threshold");
+    assertEquals(response.path("numDocsScanned").asLong() % 3, 0,
+        "expected 3 or 6 rows scanned when budget is exhausted. Response: " + 
response);

Review Comment:
   The assertion checks `numDocsScanned % 3 == 0` and the message says 
"expected 3 or 6 rows." This is fragile if the implementation changes how 
blocks are processed. Consider asserting a more explicit upper bound (e.g., `<= 
6`) or documenting why modulo 3 is guaranteed.
   ```suggestion
       long numDocsScanned = response.path("numDocsScanned").asLong();
       assertTrue(numDocsScanned <= 6,
           "expected at most 6 rows scanned when budget is exhausted. 
numDocsScanned=" + numDocsScanned
               + ", response: " + response);
   ```



##########
pinot-core/src/test/java/org/apache/pinot/core/query/distinct/DistinctExecutorEarlyTerminationTest.java:
##########
@@ -0,0 +1,565 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.distinct;
+
+import it.unimi.dsi.fastutil.ints.IntSet;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.operator.blocks.ValueBlock;
+import 
org.apache.pinot.core.query.distinct.dictionary.DictionaryBasedMultiColumnDistinctExecutor;
+import org.apache.pinot.core.query.distinct.raw.IntDistinctExecutor;
+import org.apache.pinot.core.query.distinct.raw.RawMultiColumnDistinctExecutor;
+import org.apache.pinot.core.query.distinct.table.DistinctTable;
+import org.apache.pinot.segment.spi.index.reader.Dictionary;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+
+public class DistinctExecutorEarlyTerminationTest {
+
+  @Test
+  public void testRawSingleColumnExecutorStopsWithinBlock() {
+    ExpressionContext expression = ExpressionContext.forIdentifier("c1");
+    IntDistinctExecutor executor =
+        new IntDistinctExecutor(expression, DataType.INT, /*limit*/ 10, false, 
null);
+    executor.setMaxRowsToProcess(3);
+    int[] values = new int[]{10, 11, 12, 13, 14};
+    ValueBlock block = new SimpleValueBlock(5, Map.of(expression, new 
IntBlockValSet(values)));
+
+    executor.process(block);
+
+    assertEquals(executor.getRemainingRowsToProcess(), 0, "row budget should 
be exhausted");
+    assertEquals(executor.getNumDistinctRowsCollected(), 3, "should only read 
up to the row budget");
+  }
+
+  @Test
+  public void testRawMultiColumnExecutorStopsWithinBlock() {
+    ExpressionContext col1 = ExpressionContext.forIdentifier("c1");
+    ExpressionContext col2 = ExpressionContext.forIdentifier("c2");
+    List<ExpressionContext> expressions = List.of(col1, col2);
+    DataSchema schema = new DataSchema(new String[]{"c1", "c2"},
+        new ColumnDataType[]{ColumnDataType.INT, ColumnDataType.INT});
+    RawMultiColumnDistinctExecutor executor =
+        new RawMultiColumnDistinctExecutor(expressions, false, schema, 10, 
false, null);
+    executor.setMaxRowsToProcess(2);
+
+    Map<ExpressionContext, BlockValSet> blockValSets = Map.of(
+        col1, new IntBlockValSet(new int[]{0, 1, 2, 3, 4}),
+        col2, new IntBlockValSet(new int[]{10, 11, 12, 13, 14})
+    );
+    ValueBlock block = new SimpleValueBlock(5, blockValSets);
+
+    executor.process(block);
+    DistinctTable result = executor.getResult();
+
+    assertEquals(executor.getRemainingRowsToProcess(), 0);
+    assertEquals(result.size(), 2);
+  }
+
+  @Test
+  public void testDictionaryBasedMultiColumnExecutorStopsWithinBlock() {
+    ExpressionContext col1 = ExpressionContext.forIdentifier("c1");
+    ExpressionContext col2 = ExpressionContext.forIdentifier("c2");
+    List<ExpressionContext> expressions = List.of(col1, col2);
+    DataSchema schema = new DataSchema(new String[]{"c1", "c2"},
+        new ColumnDataType[]{ColumnDataType.INT, ColumnDataType.INT});
+    Dictionary dictionary = new SimpleIntDictionary(new int[]{0, 1, 2, 3, 4});
+    DictionaryBasedMultiColumnDistinctExecutor executor =
+        new DictionaryBasedMultiColumnDistinctExecutor(expressions, false, 
schema,
+            List.of(dictionary, dictionary), 10, false, null);
+    executor.setMaxRowsToProcess(4);
+
+    Map<ExpressionContext, BlockValSet> blockValSets = Map.of(
+        col1, new DictionaryBlockValSet(new int[]{0, 1, 2, 3, 4}),
+        col2, new DictionaryBlockValSet(new int[]{4, 3, 2, 1, 0})
+    );
+    ValueBlock block = new SimpleValueBlock(5, blockValSets);
+
+    executor.process(block);
+    DistinctTable result = executor.getResult();
+
+    assertEquals(executor.getRemainingRowsToProcess(), 0);
+    assertEquals(result.size(), 4);
+  }
+
+  @Test
+  public void testRawSingleColumnExecutorStopsAfterNoChangeWithinBlock() {
+    ExpressionContext expression = ExpressionContext.forIdentifier("c1");
+    IntDistinctExecutor executor =
+        new IntDistinctExecutor(expression, DataType.INT, /*limit*/ 10, false, 
null);
+    executor.setNumRowsWithoutChangeInDistinct(2);
+    int[] values = new int[]{10, 10, 10, 11};
+    ValueBlock block = new SimpleValueBlock(4, Map.of(expression, new 
IntBlockValSet(values)));
+
+    boolean satisfied = executor.process(block);
+
+    assertTrue(satisfied, "should terminate when no-change budget is hit 
within a block");
+    assertTrue(executor.isNumRowsWithoutChangeLimitReached());
+    assertEquals(executor.getNumRowsProcessed(), 3);
+    assertEquals(executor.getNumDistinctRowsCollected(), 1);
+  }
+
+  @Test
+  public void testRawMultiColumnExecutorStopsAfterNoChangeWithinBlock() {
+    ExpressionContext col1 = ExpressionContext.forIdentifier("c1");
+    ExpressionContext col2 = ExpressionContext.forIdentifier("c2");
+    List<ExpressionContext> expressions = List.of(col1, col2);
+    DataSchema schema = new DataSchema(new String[]{"c1", "c2"},
+        new ColumnDataType[]{ColumnDataType.INT, ColumnDataType.INT});
+    RawMultiColumnDistinctExecutor executor =
+        new RawMultiColumnDistinctExecutor(expressions, false, schema, 10, 
false, null);
+    executor.setNumRowsWithoutChangeInDistinct(2);
+
+    Map<ExpressionContext, BlockValSet> blockValSets = Map.of(
+        col1, new IntBlockValSet(new int[]{0, 0, 0, 1}),
+        col2, new IntBlockValSet(new int[]{10, 10, 10, 11})
+    );
+    ValueBlock block = new SimpleValueBlock(4, blockValSets);
+
+    boolean satisfied = executor.process(block);
+    DistinctTable result = executor.getResult();
+
+    assertTrue(satisfied);
+    assertTrue(executor.isNumRowsWithoutChangeLimitReached());
+    assertEquals(executor.getNumRowsProcessed(), 3);
+    assertEquals(result.size(), 1);
+  }
+
+  @Test
+  public void 
testDictionaryBasedMultiColumnExecutorStopsAfterNoChangeWithinBlock() {
+    ExpressionContext col1 = ExpressionContext.forIdentifier("c1");
+    ExpressionContext col2 = ExpressionContext.forIdentifier("c2");
+    List<ExpressionContext> expressions = List.of(col1, col2);
+    DataSchema schema = new DataSchema(new String[]{"c1", "c2"},
+        new ColumnDataType[]{ColumnDataType.INT, ColumnDataType.INT});
+    Dictionary dictionary = new SimpleIntDictionary(new int[]{0, 1, 2, 3, 4});
+    DictionaryBasedMultiColumnDistinctExecutor executor =
+        new DictionaryBasedMultiColumnDistinctExecutor(expressions, false, 
schema,
+            List.of(dictionary, dictionary), 10, false, null);
+    executor.setNumRowsWithoutChangeInDistinct(2);
+
+    Map<ExpressionContext, BlockValSet> blockValSets = Map.of(
+        col1, new DictionaryBlockValSet(new int[]{0, 0, 0, 1}),
+        col2, new DictionaryBlockValSet(new int[]{4, 4, 4, 3})
+    );
+    ValueBlock block = new SimpleValueBlock(4, blockValSets);
+
+    boolean satisfied = executor.process(block);
+    DistinctTable result = executor.getResult();
+
+    assertTrue(satisfied);
+    assertTrue(executor.isNumRowsWithoutChangeLimitReached());
+    assertEquals(executor.getNumRowsProcessed(), 3);
+    assertEquals(result.size(), 1);
+  }
+
+  @Test
+  public void testRawSingleColumnExecutorStopsWhenTimeBudgetConsumed() {
+    ExpressionContext expression = ExpressionContext.forIdentifier("c1");
+    IntDistinctExecutor executor =
+        new IntDistinctExecutor(expression, DataType.INT, /*limit*/ 10, false, 
null);

Review Comment:
   Setting `remainingTimeNanos` to 0 is a convenient way to simulate immediate 
timeout in tests. Consider adding a comment explaining this is a deliberate 
test fixture to force early termination, not a production scenario.
   ```suggestion
           new IntDistinctExecutor(expression, DataType.INT, /*limit*/ 10, 
false, null);
       // Set remaining time to 0 to simulate an already exhausted time budget 
in this test, not in production.
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to