cecemei commented on code in PR #19460: URL: https://github.com/apache/druid/pull/19460#discussion_r3307052466
########## processing/src/main/java/org/apache/druid/segment/projections/ClusteringColumnSelectorFactory.java: ########## @@ -0,0 +1,566 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment.projections; + +import org.apache.druid.error.DruidException; +import org.apache.druid.math.expr.ExprEval; +import org.apache.druid.math.expr.ExpressionType; +import org.apache.druid.query.dimension.DimensionSpec; +import org.apache.druid.query.filter.DruidPredicateFactory; +import org.apache.druid.query.filter.ValueMatcher; +import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; +import org.apache.druid.segment.ColumnSelectorFactory; +import org.apache.druid.segment.ColumnValueSelector; +import org.apache.druid.segment.ConstantExprEvalSelector; +import org.apache.druid.segment.DimensionSelector; +import org.apache.druid.segment.IdLookup; +import org.apache.druid.segment.RowIdSupplier; +import org.apache.druid.segment.column.ColumnCapabilities; +import org.apache.druid.segment.column.ColumnCapabilitiesImpl; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.segment.column.ValueType; +import 
org.apache.druid.segment.data.IndexedInts; + +import javax.annotation.Nullable; +import java.util.function.Supplier; + +/** + * {@link ColumnSelectorFactory} wrapper that intercepts requests for clustering columns and returns selectors + * carrying the group's constant value, while delegating all other column lookups to a wrapped factory. This is the + * mechanism by which a cluster group's clustering columns, which are NOT stored in the per-group column data since + * they're constant across the group, are made visible to query engines as if they were ordinary columns. + */ +public class ClusteringColumnSelectorFactory implements ColumnSelectorFactory +{ + private final RowSignature clusteringColumns; + private ColumnSelectorFactory delegate; + private Object[] clusteringValues; + // Bumped on every setDelegate(...) so per-call selector wrappers can detect group transitions and rebuild their + // cached inner state + private long generation; + + public ClusteringColumnSelectorFactory( + ColumnSelectorFactory delegate, + RowSignature clusteringColumns, + Object[] clusteringValues + ) + { + this.clusteringColumns = clusteringColumns; + setDelegate(delegate, clusteringValues); + } + + /** + * Update the underlying factory and the constant values for the current cluster group. Called by a multi-group + * concatenating cursor on each group transition. Selectors previously returned by this factory will, on their next + * invocation, observe the updated state; see the per-call indirection in the inner selector classes. + */ + public void setDelegate(ColumnSelectorFactory delegate, Object[] clusteringValues) Review Comment: I find the usage of `setDelegate` and `generation` hard to reason about. It feels like we should hold all the `CursorHolder`s in this class and let it manage advancing the cursor, the generation, the ColumnSelectorFactory, and the ValueMatcher all together. `ClusteringColumnSelectorFactory` could then implement `Iterable<Cursor>`; WDYT? 
########## processing/src/main/java/org/apache/druid/segment/ConcatenatingCursor.java: ########## @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment; + +import com.google.common.base.Supplier; +import org.apache.druid.error.DruidException; +import org.apache.druid.segment.projections.ClusteringColumnSelectorFactory; + +import javax.annotation.Nullable; +import java.util.List; + +/** + * {@link Cursor} that walks a sequence of per-group cursors back-to-back, presenting them to the caller as a single + * cursor over a clustered base table. On group transitions the wrapper {@link ColumnSelectorFactory} updates its + * underlying delegate and clustering values so previously-acquired delegating selectors observe the new group's data + * on their next access. + * <p> + * Each entry in {@link #holderSuppliers} is a lazy producer of a {@link CursorHolder} for one cluster group. The + * outer {@link CursorHolder} owns the lifecycle of the per-group holders. 
+ */ +public final class ConcatenatingCursor implements Cursor +{ + private final List<Supplier<CursorHolder>> holderSuppliers; + private final List<Object[]> clusteringValuesByGroup; + private final ClusteringColumnSelectorFactory wrapperFactory; + + private int currentIdx; + @Nullable + private Cursor currentCursor; + private boolean initialized; + + public ConcatenatingCursor( + List<Supplier<CursorHolder>> holderSuppliers, + List<Object[]> clusteringValuesByGroup, + ClusteringColumnSelectorFactory wrapperFactory + ) + { + if (holderSuppliers.size() != clusteringValuesByGroup.size()) { + throw DruidException.defensive( + "holderSuppliers size [%s] must equal clusteringValuesByGroup size [%s]", + holderSuppliers.size(), + clusteringValuesByGroup.size() + ); + } + if (holderSuppliers.isEmpty()) { + throw DruidException.defensive("ConcatenatingCursor requires at least one cluster group"); + } + this.holderSuppliers = holderSuppliers; + this.clusteringValuesByGroup = clusteringValuesByGroup; + this.wrapperFactory = wrapperFactory; + this.currentIdx = -1; + } + + private void initializeIfNeeded() + { + if (initialized) { + return; + } + initialized = true; + advanceToNextNonEmptyGroup(); + } + + /** + * Open the next group whose cursor has at least one row. Sets {@code currentCursor = null} when all groups are + * exhausted. + */ + private void advanceToNextNonEmptyGroup() + { + while (++currentIdx < holderSuppliers.size()) { + final CursorHolder holder = holderSuppliers.get(currentIdx).get(); + final Cursor cursor = holder.asCursor(); + if (cursor != null && !cursor.isDone()) { + currentCursor = cursor; + wrapperFactory.setDelegate(cursor.getColumnSelectorFactory(), clusteringValuesByGroup.get(currentIdx)); + return; + } + // Group has no rows after filter application; try the next. 
+ } + currentCursor = null; + } + + @Override + public ColumnSelectorFactory getColumnSelectorFactory() + { + initializeIfNeeded(); + return wrapperFactory; + } + + @Override + public void advance() + { + initializeIfNeeded(); + if (currentCursor == null) { + return; + } Review Comment: nit: can reuse `isDone()` ########## processing/src/main/java/org/apache/druid/segment/projections/Projections.java: ########## @@ -530,6 +547,243 @@ public static String getProjectionSegmentInternalFilePrefix(ProjectionSchema pro return projectionSchema.getName() + "/"; } + /** + * Check whether {@code type} is an allowed cluster group clustering-column type. Clustering is restricted to the + * primitive scalar types: {@link ValueType#STRING}, {@link ValueType#LONG}, {@link ValueType#DOUBLE}, + * {@link ValueType#FLOAT}. Complex and array types are rejected. + */ + public static boolean isAllowedClusteringType(@Nullable ColumnType type) + { + return type != null && type.anyOf(ValueType.STRING, ValueType.LONG, ValueType.DOUBLE, ValueType.FLOAT); + } + + /** + * Segment internal file prefix + column for a cluster group's per-group column data: + * {@code __base$<id0>_<id1>...<idK>/<column>} + */ + public static String getClusterGroupSegmentInternalFileName(List<Integer> clusteringValueIds, String column) + { + return getClusterGroupSegmentInternalFilePrefix(clusteringValueIds) + column; + } + + public static String getClusterGroupSegmentInternalFilePrefix(List<Integer> clusteringValueIds) + { + if (clusteringValueIds == null || clusteringValueIds.isEmpty()) { + throw DruidException.defensive("clusteringValueIds must not be null or empty"); + } + final StringBuilder sb = new StringBuilder(CLUSTER_GROUP_PREFIX); + for (int i = 0; i < clusteringValueIds.size(); i++) { + if (i > 0) { + sb.append('_'); + } + sb.append(clusteringValueIds.get(i)); + } + sb.append('/'); + return sb.toString(); + } + + /** + * Build the per-query {@link ClusterGroupQueryPlan} for {@code groups} against a 
{@link CursorBuildSpec}. Walks the + * filter tree once per group via {@link #walkClusterGroupFilter}, folding clustering-column leaves to + * {@link TrueFilter} / {@link FalseFilter} against each group's constant clustering tuple and propagating those + * constants through AND / OR / NOT. Non-clustering filters remain in place so the per-group cursor evaluates them + * as expected. Query-VC-equivalent-to-clustering-VC resolution happens per-leaf via {@link #resolveClusteringIndex}. + * <p/> + * Output shape per group encodes the truth value: top-level {@link FalseFilter} = provably FALSE (group is + * pruned from {@link ClusterGroupQueryPlan#survivingGroups()}), top-level {@link TrueFilter} = provably TRUE + * (no residual filter needed at the cursor), anything else = UNKNOWN (residual filter passed to the per-group + * cursor). The walker's result is stashed on the plan so {@link ClusterGroupQueryPlan#rewriteFor} hands it back + * directly without re-walking. + */ + public static ClusterGroupQueryPlan planClusterGroupQuery( + List<TableClusterGroupSpec> groups, + CursorBuildSpec cursorBuildSpec + ) + { + final Filter queryFilter = cursorBuildSpec.getFilter(); + final VirtualColumns queryVcs = cursorBuildSpec.getVirtualColumns(); + if (groups.isEmpty() || queryFilter == null) { + // No filter (or no groups): every group survives, per-group rewrite is a no-op (null filter). + return new ClusterGroupQueryPlan(groups, group -> null); + } + + // Every spec in the list shares one summary by construction (set once in the schema constructor), so + // clusteringColumns + groupVcs are loop-invariant, only the per-group clustering tuple changes. + final ClusteredValueGroupsBaseTableSchema summary = groups.getFirst().getSummary(); + final RowSignature clusteringColumns = summary.getClusteringColumns(); + final VirtualColumns groupVcs = summary.getVirtualColumns(); + + // Single walk per group: produces the rewritten filter, and a top-level FalseFilter means the group prunes. 
+ // Cache the rewrite for every group (including pruned ones, where it's FalseFilter) so rewriteFor doesn't + // re-walk for either the cursor factory or callers that want to inspect a pruned group's outcome directly. + final List<TableClusterGroupSpec> kept = new ArrayList<>(groups.size()); + final IdentityHashMap<TableClusterGroupSpec, Filter> rewriteCache = new IdentityHashMap<>(); Review Comment: it felt like `ClusterGroupQueryPlan` doesnt have much more than this hash map, is there any other reason that we want to have a class wrap it? ########## processing/src/main/java/org/apache/druid/segment/ConcatenatingCursor.java: ########## @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment; + +import com.google.common.base.Supplier; +import org.apache.druid.error.DruidException; +import org.apache.druid.segment.projections.ClusteringColumnSelectorFactory; + +import javax.annotation.Nullable; +import java.util.List; + +/** + * {@link Cursor} that walks a sequence of per-group cursors back-to-back, presenting them to the caller as a single + * cursor over a clustered base table. 
On group transitions the wrapper {@link ColumnSelectorFactory} updates its + * underlying delegate and clustering values so previously-acquired delegating selectors observe the new group's data + * on their next access. + * <p> + * Each entry in {@link #holderSuppliers} is a lazy producer of a {@link CursorHolder} for one cluster group. The + * outer {@link CursorHolder} owns the lifecycle of the per-group holders. + */ +public final class ConcatenatingCursor implements Cursor +{ + private final List<Supplier<CursorHolder>> holderSuppliers; + private final List<Object[]> clusteringValuesByGroup; + private final ClusteringColumnSelectorFactory wrapperFactory; + + private int currentIdx; + @Nullable + private Cursor currentCursor; + private boolean initialized; + + public ConcatenatingCursor( + List<Supplier<CursorHolder>> holderSuppliers, + List<Object[]> clusteringValuesByGroup, + ClusteringColumnSelectorFactory wrapperFactory + ) + { + if (holderSuppliers.size() != clusteringValuesByGroup.size()) { + throw DruidException.defensive( + "holderSuppliers size [%s] must equal clusteringValuesByGroup size [%s]", + holderSuppliers.size(), + clusteringValuesByGroup.size() + ); + } + if (holderSuppliers.isEmpty()) { + throw DruidException.defensive("ConcatenatingCursor requires at least one cluster group"); + } + this.holderSuppliers = holderSuppliers; + this.clusteringValuesByGroup = clusteringValuesByGroup; + this.wrapperFactory = wrapperFactory; + this.currentIdx = -1; + } + + private void initializeIfNeeded() + { + if (initialized) { + return; + } + initialized = true; + advanceToNextNonEmptyGroup(); + } + + /** + * Open the next group whose cursor has at least one row. Sets {@code currentCursor = null} when all groups are + * exhausted. 
+ */ + private void advanceToNextNonEmptyGroup() + { + while (++currentIdx < holderSuppliers.size()) { + final CursorHolder holder = holderSuppliers.get(currentIdx).get(); + final Cursor cursor = holder.asCursor(); + if (cursor != null && !cursor.isDone()) { + currentCursor = cursor; + wrapperFactory.setDelegate(cursor.getColumnSelectorFactory(), clusteringValuesByGroup.get(currentIdx)); + return; + } + // Group has no rows after filter application; try the next. + } + currentCursor = null; + } + + @Override + public ColumnSelectorFactory getColumnSelectorFactory() + { + initializeIfNeeded(); + return wrapperFactory; + } + + @Override + public void advance() + { + initializeIfNeeded(); + if (currentCursor == null) { + return; + } + currentCursor.advance(); + if (currentCursor.isDone()) { + advanceToNextNonEmptyGroup(); + } + } + + @Override + public void advanceUninterruptibly() + { + initializeIfNeeded(); + if (currentCursor == null) { + return; + } Review Comment: same here, can reuse `isDone()` ########## processing/src/main/java/org/apache/druid/segment/ConcatenatingCursor.java: ########## @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.segment; + +import com.google.common.base.Supplier; +import org.apache.druid.error.DruidException; +import org.apache.druid.segment.projections.ClusteringColumnSelectorFactory; + +import javax.annotation.Nullable; +import java.util.List; + +/** + * {@link Cursor} that walks a sequence of per-group cursors back-to-back, presenting them to the caller as a single + * cursor over a clustered base table. On group transitions the wrapper {@link ColumnSelectorFactory} updates its + * underlying delegate and clustering values so previously-acquired delegating selectors observe the new group's data + * on their next access. + * <p> + * Each entry in {@link #holderSuppliers} is a lazy producer of a {@link CursorHolder} for one cluster group. The + * outer {@link CursorHolder} owns the lifecycle of the per-group holders. + */ +public final class ConcatenatingCursor implements Cursor +{ + private final List<Supplier<CursorHolder>> holderSuppliers; + private final List<Object[]> clusteringValuesByGroup; + private final ClusteringColumnSelectorFactory wrapperFactory; + + private int currentIdx; + @Nullable + private Cursor currentCursor; + private boolean initialized; + + public ConcatenatingCursor( + List<Supplier<CursorHolder>> holderSuppliers, + List<Object[]> clusteringValuesByGroup, + ClusteringColumnSelectorFactory wrapperFactory + ) + { + if (holderSuppliers.size() != clusteringValuesByGroup.size()) { + throw DruidException.defensive( + "holderSuppliers size [%s] must equal clusteringValuesByGroup size [%s]", + holderSuppliers.size(), + clusteringValuesByGroup.size() + ); + } + if (holderSuppliers.isEmpty()) { + throw DruidException.defensive("ConcatenatingCursor requires at least one cluster group"); + } + this.holderSuppliers = holderSuppliers; + this.clusteringValuesByGroup = clusteringValuesByGroup; + this.wrapperFactory = wrapperFactory; + this.currentIdx = -1; + } + + private void initializeIfNeeded() + { + if (initialized) { + 
return; + } + initialized = true; + advanceToNextNonEmptyGroup(); + } + + /** + * Open the next group whose cursor has at least one row. Sets {@code currentCursor = null} when all groups are Review Comment: nit: advance to the next group sounds slightly better and more consistent than open the next group. should this reset delegate as well if all groups are exhausted? ########## processing/src/main/java/org/apache/druid/segment/IndexIO.java: ########## @@ -983,31 +1009,51 @@ public QueryableIndex load(File inDir, ObjectMapper mapper, boolean lazy, Segmen ); projectionsColumns.put(projectionSpec.getSchema().getName(), projectionColumns); - if (projectionSpec.getSchema() instanceof AggregateProjectionSchema) { - aggProjections.add( - new AggregateProjectionMetadata( - (AggregateProjectionSchema) projectionSpec.getSchema(), - projectionSpec.getNumRows() - ) - ); - } else { - throw DruidException.defensive( - "Unexpected projection[%s] with type[%s]", - projectionSpec.getSchema().getName(), - projectionSpec.getSchema().getClass() - ); + aggProjections.add( + new AggregateProjectionMetadata( + (AggregateProjectionSchema) projectionSpec.getSchema(), + projectionSpec.getNumRows() + ) + ); + } + + final List<Map<String, Supplier<BaseColumnHolder>>> clusterGroupColumnsList; + if (isClusteredSummary) { Review Comment: nit: maybe we could just check null for clusteredBaseSummary, and it would not require this additional boolean variable ########## processing/src/main/java/org/apache/druid/segment/projections/ClusteredValueGroupsBaseTableSchema.java: ########## @@ -0,0 +1,410 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment.projections; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.Lists; +import org.apache.druid.error.DruidException; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.java.util.common.granularity.Granularity; +import org.apache.druid.query.OrderBy; +import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.segment.AggregateProjectionMetadata; +import org.apache.druid.segment.Metadata; +import org.apache.druid.segment.VirtualColumn; +import org.apache.druid.segment.VirtualColumns; +import org.apache.druid.segment.column.ColumnHolder; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.utils.CollectionUtils; + +import javax.annotation.Nullable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Objects; +import java.util.Set; + +/** + * Top-level summary for a clustered base table whose groups are identified by discrete clustering-value tuples. Each + * tuple group is internally stored as a separate table without storing the cluster columns, which are pulled into this + * metadata. 
This is optimizing for use cases which typically only need to read from a single group via filters present + * on a query. Cluster groups nest inside as {@link #getClusterGroups()}; their column data live in the V10 segment + * file under dictionary-id-tuple prefixes ({@code __base$<id0>_<id1>...<idK>/<col>}), where the ids index into + * {@link #getClusteringDictionaries()}. + */ +public class ClusteredValueGroupsBaseTableSchema implements BaseTableProjectionSchema +{ + public static final String TYPE_NAME = "clustered-value-groups-base-table"; + + private final VirtualColumns virtualColumns; + private final List<String> columnNames; + private final AggregatorFactory[] aggregators; + private final List<OrderBy> ordering; + private final RowSignature clusteringColumns; + private final List<String> sharedColumns; + private final ClusteringDictionaries clusteringDictionaries; + private final List<TableClusterGroupSpec> clusterGroups; + + // computed + private final int timeColumnPosition; + private final Granularity effectiveGranularity; + + @JsonCreator + public ClusteredValueGroupsBaseTableSchema( Review Comment: while reading this file, i find myself mumbling words like clustering/ group too much, some personal preference for naming just for inspiration: clusteringColumns -> groupKeySchema clusteringDictionaries → groupKeyDictionaries clusterGroups -> groupSpecs getGroupColumnNames -> getGroupInternalColumnNames getGroupDimensionNames -> getGroupInternalDimensionNames getGroupOrdering -> getGroupInternalOrdering ########## processing/src/main/java/org/apache/druid/segment/projections/ClusteredValueGroupsBaseTableSchema.java: ########## @@ -0,0 +1,410 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment.projections; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.Lists; +import org.apache.druid.error.DruidException; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.java.util.common.granularity.Granularity; +import org.apache.druid.query.OrderBy; +import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.segment.AggregateProjectionMetadata; +import org.apache.druid.segment.Metadata; +import org.apache.druid.segment.VirtualColumn; +import org.apache.druid.segment.VirtualColumns; +import org.apache.druid.segment.column.ColumnHolder; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.utils.CollectionUtils; + +import javax.annotation.Nullable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Objects; +import java.util.Set; + +/** + * Top-level summary for a clustered base table whose groups are identified by discrete clustering-value tuples. 
Each + * tuple group is internally stored as a separate table without storing the cluster columns, which are pulled into this + * metadata. This is optimizing for use cases which typically only need to read from a single group via filters present + * on a query. Cluster groups nest inside as {@link #getClusterGroups()}; their column data live in the V10 segment + * file under dictionary-id-tuple prefixes ({@code __base$<id0>_<id1>...<idK>/<col>}), where the ids index into + * {@link #getClusteringDictionaries()}. + */ +public class ClusteredValueGroupsBaseTableSchema implements BaseTableProjectionSchema +{ + public static final String TYPE_NAME = "clustered-value-groups-base-table"; + + private final VirtualColumns virtualColumns; + private final List<String> columnNames; + private final AggregatorFactory[] aggregators; + private final List<OrderBy> ordering; + private final RowSignature clusteringColumns; + private final List<String> sharedColumns; + private final ClusteringDictionaries clusteringDictionaries; + private final List<TableClusterGroupSpec> clusterGroups; + + // computed + private final int timeColumnPosition; + private final Granularity effectiveGranularity; + + @JsonCreator + public ClusteredValueGroupsBaseTableSchema( + @JsonProperty("virtualColumns") VirtualColumns virtualColumns, + @JsonProperty("columns") List<String> columns, + @JsonProperty("aggregators") @Nullable AggregatorFactory[] aggregators, + @JsonProperty("ordering") List<OrderBy> ordering, + @JsonProperty("clusteringColumns") RowSignature clusteringColumns, + @JsonProperty("sharedColumns") @Nullable List<String> sharedColumns, + @JsonProperty("clusteringDictionaries") @Nullable ClusteringDictionaries clusteringDictionaries, + @JsonProperty("clusterGroups") @Nullable List<TableClusterGroupSpec> clusterGroups + ) + { + if (CollectionUtils.isNullOrEmpty(columns)) { + throw DruidException.defensive("clustered base table schema columns must not be null or empty"); + } + if (ordering == 
null) { + throw DruidException.defensive("clustered base table schema ordering must not be null"); + } + if (clusteringColumns == null || clusteringColumns.size() == 0) { + throw DruidException.defensive( + "clustered base table schema clusteringColumns must not be null or empty" + ); + } + if (ordering.size() < clusteringColumns.size()) { + throw DruidException.defensive( + "ordering size [%s] must be at least clusteringColumns size [%s] (clustering columns must form a prefix" + + " of the segment ordering)", + ordering.size(), + clusteringColumns.size() + ); + } + for (int i = 0; i < clusteringColumns.size(); i++) { + final String clusteringColumn = clusteringColumns.getColumnName(i); + if (!columns.contains(clusteringColumn)) { + throw DruidException.defensive( + "clusteringColumn [%s] must appear in columns of the clustered base table summary", + clusteringColumn + ); + } + final ColumnType type = clusteringColumns.getColumnType(i).orElse(null); + if (!Projections.isAllowedClusteringType(type)) { + throw DruidException.defensive( + "clustering column [%s] has unsupported type [%s]; allowed types are STRING, LONG, DOUBLE, FLOAT", + clusteringColumn, + type + ); + } + // Per-group ordering is derived by dropping this prefix; pruning + cursor concatenation rely on it. + final String orderingColumn = ordering.get(i).getColumnName(); + if (!clusteringColumn.equals(orderingColumn)) { + throw DruidException.defensive( + "clustering column at position [%s] is [%s] but the segment ordering at the same position is [%s];" + + " clustering columns must form a prefix of the segment ordering", + i, + clusteringColumn, + orderingColumn + ); + } + } + final List<String> resolvedSharedColumns = sharedColumns == null ? 
List.of() : sharedColumns; + for (String shared : resolvedSharedColumns) { + if (!columns.contains(shared)) { + throw DruidException.defensive( + "sharedColumn [%s] must appear in columns of the clustered base table summary", + shared + ); + } + } + this.virtualColumns = virtualColumns == null ? VirtualColumns.EMPTY : virtualColumns; + this.columnNames = columns; + this.aggregators = aggregators == null ? new AggregatorFactory[0] : aggregators; + this.ordering = ordering; + this.clusteringColumns = clusteringColumns; + this.sharedColumns = resolvedSharedColumns; + this.clusterGroups = clusterGroups == null ? List.of() : List.copyOf(clusterGroups); + this.clusteringDictionaries = clusteringDictionaries == null + ? ClusteringDictionaries.EMPTY + : clusteringDictionaries; + + int foundTimePosition = -1; + Granularity granularity = null; + for (int i = 0; i < ordering.size(); i++) { + OrderBy orderBy = ordering.get(i); + if (orderBy.getColumnName().equals(ColumnHolder.TIME_COLUMN_NAME)) { + foundTimePosition = i; + final VirtualColumn vc = this.virtualColumns.getVirtualColumn(Granularities.GRANULARITY_VIRTUAL_COLUMN_NAME); + if (vc != null) { + granularity = Granularities.fromVirtualColumn(vc); + } else { + granularity = Granularities.NONE; + } + } + } + if (granularity == null) { + throw DruidException.defensive( + "clustered base table doesn't have a [%s] column?", + ColumnHolder.TIME_COLUMN_NAME + ); + } + this.timeColumnPosition = foundTimePosition; + this.effectiveGranularity = granularity; + + // Specs always start unwired: there's a chicken-and-egg between the summary and its specs, resolved by + // deferring all summary-dependent state on the spec to setSummary, which we invoke here once the summary's + // own state is populated. + for (TableClusterGroupSpec spec : this.clusterGroups) { + spec.setSummary(this); + } + } + + @JsonIgnore + @Override + public List<String> getColumnNames() Review Comment: the `Override` doesn't seem necessary? 
Also, this function name is very similar to getColumns(), except this one has the metrics; maybe we should make them a bit more distinguishable? ########## processing/src/main/java/org/apache/druid/segment/IndexIO.java: ########## @@ -952,16 +954,33 @@ public QueryableIndex load(File inDir, ObjectMapper mapper, boolean lazy, Segmen // projections can omit a __time column, but one still has to exist, so we use the interval start to make a // constant for this case final long intervalStartMillis = Intervals.of(metadata.getInterval()).getStartMillis(); - // read base table projection columns, which are shared with other projections - final Map<String, Supplier<BaseColumnHolder>> baseColumns = readProjectionColumns( - metadata, - baseProjection, - fileMapper, - Map.of(), - intervalStartMillis, - lazy, - loadFailed - ); + + // For clustered base tables the columns are always accessed through cluster groups, skip reading any base + // columns so we don't try to map files that don't exist + final boolean isClusteredSummary = baseSchema instanceof ClusteredValueGroupsBaseTableSchema; + final ClusteredValueGroupsBaseTableSchema clusteredBaseSummary; + final Map<String, Supplier<BaseColumnHolder>> baseColumns; + if (isClusteredSummary) { + clusteredBaseSummary = (ClusteredValueGroupsBaseTableSchema) baseSchema; + if (clusteredBaseSummary.getSharedColumns().isEmpty()) { Review Comment: I don't see how shared columns could be used in this PR; maybe we shouldn't include them? 
########## processing/src/main/java/org/apache/druid/segment/QueryableIndexCursorFactory.java: ########## @@ -110,9 +145,354 @@ public List<AggregatorFactory> getAggregatorsForPreAggregated() }; } + private CursorHolder makeClusteredCursorHolder(CursorBuildSpec spec, ClusteredValueGroupsBaseTableSchema clusterSummary) + { + final ClusterGroupQueryPlan plan = Projections.planClusterGroupQuery( + new ArrayList<>(index.getClusterGroupSchemas()), + spec + ); + + if (plan.survivingGroups().isEmpty()) { + return EmptyClusteredCursorHolder.INSTANCE; + } + + if (plan.survivingGroups().size() == 1) { + return makeSingleGroupClusteredCursorHolder(spec, plan, plan.survivingGroups().get(0)); + } + return makeMultiGroupClusteredCursorHolder(spec, plan); + } + + /** + * Rebuild {@code spec} for the per-group cursor holder of {@code valueGroup}, swapping in the plan's per-group + * filter rewrite: clustering-column leaves become {@link org.apache.druid.segment.filter.TrueFilter} / + * {@link org.apache.druid.segment.filter.FalseFilter} per the group's constant clustering tuple and fold through + * AND / OR / NOT, so the per-group {@link QueryableIndex}'s filter machinery never tries to look up indexes for + * clustering columns it doesn't physically carry. Selector-side access to clustering columns (SELECT / GROUP BY) + * is still served by {@link ClusteringColumnSelectorFactory} below. 
+ */ + private static CursorBuildSpec rebuildSpecForGroup( + CursorBuildSpec spec, + ClusterGroupQueryPlan plan, + TableClusterGroupSpec valueGroup + ) + { + if (spec.getFilter() == null) { + return spec; + } + final Filter rewritten = plan.rewriteFor(valueGroup); + if (rewritten == spec.getFilter()) { + return spec; + } + return CursorBuildSpec.builder(spec).setFilter(rewritten).build(); + } + + private CursorHolder makeSingleGroupClusteredCursorHolder( + CursorBuildSpec spec, + ClusterGroupQueryPlan plan, + TableClusterGroupSpec valueGroup + ) + { + final QueryableIndex groupIndex = index.getClusterGroupQueryableIndex(valueGroup); + if (groupIndex == null) { + throw DruidException.defensive( + "No cluster-group sub-index resolvable for clustering values " + + Arrays.toString(valueGroup.lookupClusteringValues()) + ); + } + + return new QueryableIndexCursorHolder( + groupIndex, + rebuildSpecForGroup(spec, plan, valueGroup), + QueryableIndexTimeBoundaryInspector.create(groupIndex) + ) + { + @Override + protected ColumnSelectorFactory makeColumnSelectorFactoryForOffset( + ColumnCache columnCache, + Offset baseOffset + ) + { + return new ClusteringColumnSelectorFactory( + super.makeColumnSelectorFactoryForOffset(columnCache, baseOffset), + valueGroup.getSummary().getClusteringColumns(), + valueGroup.lookupClusteringValues() + ); + } + + @Override + protected VectorColumnSelectorFactory makeVectorColumnSelectorFactoryForOffset( + ColumnCache columnCache, + VectorOffset baseOffset + ) + { + return new ClusteringVectorColumnSelectorFactory( + super.makeVectorColumnSelectorFactoryForOffset(columnCache, baseOffset), + valueGroup.getSummary().getClusteringColumns(), + valueGroup.lookupClusteringValues() + ); + } + }; + } + + /** + * Build a cursor holder that walks multiple matching cluster groups back-to-back via + * {@link ConcatenatingCursor}. 
Each per-group {@link CursorHolder} is built lazily inside the cursor's group + * transition, so a query that finishes early (e.g., LIMIT-bounded) doesn't open every group's offset. + */ + private CursorHolder makeMultiGroupClusteredCursorHolder( + CursorBuildSpec spec, + ClusterGroupQueryPlan plan + ) + { + final List<TableClusterGroupSpec> matching = plan.survivingGroups(); + // All matching specs share the same parent summary (they came out of one segment); grab a reference for + // getOrdering() and clusteringColumns below. + final ClusteredValueGroupsBaseTableSchema clusterSummary = matching.get(0).getSummary(); + final RowSignature clusteringColumns = clusterSummary.getClusteringColumns(); + final List<Object[]> clusteringValuesByGroup = new ArrayList<>(matching.size()); + final List<Supplier<CursorHolder>> holderSuppliers = new ArrayList<>(matching.size()); + // lifecycle management closer for per-group CursorHolders + final Closer closer = Closer.create(); + for (TableClusterGroupSpec valueGroup : matching) { + clusteringValuesByGroup.add(valueGroup.lookupClusteringValues()); + final QueryableIndex groupIndex = index.getClusterGroupQueryableIndex(valueGroup); + if (groupIndex == null) { + throw DruidException.defensive( + "No cluster-group sub-index resolvable for clustering values " + + Arrays.toString(valueGroup.lookupClusteringValues()) + ); + } + final CursorBuildSpec groupSpec = rebuildSpecForGroup(spec, plan, valueGroup); + holderSuppliers.add( + Suppliers.memoize( + () -> closer.register( + new QueryableIndexCursorHolder( + groupIndex, + groupSpec, + QueryableIndexTimeBoundaryInspector.create(groupIndex) + ) + ) + ) + ); + } + + // Initial wrapper state uses the first group's clustering values + a throwing placeholder delegate. The + // ConcatenatingCursor immediately calls setDelegate on init (before any selector is exposed). 
The vector + // wrapper carries the query-level max vector size from the build spec, the placeholder delegate can't be + // queried for sizing, and the value is constant across groups anyway. + final int vectorSize = spec.getQueryContext().getVectorSize(); + final ClusteringColumnSelectorFactory wrapperFactory = new ClusteringColumnSelectorFactory( + UNINITIALIZED_DELEGATE, + clusteringColumns, + clusteringValuesByGroup.get(0) + ); Review Comment: I find it confusing that the delegate is not initialized but the clustering values are. Maybe we could make the `ClusteringColumnSelectorFactory` nullable in `ConcatenatingCursor`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
