FrankChen021 commented on code in PR #19460:
URL: https://github.com/apache/druid/pull/19460#discussion_r3241280810
##########
processing/src/main/java/org/apache/druid/segment/QueryableIndexCursorFactory.java:
##########
@@ -110,9 +143,328 @@ public List<AggregatorFactory>
getAggregatorsForPreAggregated()
};
}
+ private CursorHolder makeClusteredCursorHolder(CursorBuildSpec spec,
ClusteredValueGroupsBaseTableSchema clusterSummary)
+ {
+ final List<TableClusterGroupSpec> matching =
Projections.pruneClusterGroups(
+ new ArrayList<>(index.getClusterGroupSchemas()),
+ spec.getFilter(),
+ spec.getVirtualColumns()
+ );
+
+ if (matching.isEmpty()) {
+ return EmptyClusteredCursorHolder.INSTANCE;
+ }
+
+ if (matching.size() == 1) {
+ return makeSingleGroupClusteredCursorHolder(spec, matching.get(0));
+ }
+ return makeMultiGroupClusteredCursorHolder(spec, matching);
+ }
+
+ private CursorHolder makeSingleGroupClusteredCursorHolder(
+ CursorBuildSpec spec,
+ TableClusterGroupSpec valueGroup
+ )
+ {
+ final QueryableIndex groupIndex =
index.getClusterGroupQueryableIndex(valueGroup);
+ if (groupIndex == null) {
+ throw DruidException.defensive(
+ "No cluster-group sub-index resolvable for clustering values "
+ + Arrays.toString(valueGroup.lookupClusteringValues())
+ );
+ }
+
+ return new QueryableIndexCursorHolder(
Review Comment:
[P1] Cluster-column filters are reapplied against sub-indexes that do not
contain them

After pruneClusterGroups keeps a matching group, the original
CursorBuildSpec is passed unchanged into a QueryableIndexCursorHolder for the
group sub-index. That sub-index intentionally omits the clustering columns, so
EqualityFilter.getBitmapColumnIndex sees a missing column and builds an exact
all-false/all-unknown bitmap before the ClusteringColumnSelectorFactory wrapper
can supply the constant value to a matcher. A query such as tenant = 'acme'
therefore prunes to the acme group and then scans zero rows. The multi-group
supplier path repeats the same pattern. Either the clustering predicates need
to be stripped/residualized after pruning, or the group holder needs a
cluster-aware ColumnIndexSelector/filter-bundle path.
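
A rough illustration of the strip/residualize option, written as a toy model
rather than against the Druid API. EqualityLeaf and residualize are
hypothetical names invented for this sketch, and it assumes the filter is a
flat AND of equality leaves; a real fix would have to walk arbitrary Filter
trees.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Toy model only: these types are hypothetical stand-ins, not Druid classes.
final class ResidualFilterSketch
{
  /** A single equality leaf such as tenant = 'acme'. */
  static final class EqualityLeaf
  {
    final String column;
    final Object value;

    EqualityLeaf(String column, Object value)
    {
      this.column = column;
      this.value = value;
    }
  }

  /**
   * Given the conjuncts of an AND filter and the constant clustering values of
   * a group that survived pruning, drop the leaves that the group's constants
   * already satisfy. All other leaves are kept for the sub-index to evaluate.
   */
  static List<EqualityLeaf> residualize(
      List<EqualityLeaf> conjuncts,
      Map<String, Object> clusteringValues
  )
  {
    final List<EqualityLeaf> residual = new ArrayList<>();
    for (EqualityLeaf leaf : conjuncts) {
      final boolean satisfiedByGroup =
          clusteringValues.containsKey(leaf.column)
          && Objects.equals(clusteringValues.get(leaf.column), leaf.value);
      if (!satisfiedByGroup) {
        // Not provably true from the group's constants: keep it (conservative).
        residual.add(leaf);
      }
    }
    return residual;
  }
}
```

With tenant = 'acme' AND country = 'US' and a group whose clustering tuple is
{tenant: acme}, only the country leaf would reach the sub-index, so the missing
tenant column is never consulted.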
##########
processing/src/main/java/org/apache/druid/segment/projections/Projections.java:
##########
@@ -530,6 +548,241 @@ public static String
getProjectionSegmentInternalFilePrefix(ProjectionSchema pro
return projectionSchema.getName() + "/";
}
+ /**
+ * Check whether {@code type} is an allowed cluster group clustering-column
type. Clustering is restricted to the
+ * primitive scalar types: {@link ValueType#STRING}, {@link ValueType#LONG},
{@link ValueType#DOUBLE},
+ * {@link ValueType#FLOAT}. Complex and array types are rejected.
+ */
+ public static boolean isAllowedClusteringType(@Nullable ColumnType type)
+ {
+ return type != null && type.anyOf(ValueType.STRING, ValueType.LONG,
ValueType.DOUBLE, ValueType.FLOAT);
+ }
+
+ /**
+ * Segment internal file prefix + column for a cluster group's per-group
column data:
+ * {@code __base$<id0>_<id1>...<idK>/<column>}
+ */
+ public static String getClusterGroupSegmentInternalFileName(List<Integer>
clusteringValueIds, String column)
+ {
+ return getClusterGroupSegmentInternalFilePrefix(clusteringValueIds) +
column;
+ }
+
+ public static String getClusterGroupSegmentInternalFilePrefix(List<Integer>
clusteringValueIds)
+ {
+ if (clusteringValueIds == null || clusteringValueIds.isEmpty()) {
+ throw DruidException.defensive("clusteringValueIds must not be null or
empty");
+ }
+ final StringBuilder sb = new StringBuilder(CLUSTER_GROUP_PREFIX);
+ for (int i = 0; i < clusteringValueIds.size(); i++) {
+ if (i > 0) {
+ sb.append('_');
+ }
+ sb.append(clusteringValueIds.get(i));
+ }
+ sb.append('/');
+ return sb.toString();
+ }
+
+ /**
+ * Returns the subset of {@code groups} that a query filter can't rule out
from clustering values alone.
+ * Filters not referencing any clustering column are conservatively retained
for every group.
+ */
+ public static List<TableClusterGroupSpec> pruneClusterGroups(
+ List<TableClusterGroupSpec> groups,
+ @Nullable Filter filter,
+ @Nullable VirtualColumns queryVirtualColumns
+ )
+ {
+ if (filter == null || groups.isEmpty()) {
+ return groups;
+ }
+ final VirtualColumns queryVcs = queryVirtualColumns == null ?
VirtualColumns.EMPTY : queryVirtualColumns;
+ final List<TableClusterGroupSpec> kept = new ArrayList<>(groups.size());
+ for (TableClusterGroupSpec group : groups) {
+ if (matchesClusterGroup(filter, group, queryVcs)) {
+ kept.add(group);
+ }
+ }
+ return kept;
+ }
+
+ private static boolean matchesClusterGroup(
+ Filter filter,
+ TableClusterGroupSpec group,
+ VirtualColumns queryVcs
+ )
+ {
+ final ClusteredValueGroupsBaseTableSchema summary = group.getSummary();
+ final RowSignature clusteringColumns = summary.getClusteringColumns();
+ final VirtualColumns groupVcs = summary.getVirtualColumns();
+
+ // remap query-side column names that are virtual-column-equivalent to a
clustering column
+ final Map<String, String> remap = buildClusterGroupRemap(
+ filter.getRequiredColumns(),
+ clusteringColumns,
+ queryVcs,
+ groupVcs
+ );
+ final Filter rewritten = remap.isEmpty() ? filter :
filter.rewriteRequiredColumns(remap);
+
+ // keep unless provably FALSE; UNKNOWN (filter references non-clustering
data) keeps conservatively
+ return matchesClusterGroupFilter(rewritten, clusteringColumns,
group.lookupClusteringValues())
+ != DruidPredicateMatch.FALSE;
+ }
+
+ /**
+ * Build a name-rewrite map so the pruner can walk the filter against a
group's clustering tuple. Three cases per
+ * filter column:
+ * <ol>
+ * <li>Query virtual column by that name (shadows any physical of the same
name): prunable iff the group has an
+ * equivalent VC whose output is a clustering column. Same-name
equivalence is an identity entry; different-name
+ * equivalence remaps. Otherwise, the column is remapped to a
non-clustering sentinel so the pruner returns
+ * UNKNOWN, without that, a query virtual sharing a clustering
column's name would be mis-matched against the
+ * clustering value.</li>
+ * <li>No query virtual column, but the name is a clustering column:
identity entry, filter walks it directly.</li>
+ * <li>Neither query virtual column nor clustering column: identity entry,
pruner returns UNKNOWN at that leaf.</li>
+ * </ol>
+ * The query virtual column check must come first because query VC names are
allowed to shadow physical/clustering
+ * column names.
+ * <p/>
+ * If no column needs a non-identity rewrite the result is empty and the
caller skips the rewrite call. When any
+ * non-identity rewrite is present, identity entries are populated for all
remaining required columns because
+ * {@link Filter#rewriteRequiredColumns} requires an entry for every
referenced column (missing entries throw).
+ */
+ private static Map<String, String> buildClusterGroupRemap(
+ Set<String> requiredColumns,
+ RowSignature clusteringColumns,
+ VirtualColumns queryVcs,
+ VirtualColumns groupVcs
+ )
+ {
+ // if Query virtual column isn't equivalent to a clustering column, we
remap to a sentinel name that is guaranteed
+ // not to be a clustering column so the pruner returns UNKNOWN for any
leaf referencing it; compute it lazily
+ final Supplier<String> sentinel = Suppliers.memoize(() -> {
+ String candidate = "__nonClusteringRef";
+ while (clusteringColumns.indexOf(candidate) >= 0) {
+ candidate = "_" + candidate;
+ }
+ return candidate;
+ });
+
+ Map<String, String> remap = null;
+ for (String col : requiredColumns) {
+ final VirtualColumns.Node queryNode = queryVcs.getNode(col);
+ if (queryNode == null) {
+ continue;
+ }
+ // query treats `col` as a virtual column, shadowing any
physical/clustering column of the same name
+ final VirtualColumn equivalent = groupVcs.findEquivalent(queryNode);
+ final String target = equivalent != null &&
clusteringColumns.contains(equivalent.getOutputName())
+ ? equivalent.getOutputName()
+ : null;
+ if (target == null) {
+ // query column has same name as a clustering column, but has no
equivalent clustering virtual column; remap to
+ // use sentinel to be safe
+ if (remap == null) {
+ remap = new HashMap<>();
+ }
+ remap.put(col, sentinel.get());
+ } else if (!col.equals(target)) {
+ if (remap == null) {
+ remap = new HashMap<>();
+ }
+ remap.put(col, target);
+ }
+ // else: same name, handled below if remap ends up non-empty
+ }
+ if (remap == null) {
+ return Collections.emptyMap();
+ }
+ // fill identity entries for the remaining required columns;
rewriteRequiredColumns rejects partial maps
+ for (String col : requiredColumns) {
+ remap.putIfAbsent(col, col);
+ }
+ return remap;
+ }
+
+ /**
+ * Walk a (remapped) filter against a group's constant clustering values.
Returns a {@link DruidPredicateMatch}
+ * 3VL result: TRUE = filter holds for every row in the group, FALSE =
filter holds for no row (group can be
+ * pruned), UNKNOWN = can't decide from clustering values alone (filter
references non-clustering data, or
+ * unrecognized filter type). UNKNOWN is distinct from FALSE so that {@code
NOT(can't-tell)} stays
+ * {@code can't-tell} rather than flipping to "definitely false" and
silently pruning live groups.
+ */
+ private static DruidPredicateMatch matchesClusterGroupFilter(
+ Filter filter,
+ RowSignature clusteringColumns,
+ Object[] clusteringValues
+ )
+ {
+ if (filter instanceof AndFilter andFilter) {
+ DruidPredicateMatch result = DruidPredicateMatch.TRUE; // identity for
AND
+ for (Filter sub : andFilter.getFilters()) {
+ result = DruidPredicateMatch.and(result,
matchesClusterGroupFilter(sub, clusteringColumns, clusteringValues));
+ if (result == DruidPredicateMatch.FALSE) {
+ return result; // short-circuit: AND with false stays false
+ }
+ }
+ return result;
+ }
+
+ if (filter instanceof OrFilter orFilter) {
+ DruidPredicateMatch result = DruidPredicateMatch.FALSE; // identity
for OR
+ for (Filter sub : orFilter.getFilters()) {
+ result = DruidPredicateMatch.or(result, matchesClusterGroupFilter(sub,
clusteringColumns, clusteringValues));
+ if (result == DruidPredicateMatch.TRUE) {
+ return result; // short-circuit: OR with true stays true
+ }
+ }
+ return result;
+ }
+
+ if (filter instanceof NotFilter notFilter) {
+ return DruidPredicateMatch.not(
+ matchesClusterGroupFilter(notFilter.getBaseFilter(),
clusteringColumns, clusteringValues)
+ );
+ }
+
+ if (filter instanceof NullFilter isNull) {
+ final int idx = clusteringColumns.indexOf(isNull.getColumn());
+ if (idx < 0) {
+ return DruidPredicateMatch.UNKNOWN;
+ }
+ return DruidPredicateMatch.of(clusteringValues[idx] == null);
+ }
+
+ if (filter instanceof EqualityFilter eq) {
+ final int idx = clusteringColumns.indexOf(eq.getColumn());
+ if (idx < 0) {
+ return DruidPredicateMatch.UNKNOWN;
+ }
+ // EqualityFilter doesn't match nulls; constructor also rejects null
match values.
+ if (clusteringValues[idx] == null) {
+ return DruidPredicateMatch.FALSE;
+ }
+ return DruidPredicateMatch.of(Objects.equals(clusteringValues[idx],
eq.getMatchValue()));
Review Comment:
[P1] Pruning ignores EqualityFilter type coercion semantics

The pruner compares the stored clustering value directly with
eq.getMatchValue(), but EqualityFilter's normal matcher casts the literal to
the input column type. For example, a LONG clustering value 5 with an
EqualityFilter typed as STRING with value "5" would match when evaluated by the
real matcher, but Objects.equals(5L, "5") returns false here, so the only live
cluster group is removed before scanning. TypedInFilter has the same risk
below, where Objects.equals is applied directly to the raw sorted values. The
pruning check should use the same predicate/cast semantics as the filter
matcher, or stay UNKNOWN when the literal type does not exactly match the
clustering column type.
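
A minimal sketch of the "stay UNKNOWN" option, again as a toy model rather than
Druid code. Match, ColType and pruneEquality are hypothetical names standing in
for DruidPredicateMatch, the column/match value types and the EqualityFilter
branch of the pruner. It assumes both values are already in the Java
representation of their declared type.

```java
import java.util.Objects;

// Toy model only: Match, ColType and pruneEquality are hypothetical stand-ins.
final class TypedPruneSketch
{
  enum Match { TRUE, FALSE, UNKNOWN }

  enum ColType { STRING, LONG, DOUBLE, FLOAT }

  /**
   * Three-valued equality check of a filter literal against a group's constant
   * clustering value. Only answers TRUE/FALSE when no cast is involved.
   */
  static Match pruneEquality(
      ColType columnType,
      Object clusteringValue,
      ColType literalType,
      Object literal
  )
  {
    if (clusteringValue == null) {
      // Equality never matches null rows, whatever the literal type is.
      return Match.FALSE;
    }
    if (columnType != literalType) {
      // A cast could make these equal (LONG 5 vs STRING "5"), so do not prune;
      // leave the decision to the real matcher.
      return Match.UNKNOWN;
    }
    return Objects.equals(clusteringValue, literal) ? Match.TRUE : Match.FALSE;
  }
}
```

Under this rule the LONG 5 versus STRING "5" case returns UNKNOWN, so the group
is kept and the real matcher decides. The cost is that mismatched-type
literals never prune, which is weaker than reusing the matcher's cast logic.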
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]