This is an automated email from the ASF dual-hosted git repository.

gortiz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 14059b375eb Apply min/max segment pruning to filtered selection ORDER 
BY col LIMIT n (#18692)
14059b375eb is described below

commit 14059b375eb2743ed7d9340364b7c5cc51e7d058
Author: Gonzalo Ortiz Jaureguizar <[email protected]>
AuthorDate: Wed Jun 10 13:04:49 2026 +0200

    Apply min/max segment pruning to filtered selection ORDER BY col LIMIT n 
(#18692)
---
 .../query/pruner/SelectionQuerySegmentPruner.java  | 239 ++++++++++++++++-
 .../pruner/SelectionQuerySegmentPrunerTest.java    | 286 +++++++++++++++++++++
 .../BenchmarkSelectionOrderByFilterPruning.java    | 210 +++++++++++++++
 3 files changed, 725 insertions(+), 10 deletions(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SelectionQuerySegmentPruner.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SelectionQuerySegmentPruner.java
index 6f2e29a2240..993050d7898 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SelectionQuerySegmentPruner.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SelectionQuerySegmentPruner.java
@@ -23,11 +23,20 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.request.context.FilterContext;
 import org.apache.pinot.common.request.context.OrderByExpressionContext;
+import org.apache.pinot.common.request.context.predicate.EqPredicate;
+import org.apache.pinot.common.request.context.predicate.NotEqPredicate;
+import org.apache.pinot.common.request.context.predicate.Predicate;
+import org.apache.pinot.common.request.context.predicate.RangePredicate;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.core.query.request.context.utils.QueryContextUtils;
 import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.datasource.DataSource;
 import org.apache.pinot.segment.spi.datasource.DataSourceMetadata;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.env.PinotConfiguration;
 
 
@@ -37,8 +46,14 @@ import org.apache.pinot.spi.env.PinotConfiguration;
  *   <li>For selection query with LIMIT 0, keep 1 segment to create the data 
schema</li>
  *   <li>For selection only query without filter, keep enough documents to 
fulfill the LIMIT requirement</li>
  *   <li>
- *     For selection order-by query without filer, if the first order-by 
expression is an identifier (column), prune
- *     segments based on the column min/max value and keep enough documents to 
fulfill the LIMIT and OFFSET requirement.
+ *     For selection order-by query, if the first order-by expression is an 
identifier (column), prune segments based on
+ *     the column min/max value and keep enough documents to fulfill the LIMIT 
and OFFSET requirement. This works both
+ *     without a filter and with a filter: with a filter, each segment 
contributes towards the LIMIT only the number of
+ *     rows that <em>provably</em> match the filter based on min/max metadata 
(its total docs if it fully matches, 0
+ *     otherwise). Using this lower bound on matching rows keeps the boundary 
safe, so segments are never pruned when
+ *     they might still hold a top-n matching row. The optimization is skipped 
when null handling is active for the
+ *     order-by/predicate columns because nulls are stored as a default value 
that pollutes the min/max metadata
+ *     (see #18685).
  *   </li>
  * </ul>
  */
@@ -51,10 +66,24 @@ public class SelectionQuerySegmentPruner implements 
SegmentPruner {
 
   @Override
   public boolean isApplicableTo(QueryContext query) {
-    // Only prune selection queries
-    // If LIMIT is not 0, only prune segments for selection queries without 
filter
-    return QueryContextUtils.isSelectionQuery(query)
-        && (query.getFilter() == null || query.getLimit() == 0);
+    if (!QueryContextUtils.isSelectionQuery(query)) {
+      return false;
+    }
+    // Without a filter (or for LIMIT 0, where we just keep one segment for 
the schema), pruning is always applicable.
+    if (query.getFilter() == null || query.getLimit() == 0) {
+      return true;
+    }
+    // With a filter, only the order-by-on-identifier path can prune safely 
(the selection-only path relies on exact
+    // doc counts, which a filter invalidates). Additionally, null handling 
must not be active for the order-by column,
+    // because nulls are stored as a default value that pollutes the column 
min/max metadata used for sorting and the
+    // boundary.
+    List<OrderByExpressionContext> orderByExpressions = 
query.getOrderByExpressions();
+    if (orderByExpressions == null) {
+      return false;
+    }
+    ExpressionContext firstOrderByExpression = 
orderByExpressions.get(0).getExpression();
+    return firstOrderByExpression.getType() == 
ExpressionContext.Type.IDENTIFIER
+        && !isNullHandlingActive(query, 
firstOrderByExpression.getIdentifier());
   }
 
   @Override
@@ -75,7 +104,9 @@ public class SelectionQuerySegmentPruner implements 
SegmentPruner {
     }
 
     if (query.getOrderByExpressions() == null) {
-      return pruneSelectionOnly(segments, query);
+      // Count-based selection-only pruning is only safe without a filter 
(total docs is an exact match count). With a
+      // filter present this path is not selected (see isApplicableTo); guard 
defensively in case it is reached.
+      return query.getFilter() == null ? pruneSelectionOnly(segments, query) : 
segments;
     } else {
       return pruneSelectionOrderBy(segments, query);
     }
@@ -100,7 +131,7 @@ public class SelectionQuerySegmentPruner implements 
SegmentPruner {
   }
 
   /**
-   * Helper method to prune segments for selection order-by queries without 
filter.
+   * Helper method to prune segments for selection order-by queries.
    * <p>When the first order-by expression is an identifier (column), we can 
prune segments based on the column min/max
    * value:
    * <ul>
@@ -108,6 +139,8 @@ public class SelectionQuerySegmentPruner implements 
SegmentPruner {
    *   <li>2. Pick the top segments until we get enough documents to fulfill 
the LIMIT and OFFSET requirement</li>
    *   <li>3. Keep the segments that has value overlap with the selected ones; 
remove the others</li>
    * </ul>
+   * <p>Each segment contributes towards the LIMIT only its {@link 
#guaranteedMatchingDocs} (a lower bound on its
+   * matching rows), so the optimization remains correct when a filter is 
present.
    */
   private List<IndexSegment> pruneSelectionOrderBy(List<IndexSegment> 
segments, QueryContext query) {
     List<OrderByExpressionContext> orderByExpressions = 
query.getOrderByExpressions();
@@ -157,7 +190,7 @@ public class SelectionQuerySegmentPruner implements 
SegmentPruner {
         IndexSegment segment = segments.get(minMaxValue._index);
         if (remainingDocs > 0) {
           selectedSegments.add(segment);
-          remainingDocs -= segment.getSegmentMetadata().getTotalDocs();
+          remainingDocs -= guaranteedMatchingDocs(segment, query);
           maxValue = minMaxValue._maxValue;
         } else {
           // After getting enough documents, prune all the segments with min 
value larger than the current max value, or
@@ -184,7 +217,7 @@ public class SelectionQuerySegmentPruner implements 
SegmentPruner {
         IndexSegment segment = segments.get(minMaxValue._index);
         if (remainingDocs > 0) {
           selectedSegments.add(segment);
-          remainingDocs -= segment.getSegmentMetadata().getTotalDocs();
+          remainingDocs -= guaranteedMatchingDocs(segment, query);
           minValue = minMaxValue._minValue;
         } else {
           // After getting enough documents, prune all the segments with max 
value smaller than the current min value,
@@ -201,6 +234,192 @@ public class SelectionQuerySegmentPruner implements 
SegmentPruner {
     return selectedSegments;
   }
 
+  /**
+   * Returns a lower bound on the number of rows in the segment that match the 
query filter, used to decide how many
+   * leading segments must be kept to guarantee the LIMIT + OFFSET requirement.
+   * <ul>
+   *   <li>Without a filter, every row matches, so this is the exact total doc 
count.</li>
+   *   <li>With a filter, this is the total doc count if the segment 
<em>provably</em> matches the filter for all of its
+   *   rows (based on min/max metadata), and 0 otherwise. Using 0 for segments 
that only partially (or not provably
+   *   fully) match is a safe under-count: such segments are still kept (they 
overlap the boundary), but they never let
+   *   the boundary advance past rows they might contain.</li>
+   * </ul>
+   */
+  private long guaranteedMatchingDocs(IndexSegment segment, QueryContext 
query) {
+    int totalDocs = segment.getSegmentMetadata().getTotalDocs();
+    FilterContext filter = query.getFilter();
+    if (filter == null) {
+      return totalDocs;
+    }
+    return fullyMatches(segment, filter, query) ? totalDocs : 0;
+  }
+
+  /**
+   * Returns {@code true} only if <em>all</em> rows of the segment provably 
satisfy the filter, based on min/max
+   * metadata. A {@code false} result never means "does not match"; it means 
"cannot prove that all rows match", which
+   * is always safe to treat as a 0 lower bound. NOT and unsupported 
predicates conservatively return {@code false}.
+   */
+  private boolean fullyMatches(IndexSegment segment, FilterContext filter, 
QueryContext query) {
+    switch (filter.getType()) {
+      case AND:
+        assert filter.getChildren() != null;
+        for (FilterContext child : filter.getChildren()) {
+          if (!fullyMatches(segment, child, query)) {
+            return false;
+          }
+        }
+        return true;
+      case OR:
+        assert filter.getChildren() != null;
+        for (FilterContext child : filter.getChildren()) {
+          if (fullyMatches(segment, child, query)) {
+            return true;
+          }
+        }
+        return false;
+      case CONSTANT:
+        return filter.isConstantTrue();
+      case PREDICATE:
+        return predicateFullyMatches(segment, filter.getPredicate(), query);
+      case NOT:
+      default:
+        return false;
+    }
+  }
+
+  /**
+   * Returns {@code true} only if all rows of the segment provably satisfy the 
predicate, based on the predicate
+   * column's min/max metadata. Only identifier predicates on non-nullable 
columns of types {@code RANGE} (e.g.
+   * {@code >, >=, <, <=}), {@code EQ} ({@code =}) and {@code NOT_EQ} ({@code 
<>}) are supported; everything else
+   * conservatively returns {@code false}.
+   */
+  private boolean predicateFullyMatches(IndexSegment segment, Predicate 
predicate, QueryContext query) {
+    ExpressionContext lhs = predicate.getLhs();
+    if (lhs.getType() != ExpressionContext.Type.IDENTIFIER) {
+      return false;
+    }
+    String column = lhs.getIdentifier();
+    // Nulls are stored as a default value that pollutes the column min/max 
metadata (and are excluded from comparisons
+    // under null handling), so full-match cannot be reasoned about when null 
handling is active for the column.
+    if (isNullHandlingActive(query, column)) {
+      return false;
+    }
+    DataSource dataSource = segment.getDataSource(column, query.getSchema());
+    DataSourceMetadata dataSourceMetadata = dataSource.getDataSourceMetadata();
+    Comparable minValue = dataSourceMetadata.getMinValue();
+    Comparable maxValue = dataSourceMetadata.getMaxValue();
+    if (minValue == null || maxValue == null) {
+      return false;
+    }
+    // NaN (FLOAT/DOUBLE) breaks the ordering assumptions: 
Float/Double.compareTo treats NaN as the largest value, which
+    // does not match filter semantics (comparisons against NaN are never 
true). A NaN min/max could make a segment look
+    // fully-matching when its NaN rows actually do not match, so refuse to 
reason about full match in that case.
+    if (isNaN(minValue) || isNaN(maxValue)) {
+      return false;
+    }
+    DataType dataType = dataSourceMetadata.getDataType();
+    try {
+      switch (predicate.getType()) {
+        case RANGE:
+          return rangeFullyMatches((RangePredicate) predicate, minValue, 
maxValue, dataType);
+        case EQ: {
+          Comparable value = convertValue(((EqPredicate) 
predicate).getValue(), dataType);
+          // All rows equal the value iff the whole segment collapses to that 
single value.
+          return minValue.compareTo(value) == 0 && maxValue.compareTo(value) 
== 0;
+        }
+        case NOT_EQ: {
+          Comparable value = convertValue(((NotEqPredicate) 
predicate).getValue(), dataType);
+          // All rows differ from the value iff the value lies outside [min, 
max].
+          return value.compareTo(minValue) < 0 || value.compareTo(maxValue) > 
0;
+        }
+        default:
+          return false;
+      }
+    } catch (Exception e) {
+      // Different data types / unparseable literal: cannot prove full match.
+      return false;
+    }
+  }
+
+  /**
+   * Returns {@code true} if the segment's whole {@code [minValue, maxValue]} 
range provably satisfies the range
+   * predicate (i.e. it is fully contained within the predicate's bounds).
+   */
+  private boolean rangeFullyMatches(RangePredicate predicate, Comparable 
minValue, Comparable maxValue,
+      DataType dataType) {
+    String lowerBound = predicate.getLowerBound();
+    if (!lowerBound.equals(RangePredicate.UNBOUNDED)) {
+      Comparable lowerBoundValue = convertValue(lowerBound, dataType);
+      if (predicate.isLowerInclusive()) {
+        if (minValue.compareTo(lowerBoundValue) < 0) {
+          return false;
+        }
+      } else {
+        if (minValue.compareTo(lowerBoundValue) <= 0) {
+          return false;
+        }
+      }
+    }
+    String upperBound = predicate.getUpperBound();
+    if (!upperBound.equals(RangePredicate.UNBOUNDED)) {
+      Comparable upperBoundValue = convertValue(upperBound, dataType);
+      if (predicate.isUpperInclusive()) {
+        if (maxValue.compareTo(upperBoundValue) > 0) {
+          return false;
+        }
+      } else {
+        if (maxValue.compareTo(upperBoundValue) >= 0) {
+          return false;
+        }
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Returns whether null handling is active for the column, in which case 
this optimization must be skipped. Pinot
+   * stores nulls as a default value that pollutes the column min/max metadata 
and the total doc count; only when null
+   * handling is enabled are those null rows excluded from comparisons, which 
is when the pollution becomes unsafe to
+   * reason about (with null handling disabled, nulls are simply the default 
value and the min/max stay accurate). This
+   * mirrors the null caution in {@code SelectionPlanNode#isSorted}.
+   * <p>A column carries null semantics only when null handling is enabled for 
the query <b>and</b> the column is
+   * nullable. Nullability follows the same resolution used at segment build 
time (e.g.
+   * {@code BaseSegmentCreator#isNullable}): under column-based null handling 
the per-column
+   * {@link FieldSpec#isNullable} flag, otherwise (table/query-level null 
handling) all columns are nullable. The check
+   * is conservative: an unknown schema or column is treated as 
null-handling-active.
+   */
+  private static boolean isNullHandlingActive(QueryContext query, String 
column) {
+    if (!query.isNullHandlingEnabled()) {
+      // Null semantics are off: nulls are just the default value, so min/max 
and doc counts stay accurate.
+      return false;
+    }
+    Schema schema = query.getSchema();
+    if (schema == null) {
+      return true;
+    }
+    if (schema.isEnableColumnBasedNullHandling()) {
+      // Column-based null handling: only nullable columns carry nulls.
+      FieldSpec fieldSpec = schema.getFieldSpecFor(column);
+      return fieldSpec == null || fieldSpec.isNullable();
+    }
+    // Table/query-level (legacy) null handling applies null semantics to all 
columns.
+    return true;
+  }
+
+  private static boolean isNaN(Comparable value) {
+    return (value instanceof Double && ((Double) value).isNaN())
+        || (value instanceof Float && ((Float) value).isNaN());
+  }
+
+  /**
+   * Converts a predicate literal to the column's stored type. Any parse 
failure propagates to the caller, which treats
+   * it as "cannot prove full match" (this pruner must stay conservative; an 
actually invalid query is rejected by the
+   * preceding {@link ColumnValueSegmentPruner} or by query execution).
+   */
+  private static Comparable convertValue(String stringValue, DataType 
dataType) {
+    return dataType.convertInternal(stringValue);
+  }
+
   private static class MinMaxValue {
     final int _index;
     final Comparable _minValue;
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/query/pruner/SelectionQuerySegmentPrunerTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/query/pruner/SelectionQuerySegmentPrunerTest.java
index 2f65ef11af8..fc1052325db 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/query/pruner/SelectionQuerySegmentPrunerTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/query/pruner/SelectionQuerySegmentPrunerTest.java
@@ -29,6 +29,7 @@ import org.apache.pinot.segment.spi.SegmentMetadata;
 import org.apache.pinot.segment.spi.datasource.DataSource;
 import org.apache.pinot.segment.spi.datasource.DataSourceMetadata;
 import 
org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap;
+import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.testng.annotations.Test;
 
@@ -37,6 +38,7 @@ import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertSame;
 import static org.testng.Assert.assertTrue;
 
@@ -44,6 +46,15 @@ import static org.testng.Assert.assertTrue;
 public class SelectionQuerySegmentPrunerTest {
   public static final String ORDER_BY_COLUMN = "testColumn";
 
+  public static final String FILTER_COLUMN = "foo";
+
+  // Schema with a LONG order-by column and a STRING column, used by the 
filter-aware tests. Null handling is off, so
+  // these columns are not treated as null-handling-active even though they 
are nullable by default.
+  private static final Schema SCHEMA = new Schema.SchemaBuilder()
+      .addSingleValueDimension(ORDER_BY_COLUMN, FieldSpec.DataType.LONG)
+      .addSingleValueDimension(FILTER_COLUMN, FieldSpec.DataType.STRING)
+      .build();
+
   private final SelectionQuerySegmentPruner _segmentPruner = new 
SelectionQuerySegmentPruner();
 
   @Test
@@ -211,12 +222,279 @@ public class SelectionQuerySegmentPrunerTest {
     assertEquals(result.size(), 3);
   }
 
+  /**
+   * Range-partitioned, non-overlapping segments on {@code testColumn}: [0,9], 
[10,19], [20,29], [30,39], [40,49].
+   */
+  private List<IndexSegment> rangePartitionedSegments() {
+    return Arrays.asList(
+        getIndexSegment(0L, 9L, 10),     // 0
+        getIndexSegment(10L, 19L, 10),   // 1
+        getIndexSegment(20L, 29L, 10),   // 2
+        getIndexSegment(30L, 39L, 10),   // 3
+        getIndexSegment(40L, 49L, 10));  // 4
+  }
+
+  private List<IndexSegment> prune(List<IndexSegment> segments, String query) {
+    QueryContext queryContext = 
QueryContextConverterUtils.getQueryContext(query);
+    queryContext.setSchema(SCHEMA);
+    return _segmentPruner.prune(segments, queryContext);
+  }
+
+  @Test
+  public void testSelectionOrderByWithFilterDesc() {
+    List<IndexSegment> segments = rangePartitionedSegments();
+    // DESC, want largest 5 with col > 25: all live in [40,49]. Only the top 
fully-matching segment is kept.
+    List<IndexSegment> result =
+        prune(segments, "SELECT * FROM testTable WHERE testColumn > 25 ORDER 
BY testColumn DESC LIMIT 5");
+    assertEquals(result, List.of(segments.get(4)));
+
+    // >= behaves the same here (min 40 >= 30).
+    result = prune(segments, "SELECT * FROM testTable WHERE testColumn >= 30 
ORDER BY testColumn DESC LIMIT 5");
+    assertEquals(result, List.of(segments.get(4)));
+  }
+
+  @Test
+  public void testSelectionOrderByWithFilterAsc() {
+    List<IndexSegment> segments = rangePartitionedSegments();
+    // ASC, want smallest 15 with col < 25: live in [0,9] (0-9) and [10,19] 
(10-14). Higher segments are pruned.
+    List<IndexSegment> result =
+        prune(segments, "SELECT * FROM testTable WHERE testColumn < 25 ORDER 
BY testColumn LIMIT 15");
+    assertEquals(result, List.of(segments.get(0), segments.get(1)));
+
+    // <= behaves the same here (max 19 <= 24).
+    result = prune(segments, "SELECT * FROM testTable WHERE testColumn <= 24 
ORDER BY testColumn LIMIT 15");
+    assertEquals(result, List.of(segments.get(0), segments.get(1)));
+  }
+
+  @Test
+  public void testFilterStraddlingSegmentIsKept() {
+    // Correctness counterexample for the lower-bound rule: DESC, col < 35, 
LIMIT 5. The answer (34..30) lives in the
+    // straddling segment [30,39], which is counted as 0 matching docs. A 
naive getTotalDocs() accumulation would let
+    // the boundary advance past it and prune it; the lower-bound rule must 
keep it.
+    List<IndexSegment> segments = rangePartitionedSegments();
+    List<IndexSegment> result =
+        prune(segments, "SELECT * FROM testTable WHERE testColumn < 35 ORDER 
BY testColumn DESC LIMIT 5");
+    assertTrue(result.contains(segments.get(3)), "straddling segment [30,39] 
holding the top-n must be kept");
+  }
+
+  @Test
+  public void testFilterEqAndNotEq() {
+    // EQ fully matches only a single-valued segment; here add one collapsed 
to 25.
+    List<IndexSegment> segments = Arrays.asList(
+        getIndexSegment(0L, 9L, 10),     // 0
+        getIndexSegment(25L, 25L, 10),   // 1 (single value 25)
+        getIndexSegment(40L, 49L, 10));  // 2
+    // col = 25 ORDER BY col DESC LIMIT 5 -> only [25,25] fully matches; 
[40,49] (max 49 >= 25 boundary) pruned.
+    List<IndexSegment> result =
+        prune(segments, "SELECT * FROM testTable WHERE testColumn = 25 ORDER 
BY testColumn DESC LIMIT 5");
+    assertTrue(result.contains(segments.get(1)));
+
+    // col <> 25 DESC LIMIT 5 -> largest != 25 live in [40,49]; that segment 
fully matches (25 outside [40,49]).
+    result = prune(segments, "SELECT * FROM testTable WHERE testColumn <> 25 
ORDER BY testColumn DESC LIMIT 5");
+    assertEquals(result, List.of(segments.get(2)));
+  }
+
+  @Test
+  public void testAndConjunctOnNonProvableColumnDisablesPruning() {
+    // col > 25 is provably-full on the high segments, but the ANDed predicate 
on an unanalyzable column ('foo' is not
+    // in the schema) cannot be proven full for any segment -> 0 lower bound 
everywhere -> nothing is pruned (safe).
+    List<IndexSegment> segments = rangePartitionedSegments();
+    List<IndexSegment> result = prune(segments,
+        "SELECT * FROM testTable WHERE testColumn > 25 AND foo = 'x' ORDER BY 
testColumn DESC LIMIT 5");
+    assertEquals(result.size(), segments.size());
+  }
+
+  @Test
+  public void testOrFilterFullMatch() {
+    // LIMIT exceeds the total doc count (5 segments * 10 docs), so 
guaranteedMatchingDocs() -> fullyMatches() is
+    // evaluated for every segment: the OR is fully matched when ANY child is 
(seg [30,39], [40,49] via testColumn > 25)
+    // and not matched otherwise. No segment can be pruned (the limit needs 
them all), so the result is unchanged.
+    List<IndexSegment> segments = rangePartitionedSegments();
+    List<IndexSegment> result = prune(segments,
+        "SELECT * FROM testTable WHERE testColumn > 25 OR testColumn > 1000 
ORDER BY testColumn LIMIT 100");
+    assertEquals(result.size(), segments.size());
+  }
+
+  @Test
+  public void testFilterNotProvablyFullCases() {
+    // Each of these filters cannot be proven full-match from min/max 
metadata, so every segment contributes 0 and
+    // (with a LIMIT larger than the data) nothing is pruned. They exercise 
the conservative fall-through branches.
+    List<IndexSegment> segments = rangePartitionedSegments();
+    // Predicate on a non-identifier (function) expression.
+    assertEquals(prune(segments,
+        "SELECT * FROM testTable WHERE testColumn + 1 > 5 ORDER BY testColumn 
LIMIT 100").size(), segments.size());
+    // Unsupported predicate type (IN) for the full-match analysis.
+    assertEquals(prune(segments,
+        "SELECT * FROM testTable WHERE testColumn IN (1, 2, 3) ORDER BY 
testColumn LIMIT 100").size(), segments.size());
+    // NOT filter is never treated as full-match.
+    assertEquals(prune(segments,
+        "SELECT * FROM testTable WHERE NOT (testColumn IN (1, 2, 3)) ORDER BY 
testColumn LIMIT 100").size(),
+        segments.size());
+    // Bound larger than any value: no segment is fully contained, so nothing 
is pruned.
+    assertEquals(prune(segments,
+        "SELECT * FROM testTable WHERE testColumn > 99999999999999999999999 
ORDER BY testColumn LIMIT 100").size(),
+        segments.size());
+  }
+
+  @Test
+  public void testAndAllChildrenFullMatch() {
+    // AND where every child is provably full on the top segment -> AND 
returns true and the set is limit-pruned.
+    List<IndexSegment> segments = rangePartitionedSegments();
+    List<IndexSegment> result = prune(segments,
+        "SELECT * FROM testTable WHERE testColumn > 25 AND testColumn < 1000 
ORDER BY testColumn DESC LIMIT 5");
+    assertEquals(result, List.of(segments.get(4)));
+  }
+
+  @Test
+  public void testRangeInclusiveBoundsNotFull() {
+    // Inclusive bounds where some processed segments are not fully contained, 
exercising the inclusive "not full"
+    // branches. LIMIT exceeds the data so nothing is pruned.
+    List<IndexSegment> segments = rangePartitionedSegments();
+    assertEquals(prune(segments,
+        "SELECT * FROM testTable WHERE testColumn >= 25 ORDER BY testColumn 
LIMIT 100").size(), segments.size());
+    assertEquals(prune(segments,
+        "SELECT * FROM testTable WHERE testColumn <= 25 ORDER BY testColumn 
LIMIT 100").size(), segments.size());
+  }
+
+  @Test
+  public void testPruneCalledWithNullHandlingActive() {
+    // prune() invoked directly (bypassing isApplicableTo) with null handling 
on: the predicate column is
+    // null-handling-active, so it is never provably full and nothing is 
pruned.
+    List<IndexSegment> segments = rangePartitionedSegments();
+    List<IndexSegment> result = prune(segments, "SET enableNullHandling=true; "
+        + "SELECT * FROM testTable WHERE testColumn > 25 ORDER BY testColumn 
DESC LIMIT 100");
+    assertEquals(result.size(), segments.size());
+  }
+
+  @Test
+  public void testFilterPredicateColumnWithoutMinMax() {
+    // The filter column has no min/max metadata, so it cannot be proven 
full-match -> 0 contribution, nothing pruned.
+    List<IndexSegment> segments = Arrays.asList(
+        getIndexSegment(0L, 9L, 10, false, null, null),
+        getIndexSegment(10L, 19L, 10, false, null, null));
+    List<IndexSegment> result =
+        prune(segments, "SELECT * FROM testTable WHERE foo >= 'a' ORDER BY 
testColumn DESC LIMIT 5");
+    assertEquals(result.size(), segments.size());
+  }
+
+  @Test
+  public void testFilteredSelectionOnlyNotPruned() {
+    // Selection-only (no ORDER BY) with a filter: count-based pruning is 
unsafe, so prune() keeps every segment.
+    List<IndexSegment> segments = rangePartitionedSegments();
+    List<IndexSegment> result = prune(segments, "SELECT * FROM testTable WHERE 
testColumn > 25 LIMIT 5");
+    assertEquals(result.size(), segments.size());
+  }
+
+  @Test
+  public void testFilterWithOffset() {
+    List<IndexSegment> segments = rangePartitionedSegments();
+    // DESC, col > 25, LIMIT 5 OFFSET 30. remainingDocs = 35; the two 
fully-matching segments [30,39] and [40,49]
+    // provide only 20 matching docs, so the boundary never advances enough to 
prune them; both are kept.
+    List<IndexSegment> result =
+        prune(segments, "SELECT * FROM testTable WHERE testColumn > 25 ORDER 
BY testColumn DESC LIMIT 5, 30");
+    assertEquals(result.size(), segments.size());
+  }
+
+  @Test
+  public void testFloatNaNNotTreatedAsFullMatch() {
+    // Regression: an all-NaN DOUBLE segment must not be counted as fully 
matching 'col > 5'. NaN sorts as the largest
+    // value, so without the NaN guard the all-NaN segment would (a) be 
counted as 10 matching docs and (b) sort first
+    // (DESC), advancing the boundary and wrongly pruning the segment [10, 20] 
that actually holds the top-n.
+    Schema doubleSchema =
+        new Schema.SchemaBuilder().addSingleValueDimension(ORDER_BY_COLUMN, 
FieldSpec.DataType.DOUBLE).build();
+    IndexSegment nanSegment = getDoubleSegment(Double.NaN, Double.NaN, 10);
+    IndexSegment realSegment = getDoubleSegment(10.0, 20.0, 10);
+    QueryContext queryContext = QueryContextConverterUtils.getQueryContext(
+        "SELECT * FROM testTable WHERE testColumn > 5 ORDER BY testColumn DESC 
LIMIT 5");
+    queryContext.setSchema(doubleSchema);
+    List<IndexSegment> result = _segmentPruner.prune(Arrays.asList(nanSegment, 
realSegment), queryContext);
+    assertTrue(result.contains(realSegment), "segment [10, 20] holding the 
top-n must not be pruned");
+  }
+
+  private IndexSegment getDoubleSegment(Double minValue, Double maxValue, int 
totalDocs) {
+    IndexSegment indexSegment = mock(IndexSegment.class);
+    DataSource dataSource = mock(DataSource.class);
+    when(indexSegment.getDataSource(eq(ORDER_BY_COLUMN), 
any(Schema.class))).thenReturn(dataSource);
+    DataSourceMetadata dataSourceMetadata = mock(DataSourceMetadata.class);
+    when(dataSource.getDataSourceMetadata()).thenReturn(dataSourceMetadata);
+    when(dataSourceMetadata.getMinValue()).thenReturn(minValue);
+    when(dataSourceMetadata.getMaxValue()).thenReturn(maxValue);
+    
when(dataSourceMetadata.getDataType()).thenReturn(FieldSpec.DataType.DOUBLE);
+    SegmentMetadata segmentMetadata = mock(SegmentMetadata.class);
+    when(indexSegment.getSegmentMetadata()).thenReturn(segmentMetadata);
+    when(segmentMetadata.getTotalDocs()).thenReturn(totalDocs);
+    return indexSegment;
+  }
+
+  @Test
+  public void testIsApplicableTo() {
+    // No filter: applicable (existing behavior), with or without order by.
+    assertTrue(_segmentPruner.isApplicableTo(queryWithSchema("SELECT * FROM 
testTable LIMIT 5")));
+    assertTrue(_segmentPruner.isApplicableTo(queryWithSchema("SELECT * FROM 
testTable ORDER BY testColumn LIMIT 5")));
+    // LIMIT 0 with a filter: applicable (just keeps one segment for the 
schema).
+    assertTrue(_segmentPruner.isApplicableTo(
+        queryWithSchema("SELECT * FROM testTable WHERE testColumn > 5 ORDER BY 
testColumn LIMIT 0")));
+    // Filtered order-by on a non-nullable identifier: applicable.
+    assertTrue(_segmentPruner.isApplicableTo(
+        queryWithSchema("SELECT * FROM testTable WHERE testColumn > 5 ORDER BY 
testColumn DESC LIMIT 5")));
+    // Filtered selection-only (no order by): not applicable (count-based 
pruning unsafe with a filter).
+    assertFalse(_segmentPruner.isApplicableTo(queryWithSchema("SELECT * FROM 
testTable WHERE testColumn > 5 LIMIT 5")));
+    // Filtered order-by on a non-identifier: not applicable.
+    assertFalse(_segmentPruner.isApplicableTo(
+        queryWithSchema("SELECT * FROM testTable WHERE testColumn > 5 ORDER BY 
testColumn + 1 DESC LIMIT 5")));
+    // Filtered order-by but null handling enabled -> column treated as 
nullable -> not applicable.
+    assertFalse(_segmentPruner.isApplicableTo(queryWithSchema(
+        "SET enableNullHandling=true; SELECT * FROM testTable WHERE testColumn 
> 5 ORDER BY testColumn DESC LIMIT 5")));
+    // Null handling on but no schema available -> conservatively treated as 
null-handling-active -> not applicable.
+    
assertFalse(_segmentPruner.isApplicableTo(QueryContextConverterUtils.getQueryContext(
+        "SET enableNullHandling=true; SELECT * FROM testTable WHERE testColumn 
> 5 ORDER BY testColumn DESC LIMIT 5")));
+  }
+
+  @Test
+  public void testIsApplicableToColumnBasedNullHandling() {
+    String query = "SET enableNullHandling=true; "
+        + "SELECT * FROM testTable WHERE testColumn > 5 ORDER BY testColumn 
DESC LIMIT 5";
+
+    // Column-based null handling + non-nullable column: nulls cannot occur, 
so the optimization still applies even
+    // though null handling is enabled.
+    QueryContext queryContext = 
QueryContextConverterUtils.getQueryContext(query);
+    queryContext.setSchema(columnBasedNullHandlingSchema(false));
+    assertTrue(_segmentPruner.isApplicableTo(queryContext));
+
+    // Column-based null handling + nullable column: skip (min/max may be 
polluted by nulls).
+    queryContext = QueryContextConverterUtils.getQueryContext(query);
+    queryContext.setSchema(columnBasedNullHandlingSchema(true));
+    assertFalse(_segmentPruner.isApplicableTo(queryContext));
+  }
+
+  private static Schema columnBasedNullHandlingSchema(boolean 
orderByColumnNullable) {
+    Schema schema = new Schema.SchemaBuilder()
+        .addSingleValueDimension(ORDER_BY_COLUMN, FieldSpec.DataType.LONG)
+        .setEnableColumnBasedNullHandling(true)
+        .build();
+    schema.getFieldSpecFor(ORDER_BY_COLUMN).setNullable(orderByColumnNullable);
+    return schema;
+  }
+
+  private QueryContext queryWithSchema(String query) {
+    QueryContext queryContext = 
QueryContextConverterUtils.getQueryContext(query);
+    queryContext.setSchema(SCHEMA);
+    return queryContext;
+  }
+
   private IndexSegment getIndexSegment(@Nullable Long minValue, @Nullable Long 
maxValue, int totalDocs) {
     return getIndexSegment(minValue, maxValue, totalDocs, false);
   }
 
   private IndexSegment getIndexSegment(@Nullable Long minValue, @Nullable Long 
maxValue, int totalDocs,
       boolean upsert) {
+    // A STRING filter column whose [min, max] = ["a", "z"] never collapses to 
a single value, so a predicate like
+    // 'foo = x' is never provably full-match -> exercises the "AND with a 
non-provable conjunct" path.
+    return getIndexSegment(minValue, maxValue, totalDocs, upsert, "a", "z");
+  }
+
+  private IndexSegment getIndexSegment(@Nullable Long minValue, @Nullable Long 
maxValue, int totalDocs,
+      boolean upsert, @Nullable String filterMinValue, @Nullable String 
filterMaxValue) {
     IndexSegment indexSegment = mock(IndexSegment.class);
     when(indexSegment.getColumnNames()).thenReturn(ImmutableSet.of("foo", 
"testColumn"));
     DataSource dataSource = mock(DataSource.class);
@@ -225,6 +503,14 @@ public class SelectionQuerySegmentPrunerTest {
     when(dataSource.getDataSourceMetadata()).thenReturn(dataSourceMetadata);
     when(dataSourceMetadata.getMinValue()).thenReturn(minValue);
     when(dataSourceMetadata.getMaxValue()).thenReturn(maxValue);
+    when(dataSourceMetadata.getDataType()).thenReturn(FieldSpec.DataType.LONG);
+    DataSource filterDataSource = mock(DataSource.class);
+    when(indexSegment.getDataSource(eq(FILTER_COLUMN), 
any(Schema.class))).thenReturn(filterDataSource);
+    DataSourceMetadata filterMetadata = mock(DataSourceMetadata.class);
+    when(filterDataSource.getDataSourceMetadata()).thenReturn(filterMetadata);
+    when(filterMetadata.getMinValue()).thenReturn(filterMinValue);
+    when(filterMetadata.getMaxValue()).thenReturn(filterMaxValue);
+    when(filterMetadata.getDataType()).thenReturn(FieldSpec.DataType.STRING);
     SegmentMetadata segmentMetadata = mock(SegmentMetadata.class);
     when(indexSegment.getSegmentMetadata()).thenReturn(segmentMetadata);
     when(segmentMetadata.getTotalDocs()).thenReturn(totalDocs);
diff --git 
a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkSelectionOrderByFilterPruning.java
 
b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkSelectionOrderByFilterPruning.java
new file mode 100644
index 00000000000..ca35f055433
--- /dev/null
+++ 
b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkSelectionOrderByFilterPruning.java
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.perf;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.core.query.config.SegmentPrunerConfig;
+import org.apache.pinot.core.query.pruner.SegmentPrunerService;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import 
org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
+import org.apache.pinot.queries.BaseQueriesTest;
+import 
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
+import 
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
+import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordReader;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.options.ChainedOptionsBuilder;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+
+/**
+ * End-to-end benchmark for the min/max segment pruning of selection {@code 
ORDER BY <col> LIMIT n} queries with a
+ * filter (issue apache/pinot#18685).
+ * <p>The table is range-partitioned on a sorted, nullable {@code TS_COL}: 
segment {@code i} covers the contiguous range
+ * {@code [i * numRows, (i + 1) * numRows)}. Each invocation runs the segment 
pruners (the default chain, which respects
+ * {@link org.apache.pinot.core.query.pruner.SegmentPruner#isApplicableTo}) 
and then executes the surviving segments via
+ * the single-stage engine.
+ * <p>The comparison is self-contained (no need to diff against a base 
commit): the optimization is gated off when null
+ * handling is active, so running the same filtered query <b>with</b> {@code 
enableNullHandling=true} reproduces the
+ * pre-fix behavior (the filtered order-by is not limit-pruned and every 
matching segment is engaged), while running it
+ * <b>without</b> null handling exercises the fix (only the top segment(s) 
survive). The {@code NO_FILTER} query is a
+ * reference point that is limit-pruned regardless.
+ */
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+@Fork(1)
+@Warmup(iterations = 5, time = 1)
+@Measurement(iterations = 5, time = 1)
+@State(Scope.Benchmark)
+public class BenchmarkSelectionOrderByFilterPruning extends BaseQueriesTest {
+  private static final File INDEX_DIR =
+      new File(FileUtils.getTempDirectory(), 
"BenchmarkSelectionOrderByFilterPruning");
+  private static final String TABLE_NAME = "MyTable";
+  private static final String SEGMENT_NAME_TEMPLATE = "testSegment%d";
+  private static final String TS_COL = "TS_COL";
+  private static final String VAL_COL = "VAL_COL";
+
+  private static final TableConfig TABLE_CONFIG = new 
TableConfigBuilder(TableType.OFFLINE)
+      .setTableName(TABLE_NAME)
+      .setSortedColumn(TS_COL)
+      .build();
+  private static final Schema SCHEMA = new Schema.SchemaBuilder()
+      .setSchemaName(TABLE_NAME)
+      .addSingleValueDimension(TS_COL, FieldSpec.DataType.LONG)
+      .addSingleValueDimension(VAL_COL, FieldSpec.DataType.INT)
+      .build();
+
+  /** Which query shape to run; see {@link #buildQuery}. */
+  public enum Mode {
+    /** Filtered order-by, optimization eligible (limit-pruned to the segments 
the LIMIT needs). */
+    FILTER,
+    /** Same filtered query but with null handling on, which gates the 
optimization off (pre-fix behavior). */
+    FILTER_NULL_HANDLING,
+    /** No filter: limit-pruned regardless of the fix (reference point). */
+    NO_FILTER
+  }
+
+  @Param({"50", "100", "200"})
+  private int _numSegments;
+  // Small segments so a larger LIMIT spans several of them.
+  @Param({"20000"})
+  private int _numRows;
+  @Param({"10", "200000"})
+  private int _limit;
+  @Param({"FILTER", "FILTER_NULL_HANDLING", "NO_FILTER"})
+  private Mode _mode;
+
+  private String _query;
+  private List<IndexSegment> _allSegments;
+  private List<IndexSegment> _selectedSegments;
+  private SegmentPrunerService _segmentPrunerService;
+
+  private static String buildQuery(Mode mode, int limit) {
+    String query = "SELECT * FROM MyTable "
+        + (mode == Mode.NO_FILTER ? "" : "WHERE TS_COL > 0 ")
+        + "ORDER BY TS_COL DESC LIMIT " + limit;
+    return mode == Mode.FILTER_NULL_HANDLING ? "SET enableNullHandling=true; " 
+ query : query;
+  }
+
+  public static void main(String[] args)
+      throws Exception {
+    ChainedOptionsBuilder opt =
+        new 
OptionsBuilder().include(BenchmarkSelectionOrderByFilterPruning.class.getSimpleName());
+    new Runner(opt.build()).run();
+  }
+
+  @Setup
+  public void setUp()
+      throws Exception {
+    _query = buildQuery(_mode, _limit);
+    FileUtils.deleteQuietly(INDEX_DIR);
+    IndexLoadingConfig indexLoadingConfig = new 
IndexLoadingConfig(TABLE_CONFIG, SCHEMA);
+    _allSegments = new ArrayList<>(_numSegments);
+    for (int i = 0; i < _numSegments; i++) {
+      String name = String.format(SEGMENT_NAME_TEMPLATE, i);
+      buildSegment(name, i);
+      _allSegments.add(ImmutableSegmentLoader.load(new File(INDEX_DIR, name), 
indexLoadingConfig));
+    }
+    _selectedSegments = _allSegments;
+    _segmentPrunerService = new SegmentPrunerService(new 
SegmentPrunerConfig(new PinotConfiguration()));
+  }
+
+  @TearDown
+  public void tearDown() {
+    for (IndexSegment indexSegment : _allSegments) {
+      indexSegment.destroy();
+    }
+    FileUtils.deleteQuietly(INDEX_DIR);
+    EXECUTOR_SERVICE.shutdownNow();
+  }
+
+  private void buildSegment(String segmentName, int segmentIndex)
+      throws Exception {
+    long baseValue = (long) segmentIndex * _numRows;
+    List<GenericRow> rows = new ArrayList<>(_numRows);
+    for (int i = 0; i < _numRows; i++) {
+      GenericRow row = new GenericRow();
+      // Contiguous, non-overlapping, sorted range per segment.
+      row.putValue(TS_COL, baseValue + i);
+      row.putValue(VAL_COL, ThreadLocalRandom.current().nextInt());
+      rows.add(row);
+    }
+    SegmentGeneratorConfig config = new SegmentGeneratorConfig(TABLE_CONFIG, 
SCHEMA);
+    config.setOutDir(INDEX_DIR.getPath());
+    config.setTableName(TABLE_NAME);
+    config.setSegmentName(segmentName);
+    SegmentIndexCreationDriverImpl driver = new 
SegmentIndexCreationDriverImpl();
+    try (RecordReader recordReader = new GenericRowRecordReader(rows)) {
+      driver.init(config, recordReader);
+      driver.build();
+    }
+  }
+
+  @Benchmark
+  public BrokerResponseNative query() {
+    // Prune first (this is what the fix affects), then execute the surviving 
segments through the single-stage engine.
+    QueryContext queryContext = 
QueryContextConverterUtils.getQueryContext(_query);
+    queryContext.setSchema(SCHEMA);
+    _selectedSegments = _segmentPrunerService.prune(_allSegments, 
queryContext);
+    return getBrokerResponse(_query);
+  }
+
+  @Override
+  protected String getFilter() {
+    return "";
+  }
+
+  @Override
+  protected IndexSegment getIndexSegment() {
+    return _allSegments.get(0);
+  }
+
+  @Override
+  protected List<IndexSegment> getIndexSegments() {
+    return _selectedSegments;
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to