FrankChen021 commented on code in PR #19460:
URL: https://github.com/apache/druid/pull/19460#discussion_r3281516304


##########
processing/src/main/java/org/apache/druid/segment/QueryableIndexCursorFactory.java:
##########
@@ -110,9 +145,354 @@ public List<AggregatorFactory> 
getAggregatorsForPreAggregated()
     };
   }
 
+  private CursorHolder makeClusteredCursorHolder(CursorBuildSpec spec, 
ClusteredValueGroupsBaseTableSchema clusterSummary)
+  {
+    final ClusterGroupQueryPlan plan = Projections.planClusterGroupQuery(
+        new ArrayList<>(index.getClusterGroupSchemas()),
+        spec
+    );
+
+    if (plan.survivingGroups().isEmpty()) {
+      return EmptyClusteredCursorHolder.INSTANCE;
+    }
+
+    if (plan.survivingGroups().size() == 1) {
+      return makeSingleGroupClusteredCursorHolder(spec, plan, 
plan.survivingGroups().get(0));
+    }
+    return makeMultiGroupClusteredCursorHolder(spec, plan);
+  }
+
+  /**
+   * Rebuild {@code spec} for the per-group cursor holder of {@code 
valueGroup}, swapping in the plan's per-group
+   * filter rewrite: clustering-column leaves become {@link 
org.apache.druid.segment.filter.TrueFilter} /
+   * {@link org.apache.druid.segment.filter.FalseFilter} per the group's 
constant clustering tuple and fold through
+   * AND / OR / NOT, so the per-group {@link QueryableIndex}'s filter 
machinery never tries to look up indexes for
+   * clustering columns it doesn't physically carry. Selector-side access to 
clustering columns (SELECT / GROUP BY)
+   * is still served by {@link ClusteringColumnSelectorFactory} below.
+   */
+  private static CursorBuildSpec rebuildSpecForGroup(
+      CursorBuildSpec spec,
+      ClusterGroupQueryPlan plan,
+      TableClusterGroupSpec valueGroup
+  )
+  {
+    if (spec.getFilter() == null) {
+      return spec;
+    }
+    final Filter rewritten = plan.rewriteFor(valueGroup);
+    if (rewritten == spec.getFilter()) {
+      return spec;
+    }
+    return CursorBuildSpec.builder(spec).setFilter(rewritten).build();
+  }
+
+  private CursorHolder makeSingleGroupClusteredCursorHolder(
+      CursorBuildSpec spec,
+      ClusterGroupQueryPlan plan,
+      TableClusterGroupSpec valueGroup
+  )
+  {
+    final QueryableIndex groupIndex = 
index.getClusterGroupQueryableIndex(valueGroup);
+    if (groupIndex == null) {
+      throw DruidException.defensive(
+          "No cluster-group sub-index resolvable for clustering values "
+          + Arrays.toString(valueGroup.lookupClusteringValues())
+      );
+    }
+
+    return new QueryableIndexCursorHolder(
+        groupIndex,
+        rebuildSpecForGroup(spec, plan, valueGroup),
+        QueryableIndexTimeBoundaryInspector.create(groupIndex)
+    )
+    {
+      @Override
+      protected ColumnSelectorFactory makeColumnSelectorFactoryForOffset(
+          ColumnCache columnCache,
+          Offset baseOffset
+      )
+      {
+        return new ClusteringColumnSelectorFactory(
+            super.makeColumnSelectorFactoryForOffset(columnCache, baseOffset),
+            valueGroup.getSummary().getClusteringColumns(),
+            valueGroup.lookupClusteringValues()
+        );
+      }
+
+      @Override
+      protected VectorColumnSelectorFactory 
makeVectorColumnSelectorFactoryForOffset(
+          ColumnCache columnCache,
+          VectorOffset baseOffset
+      )
+      {
+        return new ClusteringVectorColumnSelectorFactory(
+            super.makeVectorColumnSelectorFactoryForOffset(columnCache, 
baseOffset),
+            valueGroup.getSummary().getClusteringColumns(),
+            valueGroup.lookupClusteringValues()
+        );
+      }
+    };
+  }
+
+  /**
+   * Build a cursor holder that walks multiple matching cluster groups 
back-to-back via
+   * {@link ConcatenatingCursor}. Each per-group {@link CursorHolder} is built 
lazily inside the cursor's group
+   * transition, so a query that finishes early (e.g., LIMIT-bounded) doesn't 
open every group's offset.
+   */
+  private CursorHolder makeMultiGroupClusteredCursorHolder(
+      CursorBuildSpec spec,
+      ClusterGroupQueryPlan plan
+  )
+  {
+    final List<TableClusterGroupSpec> matching = plan.survivingGroups();
+    // All matching specs share the same parent summary (they came out of one 
segment); grab a reference for
+    // getOrdering() and clusteringColumns below.
+    final ClusteredValueGroupsBaseTableSchema clusterSummary = 
matching.get(0).getSummary();
+    final RowSignature clusteringColumns = 
clusterSummary.getClusteringColumns();
+    final List<Object[]> clusteringValuesByGroup = new 
ArrayList<>(matching.size());
+    final List<Supplier<CursorHolder>> holderSuppliers = new 
ArrayList<>(matching.size());
+    // lifecycle management closer for per-group CursorHolders
+    final Closer closer = Closer.create();
+    for (TableClusterGroupSpec valueGroup : matching) {
+      clusteringValuesByGroup.add(valueGroup.lookupClusteringValues());
+      final QueryableIndex groupIndex = 
index.getClusterGroupQueryableIndex(valueGroup);
+      if (groupIndex == null) {
+        throw DruidException.defensive(
+            "No cluster-group sub-index resolvable for clustering values "
+            + Arrays.toString(valueGroup.lookupClusteringValues())
+        );
+      }
+      final CursorBuildSpec groupSpec = rebuildSpecForGroup(spec, plan, 
valueGroup);
+      holderSuppliers.add(
+          Suppliers.memoize(
+              () -> closer.register(
+                  new QueryableIndexCursorHolder(
+                      groupIndex,
+                      groupSpec,
+                      QueryableIndexTimeBoundaryInspector.create(groupIndex)
+                  )
+              )
+          )
+      );
+    }
+
+    // Initial wrapper state uses the first group's clustering values + a 
throwing placeholder delegate. The
+    // ConcatenatingCursor immediately calls setDelegate on init (before any 
selector is exposed). The vector
+    // wrapper carries the query-level max vector size from the build spec, 
the placeholder delegate can't be
+    // queried for sizing, and the value is constant across groups anyway.
+    final int vectorSize = spec.getQueryContext().getVectorSize();
+    final ClusteringColumnSelectorFactory wrapperFactory = new 
ClusteringColumnSelectorFactory(
+        UNINITIALIZED_DELEGATE,
+        clusteringColumns,
+        clusteringValuesByGroup.get(0)
+    );
+    final ClusteringVectorColumnSelectorFactory vectorWrapperFactory = new 
ClusteringVectorColumnSelectorFactory(
+        UNINITIALIZED_VECTOR_DELEGATE,
+        clusteringColumns,
+        clusteringValuesByGroup.get(0),
+        vectorSize
+    );
+
+    final ConcatenatingCursor cursor = new ConcatenatingCursor(
+        holderSuppliers,
+        clusteringValuesByGroup,
+        wrapperFactory
+    );
+    final ConcatenatingVectorCursor vectorCursor = new 
ConcatenatingVectorCursor(
+        holderSuppliers,
+        clusteringValuesByGroup,
+        vectorWrapperFactory
+    );
+
+    // canVectorize() is determined by the per-group holders. Probe the first 
one (lazily, `Suppliers.memoize`
+    // means this opens it once and is reused by ConcatenatingVectorCursor).
+    final boolean canVectorize = holderSuppliers.get(0).get().canVectorize();
+
+    return new CursorHolder()
+    {
+      @Override
+      public Cursor asCursor()
+      {
+        return cursor;
+      }
+
+      @Override
+      public VectorCursor asVectorCursor()
+      {
+        return vectorCursor;
+      }
+
+      @Override
+      public boolean canVectorize()
+      {
+        return canVectorize;
+      }
+
+      @Override
+      public List<OrderBy> getOrdering()
+      {
+        // Cluster groups are written in clustering-value order 
(writer-enforced; see ClusteredValueGroupsBaseTableSchema),
+        // and within each group rows are sorted by the segment ordering's 
tail (clustering prefix dropped). So
+        // back-to-back walking yields rows in the full segment ordering; the 
writer-side contract makes the
+        // concatenation order-preserving without any merge work at read time.
+        return clusterSummary.getOrdering();
+      }
+
+      @Override
+      public void close()
+      {
+        try {
+          closer.close();
+        }
+        catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+      }
+    };
+  }
+
+  /**
+   * Placeholder delegate for the {@link ClusteringColumnSelectorFactory} 
constructed by
+   * {@link #makeMultiGroupClusteredCursorHolder}. Throws on any access; 
replaced by the concatenating cursor's
+   * lazy init before the wrapper is exposed to the caller.
+   */
+  private static final ColumnSelectorFactory UNINITIALIZED_DELEGATE = new 
ColumnSelectorFactory()
+  {
+    @Override
+    public DimensionSelector makeDimensionSelector(DimensionSpec dimensionSpec)
+    {
+      throw DruidException.defensive("ConcatenatingCursor delegate accessed 
before initialization");
+    }
+
+    @Override
+    public ColumnValueSelector makeColumnValueSelector(String columnName)
+    {
+      throw DruidException.defensive("ConcatenatingCursor delegate accessed 
before initialization");
+    }
+
+    @Nullable
+    @Override
+    public ColumnCapabilities getColumnCapabilities(String column)
+    {
+      return null;
+    }
+  };
+
+  /**
+   * Vector counterpart of {@link #UNINITIALIZED_DELEGATE}. Replaced by
+   * {@link ConcatenatingVectorCursor}'s lazy init before the wrapper is 
exposed.
+   */
+  private static final VectorColumnSelectorFactory 
UNINITIALIZED_VECTOR_DELEGATE = new VectorColumnSelectorFactory()
+  {
+    @Override
+    public ReadableVectorInspector getReadableVectorInspector()
+    {
+      throw DruidException.defensive("ConcatenatingVectorCursor delegate 
accessed before initialization");
+    }
+
+    @Override
+    public SingleValueDimensionVectorSelector 
makeSingleValueDimensionSelector(DimensionSpec dimensionSpec)
+    {
+      throw DruidException.defensive("ConcatenatingVectorCursor delegate 
accessed before initialization");
+    }
+
+    @Override
+    public MultiValueDimensionVectorSelector 
makeMultiValueDimensionSelector(DimensionSpec dimensionSpec)
+    {
+      throw DruidException.defensive("ConcatenatingVectorCursor delegate 
accessed before initialization");
+    }
+
+    @Override
+    public VectorValueSelector makeValueSelector(String column)
+    {
+      throw DruidException.defensive("ConcatenatingVectorCursor delegate 
accessed before initialization");
+    }
+
+    @Override
+    public VectorObjectSelector makeObjectSelector(String column)
+    {
+      throw DruidException.defensive("ConcatenatingVectorCursor delegate 
accessed before initialization");
+    }
+
+    @Nullable
+    @Override
+    public ColumnCapabilities getColumnCapabilities(String column)
+    {
+      return null;
+    }
+  };
+
+  /**
+   * CursorHolder that yields no rows. Used when {@link 
Projections#planClusterGroupQuery} excludes every cluster
+   * group, so the filter is provably unsatisfiable on this segment.
+   */
+  private static final class EmptyClusteredCursorHolder implements CursorHolder
+  {
+    static final EmptyClusteredCursorHolder INSTANCE = new 
EmptyClusteredCursorHolder();
+
+    @Override
+    public Cursor asCursor()
+    {
+      return new Cursor()
+      {
+        @Override
+        public ColumnSelectorFactory getColumnSelectorFactory()

Review Comment:
   [P1] Empty pruned cursor must expose selectors
   
   When all cluster groups are pruned, this holder returns a done cursor whose 
`getColumnSelectorFactory()` always throws. Query engines commonly obtain 
selector factories immediately after creating a non-null cursor, even when the 
cursor is already done, so a filter that matches no cluster group, such as 
`tenant = 'missing'`, can fail the query with an exception instead of returning 
an empty result. The empty cursor holder should either return null from 
`asCursor()` or provide a cursor with a harmless selector factory.



##########
processing/src/main/java/org/apache/druid/segment/IndexIO.java:
##########
@@ -1080,6 +1126,63 @@ private Map<String, Supplier<BaseColumnHolder>> 
readProjectionColumns(
       return projectionColumns;
     }
 
+    /**
+     * Read the per-column data for cluster group {@code groupIndex}. Mirrors 
{@link #readProjectionColumns} but
+     * with the dictionary-id-tuple smoosh prefix {@code 
__base$<id0>_<id1>...<idK>/<col>}; the column set
+     * excludes clustering columns (constants, injected at query time).
+     */
+    private Map<String, Supplier<BaseColumnHolder>> readClusterGroupColumns(
+        SegmentFileMetadata metadata,
+        ClusteredValueGroupsBaseTableSchema summary,
+        int groupIndex,
+        SegmentFileMapper segmentFileMapper,
+        long intervalStartMillis,
+        boolean lazy,
+        SegmentLazyLoadFailCallback loadFailed
+    ) throws IOException
+    {
+      final TableClusterGroupSpec spec = 
summary.getClusterGroups().get(groupIndex);
+      final List<Integer> clusteringValueIds = spec.getClusteringValueIds();
+      final String timeColumnName = summary.getTimeColumnName();
+      final boolean renameTime = 
!ColumnHolder.TIME_COLUMN_NAME.equals(timeColumnName);
+      final Map<String, Supplier<BaseColumnHolder>> groupColumns = new 
LinkedHashMap<>();
+
+      for (String column : summary.getGroupColumnNames()) {
+        final String smooshName = 
Projections.getClusterGroupSegmentInternalFileName(clusteringValueIds, column);
+        final ByteBuffer colBuffer = segmentFileMapper.mapFile(smooshName);

Review Comment:
   [P1] Check descriptor before mapping sparse group column
   
   The new clustered-group reader maps the smoosh file before checking whether 
the column descriptor exists. For sparse clustered groups, a logical column can 
be present in the summary but absent from an individual group, and the existing 
null-descriptor check suggests that case is expected. As written, 
`mapFile(smooshName)` fails before the code can skip the missing column, so 
loading such clustered segments throws instead of exposing the column as 
absent/null for that group.



##########
processing/src/main/java/org/apache/druid/segment/QueryableIndexCursorFactory.java:
##########
@@ -110,9 +145,354 @@ public List<AggregatorFactory> 
getAggregatorsForPreAggregated()
     };
   }
 
+  private CursorHolder makeClusteredCursorHolder(CursorBuildSpec spec, 
ClusteredValueGroupsBaseTableSchema clusterSummary)
+  {
+    final ClusterGroupQueryPlan plan = Projections.planClusterGroupQuery(
+        new ArrayList<>(index.getClusterGroupSchemas()),
+        spec
+    );
+
+    if (plan.survivingGroups().isEmpty()) {
+      return EmptyClusteredCursorHolder.INSTANCE;
+    }
+
+    if (plan.survivingGroups().size() == 1) {
+      return makeSingleGroupClusteredCursorHolder(spec, plan, 
plan.survivingGroups().get(0));
+    }
+    return makeMultiGroupClusteredCursorHolder(spec, plan);
+  }
+
+  /**
+   * Rebuild {@code spec} for the per-group cursor holder of {@code 
valueGroup}, swapping in the plan's per-group
+   * filter rewrite: clustering-column leaves become {@link 
org.apache.druid.segment.filter.TrueFilter} /
+   * {@link org.apache.druid.segment.filter.FalseFilter} per the group's 
constant clustering tuple and fold through
+   * AND / OR / NOT, so the per-group {@link QueryableIndex}'s filter 
machinery never tries to look up indexes for
+   * clustering columns it doesn't physically carry. Selector-side access to 
clustering columns (SELECT / GROUP BY)
+   * is still served by {@link ClusteringColumnSelectorFactory} below.
+   */
+  private static CursorBuildSpec rebuildSpecForGroup(
+      CursorBuildSpec spec,
+      ClusterGroupQueryPlan plan,
+      TableClusterGroupSpec valueGroup
+  )
+  {
+    if (spec.getFilter() == null) {
+      return spec;
+    }
+    final Filter rewritten = plan.rewriteFor(valueGroup);
+    if (rewritten == spec.getFilter()) {
+      return spec;
+    }
+    return CursorBuildSpec.builder(spec).setFilter(rewritten).build();
+  }
+
+  private CursorHolder makeSingleGroupClusteredCursorHolder(
+      CursorBuildSpec spec,
+      ClusterGroupQueryPlan plan,
+      TableClusterGroupSpec valueGroup
+  )
+  {
+    final QueryableIndex groupIndex = 
index.getClusterGroupQueryableIndex(valueGroup);
+    if (groupIndex == null) {
+      throw DruidException.defensive(
+          "No cluster-group sub-index resolvable for clustering values "
+          + Arrays.toString(valueGroup.lookupClusteringValues())
+      );
+    }
+
+    return new QueryableIndexCursorHolder(
+        groupIndex,
+        rebuildSpecForGroup(spec, plan, valueGroup),
+        QueryableIndexTimeBoundaryInspector.create(groupIndex)
+    )
+    {
+      @Override
+      protected ColumnSelectorFactory makeColumnSelectorFactoryForOffset(
+          ColumnCache columnCache,
+          Offset baseOffset
+      )
+      {
+        return new ClusteringColumnSelectorFactory(
+            super.makeColumnSelectorFactoryForOffset(columnCache, baseOffset),
+            valueGroup.getSummary().getClusteringColumns(),
+            valueGroup.lookupClusteringValues()
+        );
+      }
+
+      @Override
+      protected VectorColumnSelectorFactory 
makeVectorColumnSelectorFactoryForOffset(
+          ColumnCache columnCache,
+          VectorOffset baseOffset
+      )
+      {
+        return new ClusteringVectorColumnSelectorFactory(
+            super.makeVectorColumnSelectorFactoryForOffset(columnCache, 
baseOffset),
+            valueGroup.getSummary().getClusteringColumns(),
+            valueGroup.lookupClusteringValues()
+        );
+      }
+    };
+  }
+
+  /**
+   * Build a cursor holder that walks multiple matching cluster groups 
back-to-back via
+   * {@link ConcatenatingCursor}. Each per-group {@link CursorHolder} is built 
lazily inside the cursor's group
+   * transition, so a query that finishes early (e.g., LIMIT-bounded) doesn't 
open every group's offset.
+   */
+  private CursorHolder makeMultiGroupClusteredCursorHolder(
+      CursorBuildSpec spec,
+      ClusterGroupQueryPlan plan
+  )
+  {
+    final List<TableClusterGroupSpec> matching = plan.survivingGroups();
+    // All matching specs share the same parent summary (they came out of one 
segment); grab a reference for
+    // getOrdering() and clusteringColumns below.
+    final ClusteredValueGroupsBaseTableSchema clusterSummary = 
matching.get(0).getSummary();
+    final RowSignature clusteringColumns = 
clusterSummary.getClusteringColumns();
+    final List<Object[]> clusteringValuesByGroup = new 
ArrayList<>(matching.size());
+    final List<Supplier<CursorHolder>> holderSuppliers = new 
ArrayList<>(matching.size());
+    // lifecycle management closer for per-group CursorHolders
+    final Closer closer = Closer.create();
+    for (TableClusterGroupSpec valueGroup : matching) {
+      clusteringValuesByGroup.add(valueGroup.lookupClusteringValues());
+      final QueryableIndex groupIndex = 
index.getClusterGroupQueryableIndex(valueGroup);
+      if (groupIndex == null) {
+        throw DruidException.defensive(
+            "No cluster-group sub-index resolvable for clustering values "
+            + Arrays.toString(valueGroup.lookupClusteringValues())
+        );
+      }
+      final CursorBuildSpec groupSpec = rebuildSpecForGroup(spec, plan, 
valueGroup);
+      holderSuppliers.add(
+          Suppliers.memoize(
+              () -> closer.register(
+                  new QueryableIndexCursorHolder(
+                      groupIndex,
+                      groupSpec,
+                      QueryableIndexTimeBoundaryInspector.create(groupIndex)
+                  )
+              )
+          )
+      );
+    }
+
+    // Initial wrapper state uses the first group's clustering values + a 
throwing placeholder delegate. The
+    // ConcatenatingCursor immediately calls setDelegate on init (before any 
selector is exposed). The vector
+    // wrapper carries the query-level max vector size from the build spec, 
the placeholder delegate can't be
+    // queried for sizing, and the value is constant across groups anyway.
+    final int vectorSize = spec.getQueryContext().getVectorSize();
+    final ClusteringColumnSelectorFactory wrapperFactory = new 
ClusteringColumnSelectorFactory(
+        UNINITIALIZED_DELEGATE,
+        clusteringColumns,
+        clusteringValuesByGroup.get(0)
+    );
+    final ClusteringVectorColumnSelectorFactory vectorWrapperFactory = new 
ClusteringVectorColumnSelectorFactory(
+        UNINITIALIZED_VECTOR_DELEGATE,
+        clusteringColumns,
+        clusteringValuesByGroup.get(0),
+        vectorSize
+    );
+
+    final ConcatenatingCursor cursor = new ConcatenatingCursor(
+        holderSuppliers,
+        clusteringValuesByGroup,
+        wrapperFactory
+    );
+    final ConcatenatingVectorCursor vectorCursor = new 
ConcatenatingVectorCursor(
+        holderSuppliers,
+        clusteringValuesByGroup,
+        vectorWrapperFactory
+    );
+
+    // canVectorize() is determined by the per-group holders. Probe the first 
one (lazily, `Suppliers.memoize`
+    // means this opens it once and is reused by ConcatenatingVectorCursor).
+    final boolean canVectorize = holderSuppliers.get(0).get().canVectorize();

Review Comment:
   [P2] Vectorization decision only checks first group
   
   The multi-group clustered holder reports `canVectorize()` based only on the 
first per-group holder, but each group gets a different rewritten filter. A 
first group whose filter folds to `TrueFilter` can vectorize while a later 
group's residual filter cannot; the engine will choose the vector path and 
`ConcatenatingVectorCursor` will later call `asVectorCursor()` on the 
non-vectorizable holder, which throws. The combined holder should report 
`canVectorize()` as true only when every surviving group holder can vectorize, 
and should otherwise disable vectorization.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to