FrankChen021 commented on code in PR #19460:
URL: https://github.com/apache/druid/pull/19460#discussion_r3241280810


##########
processing/src/main/java/org/apache/druid/segment/QueryableIndexCursorFactory.java:
##########
@@ -110,9 +143,328 @@ public List<AggregatorFactory> 
getAggregatorsForPreAggregated()
     };
   }
 
+  private CursorHolder makeClusteredCursorHolder(CursorBuildSpec spec, 
ClusteredValueGroupsBaseTableSchema clusterSummary)
+  {
+    final List<TableClusterGroupSpec> matching = 
Projections.pruneClusterGroups(
+        new ArrayList<>(index.getClusterGroupSchemas()),
+        spec.getFilter(),
+        spec.getVirtualColumns()
+    );
+
+    if (matching.isEmpty()) {
+      return EmptyClusteredCursorHolder.INSTANCE;
+    }
+
+    if (matching.size() == 1) {
+      return makeSingleGroupClusteredCursorHolder(spec, matching.get(0));
+    }
+    return makeMultiGroupClusteredCursorHolder(spec, matching);
+  }
+
+  private CursorHolder makeSingleGroupClusteredCursorHolder(
+      CursorBuildSpec spec,
+      TableClusterGroupSpec valueGroup
+  )
+  {
+    final QueryableIndex groupIndex = 
index.getClusterGroupQueryableIndex(valueGroup);
+    if (groupIndex == null) {
+      throw DruidException.defensive(
+          "No cluster-group sub-index resolvable for clustering values "
+          + Arrays.toString(valueGroup.lookupClusteringValues())
+      );
+    }
+
+    return new QueryableIndexCursorHolder(

Review Comment:
   [P1] Cluster-column filters are reapplied against sub-indexes that do not 
contain them
   
   After pruneClusterGroups keeps a matching group, the original 
CursorBuildSpec is passed unchanged into a QueryableIndexCursorHolder for the 
group sub-index. That sub-index intentionally does not contain the clustering 
columns, so EqualityFilter.getBitmapColumnIndex sees a missing column and 
builds an exact all-false/all-unknown bitmap before the 
ClusteringColumnSelectorFactory wrapper can provide the constant value to a 
matcher. Queries such as tenant = 'acme' therefore prune to the acme group and 
then scan zero rows. The multi-group supplier path repeats the same pattern, so 
clustering predicates need to be stripped/residualized after pruning or the 
group holder needs a cluster-aware ColumnIndexSelector/filter-bundle path.



##########
processing/src/main/java/org/apache/druid/segment/projections/Projections.java:
##########
@@ -530,6 +548,241 @@ public static String 
getProjectionSegmentInternalFilePrefix(ProjectionSchema pro
     return projectionSchema.getName() + "/";
   }
 
+  /**
+   * Check whether {@code type} is an allowed cluster group clustering-column 
type. Clustering is restricted to the
+   * primitive scalar types: {@link ValueType#STRING}, {@link ValueType#LONG}, 
{@link ValueType#DOUBLE},
+   * {@link ValueType#FLOAT}. Complex and array types are rejected.
+   */
+  public static boolean isAllowedClusteringType(@Nullable ColumnType type)
+  {
+    return type != null && type.anyOf(ValueType.STRING, ValueType.LONG, 
ValueType.DOUBLE, ValueType.FLOAT);
+  }
+
+  /**
+   * Segment internal file prefix + column for a cluster group's per-group 
column data:
+   * {@code __base$<id0>_<id1>...<idK>/<column>}
+   */
+  public static String getClusterGroupSegmentInternalFileName(List<Integer> 
clusteringValueIds, String column)
+  {
+    return getClusterGroupSegmentInternalFilePrefix(clusteringValueIds) + 
column;
+  }
+
+  public static String getClusterGroupSegmentInternalFilePrefix(List<Integer> 
clusteringValueIds)
+  {
+    if (clusteringValueIds == null || clusteringValueIds.isEmpty()) {
+      throw DruidException.defensive("clusteringValueIds must not be null or 
empty");
+    }
+    final StringBuilder sb = new StringBuilder(CLUSTER_GROUP_PREFIX);
+    for (int i = 0; i < clusteringValueIds.size(); i++) {
+      if (i > 0) {
+        sb.append('_');
+      }
+      sb.append(clusteringValueIds.get(i));
+    }
+    sb.append('/');
+    return sb.toString();
+  }
+
+  /**
+   * Returns the subset of {@code groups} that a query filter can't rule out 
from clustering values alone.
+   * Filters not referencing any clustering column are conservatively retained 
for every group.
+   */
+  public static List<TableClusterGroupSpec> pruneClusterGroups(
+      List<TableClusterGroupSpec> groups,
+      @Nullable Filter filter,
+      @Nullable VirtualColumns queryVirtualColumns
+  )
+  {
+    if (filter == null || groups.isEmpty()) {
+      return groups;
+    }
+    final VirtualColumns queryVcs = queryVirtualColumns == null ? 
VirtualColumns.EMPTY : queryVirtualColumns;
+    final List<TableClusterGroupSpec> kept = new ArrayList<>(groups.size());
+    for (TableClusterGroupSpec group : groups) {
+      if (matchesClusterGroup(filter, group, queryVcs)) {
+        kept.add(group);
+      }
+    }
+    return kept;
+  }
+
+  private static boolean matchesClusterGroup(
+      Filter filter,
+      TableClusterGroupSpec group,
+      VirtualColumns queryVcs
+  )
+  {
+    final ClusteredValueGroupsBaseTableSchema summary = group.getSummary();
+    final RowSignature clusteringColumns = summary.getClusteringColumns();
+    final VirtualColumns groupVcs = summary.getVirtualColumns();
+
+    // remap query-side column names that are virtual-column-equivalent to a 
clustering column
+    final Map<String, String> remap = buildClusterGroupRemap(
+        filter.getRequiredColumns(),
+        clusteringColumns,
+        queryVcs,
+        groupVcs
+    );
+    final Filter rewritten = remap.isEmpty() ? filter : 
filter.rewriteRequiredColumns(remap);
+
+    // keep unless provably FALSE; UNKNOWN (filter references non-clustering 
data) keeps conservatively
+    return matchesClusterGroupFilter(rewritten, clusteringColumns, 
group.lookupClusteringValues())
+        != DruidPredicateMatch.FALSE;
+  }
+
+  /**
+   * Build a name-rewrite map so the pruner can walk the filter against a 
group's clustering tuple. Three cases per
+   * filter column:
+   * <ol>
+   *   <li>Query virtual column by that name (shadows any physical of the same 
name): prunable iff the group has an
+   *       equivalent VC whose output is a clustering column. Same-name 
equivalence is an identity entry; different-name
+   *       equivalence remaps. Otherwise, the column is remapped to a 
non-clustering sentinel so the pruner returns
+   *       UNKNOWN, without that, a query virtual sharing a clustering 
column's name would be mis-matched against the
+   *       clustering value.</li>
+   *   <li>No query virtual column, but the name is a clustering column: 
identity entry, filter walks it directly.</li>
+   *   <li>Neither query virtual column nor clustering column: identity entry, 
pruner returns UNKNOWN at that leaf.</li>
+   * </ol>
+   * The query virtual column check must come first because query VC names are 
allowed to shadow physical/clustering
+   * column names.
+   * <p/>
+   * If no column needs a non-identity rewrite the result is empty and the 
caller skips the rewrite call. When any
+   * non-identity rewrite is present, identity entries are populated for all 
remaining required columns because
+   * {@link Filter#rewriteRequiredColumns} requires an entry for every 
referenced column (missing entries throw).
+   */
+  private static Map<String, String> buildClusterGroupRemap(
+      Set<String> requiredColumns,
+      RowSignature clusteringColumns,
+      VirtualColumns queryVcs,
+      VirtualColumns groupVcs
+  )
+  {
+    // if Query virtual column isn't equivalent to a clustering column, we 
remap to a sentinel name that is guaranteed
+    // not to be a clustering column so the pruner returns UNKNOWN for any 
leaf referencing it; compute it lazily
+    final Supplier<String> sentinel = Suppliers.memoize(() -> {
+      String candidate = "__nonClusteringRef";
+      while (clusteringColumns.indexOf(candidate) >= 0) {
+        candidate = "_" + candidate;
+      }
+      return candidate;
+    });
+
+    Map<String, String> remap = null;
+    for (String col : requiredColumns) {
+      final VirtualColumns.Node queryNode = queryVcs.getNode(col);
+      if (queryNode == null) {
+        continue;
+      }
+      // query treats `col` as a virtual column, shadowing any 
physical/clustering column of the same name
+      final VirtualColumn equivalent = groupVcs.findEquivalent(queryNode);
+      final String target = equivalent != null && 
clusteringColumns.contains(equivalent.getOutputName())
+                            ? equivalent.getOutputName()
+                            : null;
+      if (target == null) {
+        // query column has same name as a clustering column, but has no 
equivalent clustering virtual column; remap to
+        // use sentinel to be safe
+        if (remap == null) {
+          remap = new HashMap<>();
+        }
+        remap.put(col, sentinel.get());
+      } else if (!col.equals(target)) {
+        if (remap == null) {
+          remap = new HashMap<>();
+        }
+        remap.put(col, target);
+      }
+      // else: same name, handled below if remap ends up non-empty
+    }
+    if (remap == null) {
+      return Collections.emptyMap();
+    }
+    // fill identity entries for the remaining required columns; 
rewriteRequiredColumns rejects partial maps
+    for (String col : requiredColumns) {
+      remap.putIfAbsent(col, col);
+    }
+    return remap;
+  }
+
+  /**
+   * Walk a (remapped) filter against a group's constant clustering values. 
Returns a {@link DruidPredicateMatch}
+   * 3VL result: TRUE = filter holds for every row in the group, FALSE = 
filter holds for no row (group can be
+   * pruned), UNKNOWN = can't decide from clustering values alone (filter 
references non-clustering data, or
+   * unrecognized filter type). UNKNOWN is distinct from FALSE so that {@code 
NOT(can't-tell)} stays
+   * {@code can't-tell} rather than flipping to "definitely false" and 
silently pruning live groups.
+   */
+  private static DruidPredicateMatch matchesClusterGroupFilter(
+      Filter filter,
+      RowSignature clusteringColumns,
+      Object[] clusteringValues
+  )
+  {
+    if (filter instanceof AndFilter andFilter) {
+      DruidPredicateMatch result = DruidPredicateMatch.TRUE;   // identity for 
AND
+      for (Filter sub : andFilter.getFilters()) {
+        result = DruidPredicateMatch.and(result, 
matchesClusterGroupFilter(sub, clusteringColumns, clusteringValues));
+        if (result == DruidPredicateMatch.FALSE) {
+          return result;   // short-circuit: AND with false stays false
+        }
+      }
+      return result;
+    }
+
+    if (filter instanceof OrFilter orFilter) {
+      DruidPredicateMatch result = DruidPredicateMatch.FALSE;   // identity 
for OR
+      for (Filter sub : orFilter.getFilters()) {
+        result = DruidPredicateMatch.or(result, matchesClusterGroupFilter(sub, 
clusteringColumns, clusteringValues));
+        if (result == DruidPredicateMatch.TRUE) {
+          return result;   // short-circuit: OR with true stays true
+        }
+      }
+      return result;
+    }
+
+    if (filter instanceof NotFilter notFilter) {
+      return DruidPredicateMatch.not(
+          matchesClusterGroupFilter(notFilter.getBaseFilter(), 
clusteringColumns, clusteringValues)
+      );
+    }
+
+    if (filter instanceof NullFilter isNull) {
+      final int idx = clusteringColumns.indexOf(isNull.getColumn());
+      if (idx < 0) {
+        return DruidPredicateMatch.UNKNOWN;
+      }
+      return DruidPredicateMatch.of(clusteringValues[idx] == null);
+    }
+
+    if (filter instanceof EqualityFilter eq) {
+      final int idx = clusteringColumns.indexOf(eq.getColumn());
+      if (idx < 0) {
+        return DruidPredicateMatch.UNKNOWN;
+      }
+      // EqualityFilter doesn't match nulls; constructor also rejects null 
match values.
+      if (clusteringValues[idx] == null) {
+        return DruidPredicateMatch.FALSE;
+      }
+      return DruidPredicateMatch.of(Objects.equals(clusteringValues[idx], 
eq.getMatchValue()));

Review Comment:
   [P1] Pruning ignores EqualityFilter type coercion semantics
   
   The pruner compares the stored clustering value directly with 
eq.getMatchValue(), but EqualityFilter normal matcher casts the literal to the 
input column type. For example, a LONG clustering value 5 with an 
EqualityFilter typed as STRING value "5" would match when evaluated by the real 
matcher, but Objects.equals(5L, "5") returns false here and the only live 
cluster group is removed before scanning. TypedInFilter has the same risk below 
with direct Objects.equals over the raw sorted values. The pruning check should 
use the same predicate/cast semantics as the filter matcher, or stay UNKNOWN 
when the literal type does not exactly match the clustering column type.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to