cecemei commented on code in PR #19460:
URL: https://github.com/apache/druid/pull/19460#discussion_r3307052466


##########
processing/src/main/java/org/apache/druid/segment/projections/ClusteringColumnSelectorFactory.java:
##########
@@ -0,0 +1,566 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.projections;
+
+import org.apache.druid.error.DruidException;
+import org.apache.druid.math.expr.ExprEval;
+import org.apache.druid.math.expr.ExpressionType;
+import org.apache.druid.query.dimension.DimensionSpec;
+import org.apache.druid.query.filter.DruidPredicateFactory;
+import org.apache.druid.query.filter.ValueMatcher;
+import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.ConstantExprEvalSelector;
+import org.apache.druid.segment.DimensionSelector;
+import org.apache.druid.segment.IdLookup;
+import org.apache.druid.segment.RowIdSupplier;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.column.ColumnCapabilitiesImpl;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.segment.column.ValueType;
+import org.apache.druid.segment.data.IndexedInts;
+
+import javax.annotation.Nullable;
+import java.util.function.Supplier;
+
+/**
+ * {@link ColumnSelectorFactory} wrapper that intercepts requests for 
clustering columns and returns selectors
+ * carrying the group's constant value, while delegating all other column 
lookups to a wrapped factory. This is the
+ * mechanism by which a cluster group's clustering columns, which are NOT 
stored in the per-group column data since
+ * they're constant across the group, are made visible to query engines as if 
they were ordinary columns.
+ */
+public class ClusteringColumnSelectorFactory implements ColumnSelectorFactory
+{
+  private final RowSignature clusteringColumns;
+  private ColumnSelectorFactory delegate;
+  private Object[] clusteringValues;
+  // Bumped on every setDelegate(...) so per-call selector wrappers can detect 
group transitions and rebuild their
+  // cached inner state
+  private long generation;
+
+  public ClusteringColumnSelectorFactory(
+      ColumnSelectorFactory delegate,
+      RowSignature clusteringColumns,
+      Object[] clusteringValues
+  )
+  {
+    this.clusteringColumns = clusteringColumns;
+    setDelegate(delegate, clusteringValues);
+  }
+
+  /**
+   * Update the underlying factory and the constant values for the current 
cluster group. Called by a multi-group
+   * concatenating cursor on each group transition. Selectors previously 
returned by this factory will, on their next
+   * invocation, observe the updated state; see the per-call indirection in 
the inner selector classes.
+   */
+  public void setDelegate(ColumnSelectorFactory delegate, Object[] 
clusteringValues)

Review Comment:
   I find how `setDelegate` and `generation` usage hard to reason. It felt like 
we should have all `CursorHolder` in this class and let it manage the advance 
of cursor, generation, ColumnSelectorFactory, along with the ValueMatcher all 
together in this class. and `ClusteringColumnSelectorFactory` can implement 
`Iterable<Cursor>`, WDYT?  



##########
processing/src/main/java/org/apache/druid/segment/ConcatenatingCursor.java:
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import com.google.common.base.Supplier;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.segment.projections.ClusteringColumnSelectorFactory;
+
+import javax.annotation.Nullable;
+import java.util.List;
+
+/**
+ * {@link Cursor} that walks a sequence of per-group cursors back-to-back, 
presenting them to the caller as a single
+ * cursor over a clustered base table. On group transitions the wrapper {@link 
ColumnSelectorFactory} updates its
+ * underlying delegate and clustering values so previously-acquired delegating 
selectors observe the new group's data
+ * on their next access.
+ * <p>
+ * Each entry in {@link #holderSuppliers} is a lazy producer of a {@link 
CursorHolder} for one cluster group. The
+ * outer {@link CursorHolder} owns the lifecycle of the per-group holders.
+ */
+public final class ConcatenatingCursor implements Cursor
+{
+  private final List<Supplier<CursorHolder>> holderSuppliers;
+  private final List<Object[]> clusteringValuesByGroup;
+  private final ClusteringColumnSelectorFactory wrapperFactory;
+
+  private int currentIdx;
+  @Nullable
+  private Cursor currentCursor;
+  private boolean initialized;
+
+  public ConcatenatingCursor(
+      List<Supplier<CursorHolder>> holderSuppliers,
+      List<Object[]> clusteringValuesByGroup,
+      ClusteringColumnSelectorFactory wrapperFactory
+  )
+  {
+    if (holderSuppliers.size() != clusteringValuesByGroup.size()) {
+      throw DruidException.defensive(
+          "holderSuppliers size [%s] must equal clusteringValuesByGroup size 
[%s]",
+          holderSuppliers.size(),
+          clusteringValuesByGroup.size()
+      );
+    }
+    if (holderSuppliers.isEmpty()) {
+      throw DruidException.defensive("ConcatenatingCursor requires at least 
one cluster group");
+    }
+    this.holderSuppliers = holderSuppliers;
+    this.clusteringValuesByGroup = clusteringValuesByGroup;
+    this.wrapperFactory = wrapperFactory;
+    this.currentIdx = -1;
+  }
+
+  private void initializeIfNeeded()
+  {
+    if (initialized) {
+      return;
+    }
+    initialized = true;
+    advanceToNextNonEmptyGroup();
+  }
+
+  /**
+   * Open the next group whose cursor has at least one row. Sets {@code 
currentCursor = null} when all groups are
+   * exhausted.
+   */
+  private void advanceToNextNonEmptyGroup()
+  {
+    while (++currentIdx < holderSuppliers.size()) {
+      final CursorHolder holder = holderSuppliers.get(currentIdx).get();
+      final Cursor cursor = holder.asCursor();
+      if (cursor != null && !cursor.isDone()) {
+        currentCursor = cursor;
+        wrapperFactory.setDelegate(cursor.getColumnSelectorFactory(), 
clusteringValuesByGroup.get(currentIdx));
+        return;
+      }
+      // Group has no rows after filter application; try the next.
+    }
+    currentCursor = null;
+  }
+
+  @Override
+  public ColumnSelectorFactory getColumnSelectorFactory()
+  {
+    initializeIfNeeded();
+    return wrapperFactory;
+  }
+
+  @Override
+  public void advance()
+  {
+    initializeIfNeeded();
+    if (currentCursor == null) {
+      return;
+    }

Review Comment:
   nit: can reuse `isDone()`



##########
processing/src/main/java/org/apache/druid/segment/projections/Projections.java:
##########
@@ -530,6 +547,243 @@ public static String 
getProjectionSegmentInternalFilePrefix(ProjectionSchema pro
     return projectionSchema.getName() + "/";
   }
 
+  /**
+   * Check whether {@code type} is an allowed cluster group clustering-column 
type. Clustering is restricted to the
+   * primitive scalar types: {@link ValueType#STRING}, {@link ValueType#LONG}, 
{@link ValueType#DOUBLE},
+   * {@link ValueType#FLOAT}. Complex and array types are rejected.
+   */
+  public static boolean isAllowedClusteringType(@Nullable ColumnType type)
+  {
+    return type != null && type.anyOf(ValueType.STRING, ValueType.LONG, 
ValueType.DOUBLE, ValueType.FLOAT);
+  }
+
+  /**
+   * Segment internal file prefix + column for a cluster group's per-group 
column data:
+   * {@code __base$<id0>_<id1>...<idK>/<column>}
+   */
+  public static String getClusterGroupSegmentInternalFileName(List<Integer> 
clusteringValueIds, String column)
+  {
+    return getClusterGroupSegmentInternalFilePrefix(clusteringValueIds) + 
column;
+  }
+
+  public static String getClusterGroupSegmentInternalFilePrefix(List<Integer> 
clusteringValueIds)
+  {
+    if (clusteringValueIds == null || clusteringValueIds.isEmpty()) {
+      throw DruidException.defensive("clusteringValueIds must not be null or 
empty");
+    }
+    final StringBuilder sb = new StringBuilder(CLUSTER_GROUP_PREFIX);
+    for (int i = 0; i < clusteringValueIds.size(); i++) {
+      if (i > 0) {
+        sb.append('_');
+      }
+      sb.append(clusteringValueIds.get(i));
+    }
+    sb.append('/');
+    return sb.toString();
+  }
+
+  /**
+   * Build the per-query {@link ClusterGroupQueryPlan} for {@code groups} 
against a {@link CursorBuildSpec}. Walks the
+   * filter tree once per group via {@link #walkClusterGroupFilter}, folding 
clustering-column leaves to
+   * {@link TrueFilter} / {@link FalseFilter} against each group's constant 
clustering tuple and propagating those
+   * constants through AND / OR / NOT. Non-clustering filters remain in place 
so the per-group cursor evaluates them
+   * as expected. Query-VC-equivalent-to-clustering-VC resolution happens 
per-leaf via {@link #resolveClusteringIndex}.
+   * <p/>
+   * Output shape per group encodes the truth value: top-level {@link 
FalseFilter} = provably FALSE (group is
+   * pruned from {@link ClusterGroupQueryPlan#survivingGroups()}), top-level 
{@link TrueFilter} = provably TRUE
+   * (no residual filter needed at the cursor), anything else = UNKNOWN 
(residual filter passed to the per-group
+   * cursor). The walker's result is stashed on the plan so {@link 
ClusterGroupQueryPlan#rewriteFor} hands it back
+   * directly without re-walking.
+   */
+  public static ClusterGroupQueryPlan planClusterGroupQuery(
+      List<TableClusterGroupSpec> groups,
+      CursorBuildSpec cursorBuildSpec
+  )
+  {
+    final Filter queryFilter = cursorBuildSpec.getFilter();
+    final VirtualColumns queryVcs = cursorBuildSpec.getVirtualColumns();
+    if (groups.isEmpty() || queryFilter == null) {
+      // No filter (or no groups): every group survives, per-group rewrite is 
a no-op (null filter).
+      return new ClusterGroupQueryPlan(groups, group -> null);
+    }
+
+    // Every spec in the list shares one summary by construction (set once in 
the schema constructor), so
+    // clusteringColumns + groupVcs are loop-invariant, only the per-group 
clustering tuple changes.
+    final ClusteredValueGroupsBaseTableSchema summary = 
groups.getFirst().getSummary();
+    final RowSignature clusteringColumns = summary.getClusteringColumns();
+    final VirtualColumns groupVcs = summary.getVirtualColumns();
+
+    // Single walk per group: produces the rewritten filter, and a top-level 
FalseFilter means the group prunes.
+    // Cache the rewrite for every group (including pruned ones, where it's 
FalseFilter) so rewriteFor doesn't
+    // re-walk for either the cursor factory or callers that want to inspect a 
pruned group's outcome directly.
+    final List<TableClusterGroupSpec> kept = new ArrayList<>(groups.size());
+    final IdentityHashMap<TableClusterGroupSpec, Filter> rewriteCache = new 
IdentityHashMap<>();

Review Comment:
   it felt like `ClusterGroupQueryPlan` doesnt have much more than this hash 
map, is there any other reason that we want to have a class wrap it? 



##########
processing/src/main/java/org/apache/druid/segment/ConcatenatingCursor.java:
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import com.google.common.base.Supplier;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.segment.projections.ClusteringColumnSelectorFactory;
+
+import javax.annotation.Nullable;
+import java.util.List;
+
+/**
+ * {@link Cursor} that walks a sequence of per-group cursors back-to-back, 
presenting them to the caller as a single
+ * cursor over a clustered base table. On group transitions the wrapper {@link 
ColumnSelectorFactory} updates its
+ * underlying delegate and clustering values so previously-acquired delegating 
selectors observe the new group's data
+ * on their next access.
+ * <p>
+ * Each entry in {@link #holderSuppliers} is a lazy producer of a {@link 
CursorHolder} for one cluster group. The
+ * outer {@link CursorHolder} owns the lifecycle of the per-group holders.
+ */
+public final class ConcatenatingCursor implements Cursor
+{
+  private final List<Supplier<CursorHolder>> holderSuppliers;
+  private final List<Object[]> clusteringValuesByGroup;
+  private final ClusteringColumnSelectorFactory wrapperFactory;
+
+  private int currentIdx;
+  @Nullable
+  private Cursor currentCursor;
+  private boolean initialized;
+
+  public ConcatenatingCursor(
+      List<Supplier<CursorHolder>> holderSuppliers,
+      List<Object[]> clusteringValuesByGroup,
+      ClusteringColumnSelectorFactory wrapperFactory
+  )
+  {
+    if (holderSuppliers.size() != clusteringValuesByGroup.size()) {
+      throw DruidException.defensive(
+          "holderSuppliers size [%s] must equal clusteringValuesByGroup size 
[%s]",
+          holderSuppliers.size(),
+          clusteringValuesByGroup.size()
+      );
+    }
+    if (holderSuppliers.isEmpty()) {
+      throw DruidException.defensive("ConcatenatingCursor requires at least 
one cluster group");
+    }
+    this.holderSuppliers = holderSuppliers;
+    this.clusteringValuesByGroup = clusteringValuesByGroup;
+    this.wrapperFactory = wrapperFactory;
+    this.currentIdx = -1;
+  }
+
+  private void initializeIfNeeded()
+  {
+    if (initialized) {
+      return;
+    }
+    initialized = true;
+    advanceToNextNonEmptyGroup();
+  }
+
+  /**
+   * Open the next group whose cursor has at least one row. Sets {@code 
currentCursor = null} when all groups are
+   * exhausted.
+   */
+  private void advanceToNextNonEmptyGroup()
+  {
+    while (++currentIdx < holderSuppliers.size()) {
+      final CursorHolder holder = holderSuppliers.get(currentIdx).get();
+      final Cursor cursor = holder.asCursor();
+      if (cursor != null && !cursor.isDone()) {
+        currentCursor = cursor;
+        wrapperFactory.setDelegate(cursor.getColumnSelectorFactory(), 
clusteringValuesByGroup.get(currentIdx));
+        return;
+      }
+      // Group has no rows after filter application; try the next.
+    }
+    currentCursor = null;
+  }
+
+  @Override
+  public ColumnSelectorFactory getColumnSelectorFactory()
+  {
+    initializeIfNeeded();
+    return wrapperFactory;
+  }
+
+  @Override
+  public void advance()
+  {
+    initializeIfNeeded();
+    if (currentCursor == null) {
+      return;
+    }
+    currentCursor.advance();
+    if (currentCursor.isDone()) {
+      advanceToNextNonEmptyGroup();
+    }
+  }
+
+  @Override
+  public void advanceUninterruptibly()
+  {
+    initializeIfNeeded();
+    if (currentCursor == null) {
+      return;
+    }

Review Comment:
   same here, can reuse `isDone()`



##########
processing/src/main/java/org/apache/druid/segment/ConcatenatingCursor.java:
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import com.google.common.base.Supplier;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.segment.projections.ClusteringColumnSelectorFactory;
+
+import javax.annotation.Nullable;
+import java.util.List;
+
+/**
+ * {@link Cursor} that walks a sequence of per-group cursors back-to-back, 
presenting them to the caller as a single
+ * cursor over a clustered base table. On group transitions the wrapper {@link 
ColumnSelectorFactory} updates its
+ * underlying delegate and clustering values so previously-acquired delegating 
selectors observe the new group's data
+ * on their next access.
+ * <p>
+ * Each entry in {@link #holderSuppliers} is a lazy producer of a {@link 
CursorHolder} for one cluster group. The
+ * outer {@link CursorHolder} owns the lifecycle of the per-group holders.
+ */
+public final class ConcatenatingCursor implements Cursor
+{
+  private final List<Supplier<CursorHolder>> holderSuppliers;
+  private final List<Object[]> clusteringValuesByGroup;
+  private final ClusteringColumnSelectorFactory wrapperFactory;
+
+  private int currentIdx;
+  @Nullable
+  private Cursor currentCursor;
+  private boolean initialized;
+
+  public ConcatenatingCursor(
+      List<Supplier<CursorHolder>> holderSuppliers,
+      List<Object[]> clusteringValuesByGroup,
+      ClusteringColumnSelectorFactory wrapperFactory
+  )
+  {
+    if (holderSuppliers.size() != clusteringValuesByGroup.size()) {
+      throw DruidException.defensive(
+          "holderSuppliers size [%s] must equal clusteringValuesByGroup size 
[%s]",
+          holderSuppliers.size(),
+          clusteringValuesByGroup.size()
+      );
+    }
+    if (holderSuppliers.isEmpty()) {
+      throw DruidException.defensive("ConcatenatingCursor requires at least 
one cluster group");
+    }
+    this.holderSuppliers = holderSuppliers;
+    this.clusteringValuesByGroup = clusteringValuesByGroup;
+    this.wrapperFactory = wrapperFactory;
+    this.currentIdx = -1;
+  }
+
+  private void initializeIfNeeded()
+  {
+    if (initialized) {
+      return;
+    }
+    initialized = true;
+    advanceToNextNonEmptyGroup();
+  }
+
+  /**
+   * Open the next group whose cursor has at least one row. Sets {@code 
currentCursor = null} when all groups are

Review Comment:
   nit: advance to the next group sounds slightly better and more consistent 
than open the next group.
   should this reset delegate as well if all groups are exhausted?



##########
processing/src/main/java/org/apache/druid/segment/IndexIO.java:
##########
@@ -983,31 +1009,51 @@ public QueryableIndex load(File inDir, ObjectMapper 
mapper, boolean lazy, Segmen
         );
 
         projectionsColumns.put(projectionSpec.getSchema().getName(), 
projectionColumns);
-        if (projectionSpec.getSchema() instanceof AggregateProjectionSchema) {
-          aggProjections.add(
-              new AggregateProjectionMetadata(
-                  (AggregateProjectionSchema) projectionSpec.getSchema(),
-                  projectionSpec.getNumRows()
-              )
-          );
-        } else {
-          throw DruidException.defensive(
-              "Unexpected projection[%s] with type[%s]",
-              projectionSpec.getSchema().getName(),
-              projectionSpec.getSchema().getClass()
-          );
+        aggProjections.add(
+            new AggregateProjectionMetadata(
+                (AggregateProjectionSchema) projectionSpec.getSchema(),
+                projectionSpec.getNumRows()
+            )
+        );
+      }
+
+      final List<Map<String, Supplier<BaseColumnHolder>>> 
clusterGroupColumnsList;
+      if (isClusteredSummary) {

Review Comment:
   nit: maybe we could just check null for clusteredBaseSummary, and it would 
not require this additional boolean variable



##########
processing/src/main/java/org/apache/druid/segment/projections/ClusteredValueGroupsBaseTableSchema.java:
##########
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.projections;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.Lists;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.query.OrderBy;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.segment.AggregateProjectionMetadata;
+import org.apache.druid.segment.Metadata;
+import org.apache.druid.segment.VirtualColumn;
+import org.apache.druid.segment.VirtualColumns;
+import org.apache.druid.segment.column.ColumnHolder;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.utils.CollectionUtils;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * Top-level summary for a clustered base table whose groups are identified by 
discrete clustering-value tuples. Each
+ * tuple group is internally stored as a separate table without storing the 
cluster columns, which are pulled into this
+ * metadata. This is optimizing for use cases which typically only need to 
read from a single group via filters present
+ * on a query. Cluster groups nest inside as {@link #getClusterGroups()}; 
their column data live in the V10 segment
+ * file under dictionary-id-tuple prefixes ({@code 
__base$<id0>_<id1>...<idK>/<col>}), where the ids index into
+ * {@link #getClusteringDictionaries()}.
+ */
+public class ClusteredValueGroupsBaseTableSchema implements 
BaseTableProjectionSchema
+{
+  public static final String TYPE_NAME = "clustered-value-groups-base-table";
+
+  private final VirtualColumns virtualColumns;
+  private final List<String> columnNames;
+  private final AggregatorFactory[] aggregators;
+  private final List<OrderBy> ordering;
+  private final RowSignature clusteringColumns;
+  private final List<String> sharedColumns;
+  private final ClusteringDictionaries clusteringDictionaries;
+  private final List<TableClusterGroupSpec> clusterGroups;
+
+  // computed
+  private final int timeColumnPosition;
+  private final Granularity effectiveGranularity;
+
+  @JsonCreator
+  public ClusteredValueGroupsBaseTableSchema(

Review Comment:
   while reading this file, i find myself mumbling words like clustering/ group 
too much, some personal preference for naming just for inspiration:
   
   clusteringColumns -> groupKeySchema
   clusteringDictionaries → groupKeyDictionaries
   clusterGroups -> groupSpecs
   getGroupColumnNames -> getGroupInternalColumnNames
   getGroupDimensionNames -> getGroupInternalDimensionNames
   getGroupOrdering -> getGroupInternalOrdering
   



##########
processing/src/main/java/org/apache/druid/segment/projections/ClusteredValueGroupsBaseTableSchema.java:
##########
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.projections;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.Lists;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.query.OrderBy;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.segment.AggregateProjectionMetadata;
+import org.apache.druid.segment.Metadata;
+import org.apache.druid.segment.VirtualColumn;
+import org.apache.druid.segment.VirtualColumns;
+import org.apache.druid.segment.column.ColumnHolder;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.utils.CollectionUtils;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * Top-level summary for a clustered base table whose groups are identified by 
discrete clustering-value tuples. Each
+ * tuple group is internally stored as a separate table without storing the 
cluster columns, which are pulled into this
+ * metadata. This is optimizing for use cases which typically only need to 
read from a single group via filters present
+ * on a query. Cluster groups nest inside as {@link #getClusterGroups()}; 
their column data live in the V10 segment
+ * file under dictionary-id-tuple prefixes ({@code 
__base$<id0>_<id1>...<idK>/<col>}), where the ids index into
+ * {@link #getClusteringDictionaries()}.
+ */
+public class ClusteredValueGroupsBaseTableSchema implements 
BaseTableProjectionSchema
+{
+  public static final String TYPE_NAME = "clustered-value-groups-base-table";
+
+  private final VirtualColumns virtualColumns;
+  private final List<String> columnNames;
+  private final AggregatorFactory[] aggregators;
+  private final List<OrderBy> ordering;
+  private final RowSignature clusteringColumns;
+  private final List<String> sharedColumns;
+  private final ClusteringDictionaries clusteringDictionaries;
+  private final List<TableClusterGroupSpec> clusterGroups;
+
+  // computed
+  private final int timeColumnPosition;
+  private final Granularity effectiveGranularity;
+
+  @JsonCreator
+  public ClusteredValueGroupsBaseTableSchema(
+      @JsonProperty("virtualColumns") VirtualColumns virtualColumns,
+      @JsonProperty("columns") List<String> columns,
+      @JsonProperty("aggregators") @Nullable AggregatorFactory[] aggregators,
+      @JsonProperty("ordering") List<OrderBy> ordering,
+      @JsonProperty("clusteringColumns") RowSignature clusteringColumns,
+      @JsonProperty("sharedColumns") @Nullable List<String> sharedColumns,
+      @JsonProperty("clusteringDictionaries") @Nullable ClusteringDictionaries 
clusteringDictionaries,
+      @JsonProperty("clusterGroups") @Nullable List<TableClusterGroupSpec> 
clusterGroups
+  )
+  {
+    if (CollectionUtils.isNullOrEmpty(columns)) {
+      throw DruidException.defensive("clustered base table schema columns must 
not be null or empty");
+    }
+    if (ordering == null) {
+      throw DruidException.defensive("clustered base table schema ordering 
must not be null");
+    }
+    if (clusteringColumns == null || clusteringColumns.size() == 0) {
+      throw DruidException.defensive(
+          "clustered base table schema clusteringColumns must not be null or 
empty"
+      );
+    }
+    if (ordering.size() < clusteringColumns.size()) {
+      throw DruidException.defensive(
+          "ordering size [%s] must be at least clusteringColumns size [%s] 
(clustering columns must form a prefix"
+          + " of the segment ordering)",
+          ordering.size(),
+          clusteringColumns.size()
+      );
+    }
+    for (int i = 0; i < clusteringColumns.size(); i++) {
+      final String clusteringColumn = clusteringColumns.getColumnName(i);
+      if (!columns.contains(clusteringColumn)) {
+        throw DruidException.defensive(
+            "clusteringColumn [%s] must appear in columns of the clustered 
base table summary",
+            clusteringColumn
+        );
+      }
+      final ColumnType type = clusteringColumns.getColumnType(i).orElse(null);
+      if (!Projections.isAllowedClusteringType(type)) {
+        throw DruidException.defensive(
+            "clustering column [%s] has unsupported type [%s]; allowed types 
are STRING, LONG, DOUBLE, FLOAT",
+            clusteringColumn,
+            type
+        );
+      }
+      // Per-group ordering is derived by dropping this prefix; pruning + 
cursor concatenation rely on it.
+      final String orderingColumn = ordering.get(i).getColumnName();
+      if (!clusteringColumn.equals(orderingColumn)) {
+        throw DruidException.defensive(
+            "clustering column at position [%s] is [%s] but the segment 
ordering at the same position is [%s];"
+            + " clustering columns must form a prefix of the segment ordering",
+            i,
+            clusteringColumn,
+            orderingColumn
+        );
+      }
+    }
+    final List<String> resolvedSharedColumns = sharedColumns == null ? 
List.of() : sharedColumns;
+    for (String shared : resolvedSharedColumns) {
+      if (!columns.contains(shared)) {
+        throw DruidException.defensive(
+            "sharedColumn [%s] must appear in columns of the clustered base 
table summary",
+            shared
+        );
+      }
+    }
+    this.virtualColumns = virtualColumns == null ? VirtualColumns.EMPTY : 
virtualColumns;
+    this.columnNames = columns;
+    this.aggregators = aggregators == null ? new AggregatorFactory[0] : 
aggregators;
+    this.ordering = ordering;
+    this.clusteringColumns = clusteringColumns;
+    this.sharedColumns = resolvedSharedColumns;
+    this.clusterGroups = clusterGroups == null ? List.of() : 
List.copyOf(clusterGroups);
+    this.clusteringDictionaries = clusteringDictionaries == null
+                                  ? ClusteringDictionaries.EMPTY
+                                  : clusteringDictionaries;
+
+    int foundTimePosition = -1;
+    Granularity granularity = null;
+    for (int i = 0; i < ordering.size(); i++) {
+      OrderBy orderBy = ordering.get(i);
+      if (orderBy.getColumnName().equals(ColumnHolder.TIME_COLUMN_NAME)) {
+        foundTimePosition = i;
+        final VirtualColumn vc = 
this.virtualColumns.getVirtualColumn(Granularities.GRANULARITY_VIRTUAL_COLUMN_NAME);
+        if (vc != null) {
+          granularity = Granularities.fromVirtualColumn(vc);
+        } else {
+          granularity = Granularities.NONE;
+        }
+      }
+    }
+    if (granularity == null) {
+      throw DruidException.defensive(
+          "clustered base table doesn't have a [%s] column?",
+          ColumnHolder.TIME_COLUMN_NAME
+      );
+    }
+    this.timeColumnPosition = foundTimePosition;
+    this.effectiveGranularity = granularity;
+
+    // Specs always start unwired: there's a chicken-and-egg between the 
summary and its specs, resolved by
+    // deferring all summary-dependent state on the spec to setSummary, which 
we invoke here once the summary's
+    // own state is populated.
+    for (TableClusterGroupSpec spec : this.clusterGroups) {
+      spec.setSummary(this);
+    }
+  }
+
+  @JsonIgnore
+  @Override
+  public List<String> getColumnNames()

Review Comment:
   the `Override` doesn't seem necessary? also this function name is very 
similar to getColumns() except this one has the metrics, maybe we should make 
them a bit more distinguishable? 



##########
processing/src/main/java/org/apache/druid/segment/IndexIO.java:
##########
@@ -952,16 +954,33 @@ public QueryableIndex load(File inDir, ObjectMapper 
mapper, boolean lazy, Segmen
       // projections can omit a __time column, but one still has to exist, so 
we use the interval start to make a
       // constant for this case
       final long intervalStartMillis = 
Intervals.of(metadata.getInterval()).getStartMillis();
-      // read base table projection columns, which are shared with other 
projections
-      final Map<String, Supplier<BaseColumnHolder>> baseColumns = 
readProjectionColumns(
-          metadata,
-          baseProjection,
-          fileMapper,
-          Map.of(),
-          intervalStartMillis,
-          lazy,
-          loadFailed
-      );
+
+      // For clustered base tables the columns are always accessed through 
cluster groups, skip reading any base
+      // columns so we don't try to map files that don't exist
+      final boolean isClusteredSummary = baseSchema instanceof 
ClusteredValueGroupsBaseTableSchema;
+      final ClusteredValueGroupsBaseTableSchema clusteredBaseSummary;
+      final Map<String, Supplier<BaseColumnHolder>> baseColumns;
+      if (isClusteredSummary) {
+        clusteredBaseSummary = (ClusteredValueGroupsBaseTableSchema) 
baseSchema;
+        if (clusteredBaseSummary.getSharedColumns().isEmpty()) {

Review Comment:
   i dont see how shared columns could be used in this PR, maybe we dont 
include it?



##########
processing/src/main/java/org/apache/druid/segment/QueryableIndexCursorFactory.java:
##########
@@ -110,9 +145,354 @@ public List<AggregatorFactory> 
getAggregatorsForPreAggregated()
     };
   }
 
+  private CursorHolder makeClusteredCursorHolder(CursorBuildSpec spec, 
ClusteredValueGroupsBaseTableSchema clusterSummary)
+  {
+    final ClusterGroupQueryPlan plan = Projections.planClusterGroupQuery(
+        new ArrayList<>(index.getClusterGroupSchemas()),
+        spec
+    );
+
+    if (plan.survivingGroups().isEmpty()) {
+      return EmptyClusteredCursorHolder.INSTANCE;
+    }
+
+    if (plan.survivingGroups().size() == 1) {
+      return makeSingleGroupClusteredCursorHolder(spec, plan, 
plan.survivingGroups().get(0));
+    }
+    return makeMultiGroupClusteredCursorHolder(spec, plan);
+  }
+
+  /**
+   * Rebuild {@code spec} for the per-group cursor holder of {@code 
valueGroup}, swapping in the plan's per-group
+   * filter rewrite: clustering-column leaves become {@link 
org.apache.druid.segment.filter.TrueFilter} /
+   * {@link org.apache.druid.segment.filter.FalseFilter} per the group's 
constant clustering tuple and fold through
+   * AND / OR / NOT, so the per-group {@link QueryableIndex}'s filter 
machinery never tries to look up indexes for
+   * clustering columns it doesn't physically carry. Selector-side access to 
clustering columns (SELECT / GROUP BY)
+   * is still served by {@link ClusteringColumnSelectorFactory} below.
+   */
+  private static CursorBuildSpec rebuildSpecForGroup(
+      CursorBuildSpec spec,
+      ClusterGroupQueryPlan plan,
+      TableClusterGroupSpec valueGroup
+  )
+  {
+    if (spec.getFilter() == null) {
+      return spec;
+    }
+    final Filter rewritten = plan.rewriteFor(valueGroup);
+    if (rewritten == spec.getFilter()) {
+      return spec;
+    }
+    return CursorBuildSpec.builder(spec).setFilter(rewritten).build();
+  }
+
+  private CursorHolder makeSingleGroupClusteredCursorHolder(
+      CursorBuildSpec spec,
+      ClusterGroupQueryPlan plan,
+      TableClusterGroupSpec valueGroup
+  )
+  {
+    final QueryableIndex groupIndex = 
index.getClusterGroupQueryableIndex(valueGroup);
+    if (groupIndex == null) {
+      throw DruidException.defensive(
+          "No cluster-group sub-index resolvable for clustering values "
+          + Arrays.toString(valueGroup.lookupClusteringValues())
+      );
+    }
+
+    return new QueryableIndexCursorHolder(
+        groupIndex,
+        rebuildSpecForGroup(spec, plan, valueGroup),
+        QueryableIndexTimeBoundaryInspector.create(groupIndex)
+    )
+    {
+      @Override
+      protected ColumnSelectorFactory makeColumnSelectorFactoryForOffset(
+          ColumnCache columnCache,
+          Offset baseOffset
+      )
+      {
+        return new ClusteringColumnSelectorFactory(
+            super.makeColumnSelectorFactoryForOffset(columnCache, baseOffset),
+            valueGroup.getSummary().getClusteringColumns(),
+            valueGroup.lookupClusteringValues()
+        );
+      }
+
+      @Override
+      protected VectorColumnSelectorFactory 
makeVectorColumnSelectorFactoryForOffset(
+          ColumnCache columnCache,
+          VectorOffset baseOffset
+      )
+      {
+        return new ClusteringVectorColumnSelectorFactory(
+            super.makeVectorColumnSelectorFactoryForOffset(columnCache, 
baseOffset),
+            valueGroup.getSummary().getClusteringColumns(),
+            valueGroup.lookupClusteringValues()
+        );
+      }
+    };
+  }
+
+  /**
+   * Build a cursor holder that walks multiple matching cluster groups 
back-to-back via
+   * {@link ConcatenatingCursor}. Each per-group {@link CursorHolder} is built 
lazily inside the cursor's group
+   * transition, so a query that finishes early (e.g., LIMIT-bounded) doesn't 
open every group's offset.
+   */
+  private CursorHolder makeMultiGroupClusteredCursorHolder(
+      CursorBuildSpec spec,
+      ClusterGroupQueryPlan plan
+  )
+  {
+    final List<TableClusterGroupSpec> matching = plan.survivingGroups();
+    // All matching specs share the same parent summary (they came out of one 
segment); grab a reference for
+    // getOrdering() and clusteringColumns below.
+    final ClusteredValueGroupsBaseTableSchema clusterSummary = 
matching.get(0).getSummary();
+    final RowSignature clusteringColumns = 
clusterSummary.getClusteringColumns();
+    final List<Object[]> clusteringValuesByGroup = new 
ArrayList<>(matching.size());
+    final List<Supplier<CursorHolder>> holderSuppliers = new 
ArrayList<>(matching.size());
+    // lifecycle management closer for per-group CursorHolders
+    final Closer closer = Closer.create();
+    for (TableClusterGroupSpec valueGroup : matching) {
+      clusteringValuesByGroup.add(valueGroup.lookupClusteringValues());
+      final QueryableIndex groupIndex = 
index.getClusterGroupQueryableIndex(valueGroup);
+      if (groupIndex == null) {
+        throw DruidException.defensive(
+            "No cluster-group sub-index resolvable for clustering values "
+            + Arrays.toString(valueGroup.lookupClusteringValues())
+        );
+      }
+      final CursorBuildSpec groupSpec = rebuildSpecForGroup(spec, plan, 
valueGroup);
+      holderSuppliers.add(
+          Suppliers.memoize(
+              () -> closer.register(
+                  new QueryableIndexCursorHolder(
+                      groupIndex,
+                      groupSpec,
+                      QueryableIndexTimeBoundaryInspector.create(groupIndex)
+                  )
+              )
+          )
+      );
+    }
+
+    // Initial wrapper state uses the first group's clustering values + a 
throwing placeholder delegate. The
+    // ConcatenatingCursor immediately calls setDelegate on init (before any 
selector is exposed). The vector
+    // wrapper carries the query-level max vector size from the build spec, 
the placeholder delegate can't be
+    // queried for sizing, and the value is constant across groups anyway.
+    final int vectorSize = spec.getQueryContext().getVectorSize();
+    final ClusteringColumnSelectorFactory wrapperFactory = new 
ClusteringColumnSelectorFactory(
+        UNINITIALIZED_DELEGATE,
+        clusteringColumns,
+        clusteringValuesByGroup.get(0)
+    );

Review Comment:
   i find it confusing when the delegate is not initialized but 
clusteringValues are.... maybe we could set `ClusteringColumnSelectorFactory` 
as nullable in `ConcatenatingCursor`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to