Jackie-Jiang commented on code in PR #17247:
URL: https://github.com/apache/pinot/pull/17247#discussion_r2692580633


##########
pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/BaseResultsBlock.java:
##########
@@ -37,6 +37,13 @@
  * The {@code BaseResultsBlock} class is the holder of the server side results.
  */
 public abstract class BaseResultsBlock implements Block {
+  public enum EarlyTerminationReason {
+    NONE,
+    DISTINCT_MAX_ROWS,
+    DISTINCT_NO_NEW_VALUES,
+    TIME_LIMIT

Review Comment:
   Do you want to call it `DISTINCT_TIME_LIMIT` to be more specific? In the future we can introduce other `TIME_LIMIT` values without mixing them up.
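A minimal sketch of the suggested naming, assuming the enum stays nested in `BaseResultsBlock`. The wrapper class `BaseResultsBlockSketch` is a hypothetical stand-in that keeps the snippet self-contained.

```java
// Hypothetical stand-in for BaseResultsBlock, reduced to the nested enum under discussion.
abstract class BaseResultsBlockSketch {
  public enum EarlyTerminationReason {
    NONE,
    DISTINCT_MAX_ROWS,
    DISTINCT_NO_NEW_VALUES,
    // Prefixed with DISTINCT_ so a future, non-distinct time limit can get its own constant.
    DISTINCT_TIME_LIMIT
  }
}
```

With this naming every distinct-specific reason shares the `DISTINCT_` prefix, so a later operator-agnostic time-limit reason would not be ambiguous.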



##########
pinot-core/src/main/java/org/apache/pinot/core/query/distinct/DistinctEarlyTerminationContext.java:
##########
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.distinct;
+
+import java.util.function.LongSupplier;
+
+/**
+ * Tracks per-block early-termination budgets for distinct executors (row limits and no-change limits).
+ * <p>All distinct executors should delegate to this class so that query options such as
+ * {@code maxRowsInDistinct} and {@code numRowsWithoutChangeInDistinct} are enforced consistently
+ * while processing each {@link org.apache.pinot.core.operator.blocks.ValueBlock}.</p>
+ */
+public class DistinctEarlyTerminationContext {
+  private static final int UNLIMITED_ROWS = Integer.MAX_VALUE;
+  private static final long UNLIMITED_TIME_NANOS = Long.MAX_VALUE;
+
+  private int _rowsRemaining = UNLIMITED_ROWS;
+  private int _numRowsProcessed = 0;
+  private int _numRowsWithoutChangeLimit = UNLIMITED_ROWS;
+  private int _numRowsWithoutChange = 0;
+  private boolean _numRowsWithoutChangeLimitReached = false;
+  private boolean _trackingEnabled = false;
+  // Absolute deadline (in nanos from the configured time supplier). A deadline stays consistent with the time source,
+  // enables mid-block checks, and preserves remaining budgets if the time supplier changes (e.g. in tests).
+  private long _deadlineTimeNanos = UNLIMITED_TIME_NANOS;
+  private LongSupplier _timeSupplier = System::nanoTime;
+
+  public void setTimeSupplier(LongSupplier timeSupplier) {
+    if (timeSupplier == null || timeSupplier == _timeSupplier) {
+      return;
+    }
+    if (_deadlineTimeNanos != UNLIMITED_TIME_NANOS) {
+      // Preserve the already computed remaining budget when switching time sources (primarily for tests).
+      long remainingTimeNanos = getRemainingTimeNanos();
+      _timeSupplier = timeSupplier;
+      setRemainingTimeNanos(remainingTimeNanos);

Review Comment:
   I don't follow why we need this. This logic is never needed in production.
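If swapping suppliers is only needed for tests, one option (a sketch under that assumption, not the PR's code) is to inject the clock once at construction time and track a start timestamp plus a budget, so there is no remaining budget to re-base. `DeadlineTracker` and `setTimeBudgetNanos` are hypothetical names.

```java
import java.util.function.LongSupplier;

// Hypothetical simplified time tracker: the clock is fixed at construction, so no logic is
// needed to preserve a remaining budget when the supplier is swapped.
final class DeadlineTracker {
  static final long UNLIMITED_TIME_NANOS = Long.MAX_VALUE;

  private final LongSupplier _timeSupplier;
  private long _startTimeNanos;
  private long _budgetNanos = UNLIMITED_TIME_NANOS;

  // Production path: real monotonic clock.
  DeadlineTracker() {
    this(System::nanoTime);
  }

  // Test path: inject a fake clock exactly once.
  DeadlineTracker(LongSupplier timeSupplier) {
    _timeSupplier = timeSupplier;
  }

  // Starts the budget from "now" according to the injected clock.
  void setTimeBudgetNanos(long budgetNanos) {
    _startTimeNanos = _timeSupplier.getAsLong();
    _budgetNanos = budgetNanos;
  }

  boolean isTimeLimitReached() {
    if (_budgetNanos == UNLIMITED_TIME_NANOS) {
      return false;
    }
    // Comparing elapsed time (now - start) is the overflow-safe way to use nanoTime values.
    return _timeSupplier.getAsLong() - _startTimeNanos >= _budgetNanos;
  }
}
```

Comparing elapsed time rather than absolute timestamps follows the pattern described in the `System.nanoTime()` Javadoc.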



##########
pinot-core/src/main/java/org/apache/pinot/core/query/distinct/BaseSingleColumnDistinctExecutor.java:
##########
@@ -106,4 +162,79 @@ private boolean processWithoutNull(BlockValSet blockValueSet, int numDocs) {
   public DistinctTable getResult() {
     return _distinctTable;
   }
+
+  @Override
+  public int getNumDistinctRowsCollected() {
+    return _distinctTable.size();
+  }
+
+  @Override
+  public int getRemainingRowsToProcess() {
+    return _earlyTerminationContext.getRemainingRowsToProcess();
+  }
+
+  private int clampToRemaining(int numDocs) {
+    return _earlyTerminationContext.clampToRemaining(numDocs);
+  }
+
+  private void recordRowProcessed(boolean distinctChanged) {
+    _earlyTerminationContext.recordRowProcessed(distinctChanged);
+  }
+
+  private boolean shouldStopProcessing() {
+    return _earlyTerminationContext.shouldStopProcessing();
+  }
+
+  private boolean processSVRange(S values, int from, int to, RoaringBitmap nullBitmap, boolean trackDistinctChange) {
+    boolean limitReached = false;
+    boolean nullAdded = _distinctTable.hasNull();
+    for (int docId = from; docId < to; docId++) {
+      if (shouldStopProcessing()) {

Review Comment:
   (MAJOR) This is again doing a per-row time check.
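A common way to avoid a clock read on every row is to amortize it: check the time only once every N rows. The sketch below assumes a power-of-two interval so the check is a bit mask; `AmortizedTimeCheckLoop` and the value 1024 are illustrative choices, not existing Pinot code or constants.

```java
import java.util.function.IntConsumer;
import java.util.function.LongSupplier;

// Hypothetical sketch: processes rows but reads the clock only once every TIME_CHECK_INTERVAL rows.
final class AmortizedTimeCheckLoop {
  // Power of two so the "is it time to check?" test is a cheap bit mask (illustrative value).
  static final int TIME_CHECK_INTERVAL = 1024;
  private static final int TIME_CHECK_MASK = TIME_CHECK_INTERVAL - 1;

  private final LongSupplier _timeSupplier;
  private final long _startTimeNanos;
  private final long _budgetNanos;
  private int _numClockReads = 0;

  AmortizedTimeCheckLoop(LongSupplier timeSupplier, long budgetNanos) {
    _timeSupplier = timeSupplier;
    _startTimeNanos = timeSupplier.getAsLong();
    _budgetNanos = budgetNanos;
  }

  /**
   * Feeds rows [from, to) to rowConsumer and returns the first row that was NOT processed,
   * which equals {@code to} when the whole range was consumed before the time limit.
   */
  int processRange(int from, int to, IntConsumer rowConsumer) {
    for (int docId = from; docId < to; docId++) {
      // Consult the clock on the first row of the range and then every TIME_CHECK_INTERVAL rows.
      if (((docId - from) & TIME_CHECK_MASK) == 0 && isTimeLimitReached()) {
        return docId;
      }
      rowConsumer.accept(docId);
    }
    return to;
  }

  int getNumClockReads() {
    return _numClockReads;
  }

  private boolean isTimeLimitReached() {
    _numClockReads++;
    return _timeSupplier.getAsLong() - _startTimeNanos >= _budgetNanos;
  }
}
```

The trade-off is that processing can run up to `TIME_CHECK_INTERVAL - 1` rows past the deadline, in exchange for far fewer clock reads.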



##########
pinot-core/src/main/java/org/apache/pinot/core/query/distinct/BaseSingleColumnDistinctExecutor.java:
##########
@@ -106,4 +162,79 @@ private boolean processWithoutNull(BlockValSet blockValueSet, int numDocs) {
   public DistinctTable getResult() {
     return _distinctTable;
   }
+
+  @Override
+  public int getNumDistinctRowsCollected() {
+    return _distinctTable.size();
+  }
+
+  @Override
+  public int getRemainingRowsToProcess() {
+    return _earlyTerminationContext.getRemainingRowsToProcess();
+  }
+
+  private int clampToRemaining(int numDocs) {
+    return _earlyTerminationContext.clampToRemaining(numDocs);
+  }
+
+  private void recordRowProcessed(boolean distinctChanged) {
+    _earlyTerminationContext.recordRowProcessed(distinctChanged);
+  }
+
+  private boolean shouldStopProcessing() {
+    return _earlyTerminationContext.shouldStopProcessing();
+  }
+
+  private boolean processSVRange(S values, int from, int to, RoaringBitmap nullBitmap, boolean trackDistinctChange) {

Review Comment:
   Rename it to `processSVWithTracking`



##########
pinot-core/src/main/java/org/apache/pinot/core/operator/query/DistinctOperator.java:
##########
@@ -40,31 +45,108 @@
  */
 public class DistinctOperator extends BaseOperator<DistinctResultsBlock> {
   private static final String EXPLAIN_NAME = "DISTINCT";
+  private static final int UNLIMITED_ROWS = Integer.MAX_VALUE;
 
   private final IndexSegment _indexSegment;
   private final QueryContext _queryContext;
   private final BaseProjectOperator<?> _projectOperator;
 
   private int _numDocsScanned = 0;
+  private final int _maxRowsInDistinct;
+  private final int _numRowsWithoutChangeInDistinct;
+  private boolean _hitMaxRowsLimit = false;
+  private boolean _hitNoChangeLimit = false;
+  private final long _maxExecutionTimeNs;
+  private boolean _hitTimeLimit = false;
+  private final LongSupplier _timeSupplier;
 
   public DistinctOperator(IndexSegment indexSegment, QueryContext queryContext,
       BaseProjectOperator<?> projectOperator) {
+    this(indexSegment, queryContext, projectOperator, System::nanoTime);
+  }
+
+  DistinctOperator(IndexSegment indexSegment, QueryContext queryContext, BaseProjectOperator<?> projectOperator,
+      LongSupplier timeSupplier) {
     _indexSegment = indexSegment;
     _queryContext = queryContext;
     _projectOperator = projectOperator;
+    _timeSupplier = timeSupplier;
+    Map<String, String> queryOptions = queryContext.getQueryOptions();
+    if (queryOptions != null) {
+      Integer maxRowsInDistinct = QueryOptionsUtils.getMaxRowsInDistinct(queryOptions);
+      _maxRowsInDistinct = maxRowsInDistinct != null ? maxRowsInDistinct : UNLIMITED_ROWS;
+      Integer numRowsWithoutChange = QueryOptionsUtils.getNumRowsWithoutChangeInDistinct(queryOptions);
+      _numRowsWithoutChangeInDistinct =
+          numRowsWithoutChange != null ? numRowsWithoutChange : UNLIMITED_ROWS;
+      Long maxExecutionTimeMs = QueryOptionsUtils.getMaxExecutionTimeMsInDistinct(queryOptions);
+      _maxExecutionTimeNs =
+          maxExecutionTimeMs != null ? TimeUnit.MILLISECONDS.toNanos(maxExecutionTimeMs) : Long.MAX_VALUE;
+    } else {
+      _maxRowsInDistinct = UNLIMITED_ROWS;
+      _numRowsWithoutChangeInDistinct = UNLIMITED_ROWS;
+      _maxExecutionTimeNs = Long.MAX_VALUE;
+    }
   }
 
   @Override
   protected DistinctResultsBlock getNextBlock() {
     DistinctExecutor executor = DistinctExecutorFactory.getDistinctExecutor(_projectOperator, _queryContext);
+    executor.setTimeSupplier(_timeSupplier);
+    executor.setMaxRowsToProcess(_maxRowsInDistinct);
+    executor.setNumRowsWithoutChangeInDistinct(_numRowsWithoutChangeInDistinct);
     ValueBlock valueBlock;
+    // Precompute control flags to keep loop checks consistent and avoid repeated comparisons.
+    boolean enforceRowLimit = _maxRowsInDistinct != UNLIMITED_ROWS;
+    boolean enforceNoChangeLimit = _numRowsWithoutChangeInDistinct != UNLIMITED_ROWS;
+    boolean enforceTimeLimit = _maxExecutionTimeNs != Long.MAX_VALUE;
+    boolean trackRows = enforceRowLimit || enforceNoChangeLimit || enforceTimeLimit;
+    final long startTimeNs = _timeSupplier.getAsLong();
     while ((valueBlock = _projectOperator.nextBlock()) != null) {
-      _numDocsScanned += valueBlock.getNumDocs();
-      if (executor.process(valueBlock)) {
+      if (enforceTimeLimit) {
+        long elapsed = _timeSupplier.getAsLong() - startTimeNs;
+        long remaining = _maxExecutionTimeNs - elapsed;
+        executor.setRemainingTimeNanos(remaining);

Review Comment:
   Why set it on a per-block basis? We only need to set it once, right?
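One way to address this, sketched under simplifying assumptions: hand the budget to the executor once before the loop and let the executor measure elapsed time against its own start timestamp. `BudgetedExecutor`, `ClockedExecutor`, and `DistinctLoopSketch` are hypothetical, and blocks are modeled as `int[]` instead of `ValueBlock`.

```java
import java.util.Iterator;
import java.util.function.LongSupplier;

// Hypothetical minimal executor contract: the time budget is handed over once.
interface BudgetedExecutor {
  void setTimeBudgetNanos(long budgetNanos);

  /** Processes one block and returns true when execution should stop early. */
  boolean process(int[] block);
}

// Hypothetical executor that measures elapsed time against its own start timestamp.
final class ClockedExecutor implements BudgetedExecutor {
  private final LongSupplier _timeSupplier;
  private long _startTimeNanos;
  private long _budgetNanos = Long.MAX_VALUE;
  private int _numBudgetUpdates = 0;

  ClockedExecutor(LongSupplier timeSupplier) {
    _timeSupplier = timeSupplier;
  }

  @Override
  public void setTimeBudgetNanos(long budgetNanos) {
    _numBudgetUpdates++;
    _startTimeNanos = _timeSupplier.getAsLong();
    _budgetNanos = budgetNanos;
  }

  @Override
  public boolean process(int[] block) {
    // Real distinct work on the block would happen here; the sketch only checks the budget.
    return _budgetNanos != Long.MAX_VALUE && _timeSupplier.getAsLong() - _startTimeNanos >= _budgetNanos;
  }

  int getNumBudgetUpdates() {
    return _numBudgetUpdates;
  }
}

final class DistinctLoopSketch {
  // Returns the number of docs scanned before the loop ended (exhausted or stopped early).
  static int run(Iterator<int[]> blocks, BudgetedExecutor executor, long maxExecutionTimeNs) {
    // Set the budget once, before the first block, rather than recomputing it per block.
    if (maxExecutionTimeNs != Long.MAX_VALUE) {
      executor.setTimeBudgetNanos(maxExecutionTimeNs);
    }
    int numDocsScanned = 0;
    while (blocks.hasNext()) {
      int[] block = blocks.next();
      numDocsScanned += block.length;
      if (executor.process(block)) {
        break;
      }
    }
    return numDocsScanned;
  }
}
```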



##########
pinot-core/src/main/java/org/apache/pinot/core/query/distinct/BaseSingleColumnDistinctExecutor.java:
##########
@@ -32,54 +33,109 @@
 public abstract class BaseSingleColumnDistinctExecutor<T extends DistinctTable, S, M> implements DistinctExecutor {
   protected final ExpressionContext _expression;
   protected final T _distinctTable;
+  private final DistinctEarlyTerminationContext _earlyTerminationContext = new DistinctEarlyTerminationContext();
 
   public BaseSingleColumnDistinctExecutor(ExpressionContext expression, T distinctTable) {
     _expression = expression;
     _distinctTable = distinctTable;
   }
 
+  @Override
+  public void setMaxRowsToProcess(int maxRows) {
+    _earlyTerminationContext.setMaxRowsToProcess(maxRows);
+  }
+
+  @Override
+  public void setNumRowsWithoutChangeInDistinct(int numRowsWithoutChangeInDistinct) {
+    _earlyTerminationContext.setNumRowsWithoutChangeInDistinct(numRowsWithoutChangeInDistinct);
+  }
+
+  @Override
+  public void setTimeSupplier(LongSupplier timeSupplier) {
+    _earlyTerminationContext.setTimeSupplier(timeSupplier);
+  }
+
+  @Override
+  public void setRemainingTimeNanos(long remainingTimeNanos) {
+    _earlyTerminationContext.setRemainingTimeNanos(remainingTimeNanos);
+  }
+
+  @Override
+  public boolean isNumRowsWithoutChangeLimitReached() {
+    return _earlyTerminationContext.isNumRowsWithoutChangeLimitReached();
+  }
+
+  @Override
+  public int getNumRowsProcessed() {
+    return _earlyTerminationContext.getNumRowsProcessed();
+  }
+
   @Override
   public boolean process(ValueBlock valueBlock) {
+    boolean trackProgress = _earlyTerminationContext.isTrackingEnabled();
+    if (trackProgress && shouldStopProcessing()) {
+      return true;
+    }
     BlockValSet blockValueSet = valueBlock.getBlockValueSet(_expression);
-    int numDocs = valueBlock.getNumDocs();
-    if (_distinctTable.isNullHandlingEnabled() && blockValueSet.isSingleValue()) {
-      RoaringBitmap nullBitmap = blockValueSet.getNullBitmap();
-      if (nullBitmap != null && !nullBitmap.isEmpty()) {
-        return processWithNull(blockValueSet, numDocs, nullBitmap);
-      } else {
-        return processWithoutNull(blockValueSet, numDocs);
+    int numDocs = trackProgress ? clampToRemaining(valueBlock.getNumDocs()) : valueBlock.getNumDocs();
+    if (numDocs <= 0) {
+      return true;
+    }
+    if (trackProgress) {
+      boolean trackDistinctChange = _earlyTerminationContext.isDistinctChangeTrackingEnabled();
+      boolean limitReached = processWithTracking(blockValueSet, numDocs, trackDistinctChange);
+      return limitReached || shouldStopProcessing();
+    }
+    return processWithoutTracking(blockValueSet, numDocs);
+  }
+
+  private boolean processWithTracking(BlockValSet blockValueSet, int numDocs, boolean trackDistinctChange) {
+    if (blockValueSet.isSingleValue()) {
+      RoaringBitmap nullBitmap = null;
+      if (_distinctTable.isNullHandlingEnabled()) {
+        nullBitmap = blockValueSet.getNullBitmap();
+      }
+      S values = getValuesSV(blockValueSet);
+      return processSVRange(values, 0, numDocs, nullBitmap, trackDistinctChange);
+    } else {
+      M values = getValuesMV(blockValueSet);
+      return processMVRange(values, 0, numDocs, trackDistinctChange);
+    }
+  }
+
+  private boolean processWithoutTracking(BlockValSet blockValueSet, int numDocs) {

Review Comment:
   Can we keep the method without tracking as-is and only add the `WithTracking` method? Currently this change mixes a refactor with new functionality, and I don't really see the value of the refactoring.
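A sketch of the shape being asked for, using a hypothetical int-only executor (`IntDistinctExecutorSketch`) in place of `BaseSingleColumnDistinctExecutor`: the existing untracked path stays verbatim, a new `processWithTracking` is added, and a single branch in `process` picks between them.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical int-only executor: the untracked path is unchanged, tracking is purely additive.
final class IntDistinctExecutorSketch {
  private static final int UNLIMITED_ROWS = Integer.MAX_VALUE;

  private final Set<Integer> _distinctValues = new HashSet<>();
  private final int _limit;
  private final int _numRowsWithoutChangeLimit;
  private final boolean _trackingEnabled;
  private int _rowsRemaining;
  private int _numRowsWithoutChange = 0;

  IntDistinctExecutorSketch(int limit, int maxRowsToProcess, int numRowsWithoutChangeLimit) {
    _limit = limit;
    _rowsRemaining = maxRowsToProcess;
    _numRowsWithoutChangeLimit = numRowsWithoutChangeLimit;
    _trackingEnabled = maxRowsToProcess != UNLIMITED_ROWS || numRowsWithoutChangeLimit != UNLIMITED_ROWS;
  }

  /** Returns true when no more blocks should be processed. */
  boolean process(int[] values) {
    if (!_trackingEnabled) {
      return processWithoutNull(values, values.length);
    }
    return processWithTracking(values);
  }

  int getNumDistinctValues() {
    return _distinctValues.size();
  }

  // Stand-in for the existing method, left exactly as it was before the change.
  private boolean processWithoutNull(int[] values, int numDocs) {
    for (int i = 0; i < numDocs; i++) {
      _distinctValues.add(values[i]);
      if (_distinctValues.size() >= _limit) {
        return true;
      }
    }
    return false;
  }

  // New method: same per-row work plus the row budget and no-change bookkeeping.
  private boolean processWithTracking(int[] values) {
    int numDocs = Math.min(values.length, _rowsRemaining);
    for (int i = 0; i < numDocs; i++) {
      _rowsRemaining--;
      boolean added = _distinctValues.add(values[i]);
      _numRowsWithoutChange = added ? 0 : _numRowsWithoutChange + 1;
      if (_distinctValues.size() >= _limit || _numRowsWithoutChange >= _numRowsWithoutChangeLimit) {
        return true;
      }
    }
    return _rowsRemaining <= 0;
  }
}
```

Leaving the untracked method untouched keeps the diff focused on the new behavior and leaves the default (no-limit) path exactly as it was.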



##########
pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java:
##########
@@ -97,6 +97,26 @@ public static Map<String, String> resolveCaseInsensitiveOptions(Map<String, Stri
     return resolved;
   }
 
+  public static Map<String, String> mergeQueryOptions(@Nullable Map<String, String> baseOptions,
+      @Nullable Map<String, String> overridingOptions) {
+    Map<String, String> merged = new HashMap<>();
+    if (baseOptions != null) {
+      merged.putAll(baseOptions);
+    }
+    if (overridingOptions != null && !overridingOptions.isEmpty()) {
+      merged.putAll(resolveCaseInsensitiveOptions(overridingOptions));
+    }
+    return merged;
+  }
+
+  public static void mergeQueryOptionsIfAbsent(@Nullable Map<String, String> targetOptions,
+      @Nullable Map<String, String> extraOptions) {
+    if (targetOptions == null || extraOptions == null || extraOptions.isEmpty()) {
+      return;
+    }
+    resolveCaseInsensitiveOptions(extraOptions).forEach(targetOptions::putIfAbsent);
+  }
+

Review Comment:
   These changes seem to be no longer needed. Suggest reverting them.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.