FrankChen021 commented on code in PR #19460:
URL: https://github.com/apache/druid/pull/19460#discussion_r3281516304
##########
processing/src/main/java/org/apache/druid/segment/QueryableIndexCursorFactory.java:
##########
@@ -110,9 +145,354 @@ public List<AggregatorFactory>
getAggregatorsForPreAggregated()
};
}
+ private CursorHolder makeClusteredCursorHolder(CursorBuildSpec spec,
ClusteredValueGroupsBaseTableSchema clusterSummary)
+ {
+ final ClusterGroupQueryPlan plan = Projections.planClusterGroupQuery(
+ new ArrayList<>(index.getClusterGroupSchemas()),
+ spec
+ );
+
+ if (plan.survivingGroups().isEmpty()) {
+ return EmptyClusteredCursorHolder.INSTANCE;
+ }
+
+ if (plan.survivingGroups().size() == 1) {
+ return makeSingleGroupClusteredCursorHolder(spec, plan,
plan.survivingGroups().get(0));
+ }
+ return makeMultiGroupClusteredCursorHolder(spec, plan);
+ }
+
+ /**
+ * Rebuild {@code spec} for the per-group cursor holder of {@code
valueGroup}, swapping in the plan's per-group
+ * filter rewrite: clustering-column leaves become {@link
org.apache.druid.segment.filter.TrueFilter} /
+ * {@link org.apache.druid.segment.filter.FalseFilter} per the group's
constant clustering tuple and fold through
+ * AND / OR / NOT, so the per-group {@link QueryableIndex}'s filter
machinery never tries to look up indexes for
+ * clustering columns it doesn't physically carry. Selector-side access to
clustering columns (SELECT / GROUP BY)
+ * is still served by {@link ClusteringColumnSelectorFactory} below.
+ */
+ private static CursorBuildSpec rebuildSpecForGroup(
+ CursorBuildSpec spec,
+ ClusterGroupQueryPlan plan,
+ TableClusterGroupSpec valueGroup
+ )
+ {
+ if (spec.getFilter() == null) {
+ return spec;
+ }
+ final Filter rewritten = plan.rewriteFor(valueGroup);
+ if (rewritten == spec.getFilter()) {
+ return spec;
+ }
+ return CursorBuildSpec.builder(spec).setFilter(rewritten).build();
+ }
+
+ private CursorHolder makeSingleGroupClusteredCursorHolder(
+ CursorBuildSpec spec,
+ ClusterGroupQueryPlan plan,
+ TableClusterGroupSpec valueGroup
+ )
+ {
+ final QueryableIndex groupIndex =
index.getClusterGroupQueryableIndex(valueGroup);
+ if (groupIndex == null) {
+ throw DruidException.defensive(
+ "No cluster-group sub-index resolvable for clustering values "
+ + Arrays.toString(valueGroup.lookupClusteringValues())
+ );
+ }
+
+ return new QueryableIndexCursorHolder(
+ groupIndex,
+ rebuildSpecForGroup(spec, plan, valueGroup),
+ QueryableIndexTimeBoundaryInspector.create(groupIndex)
+ )
+ {
+ @Override
+ protected ColumnSelectorFactory makeColumnSelectorFactoryForOffset(
+ ColumnCache columnCache,
+ Offset baseOffset
+ )
+ {
+ return new ClusteringColumnSelectorFactory(
+ super.makeColumnSelectorFactoryForOffset(columnCache, baseOffset),
+ valueGroup.getSummary().getClusteringColumns(),
+ valueGroup.lookupClusteringValues()
+ );
+ }
+
+ @Override
+ protected VectorColumnSelectorFactory
makeVectorColumnSelectorFactoryForOffset(
+ ColumnCache columnCache,
+ VectorOffset baseOffset
+ )
+ {
+ return new ClusteringVectorColumnSelectorFactory(
+ super.makeVectorColumnSelectorFactoryForOffset(columnCache,
baseOffset),
+ valueGroup.getSummary().getClusteringColumns(),
+ valueGroup.lookupClusteringValues()
+ );
+ }
+ };
+ }
+
+ /**
+ * Build a cursor holder that walks multiple matching cluster groups
back-to-back via
+ * {@link ConcatenatingCursor}. Each per-group {@link CursorHolder} is built
lazily inside the cursor's group
+ * transition, so a query that finishes early (e.g., LIMIT-bounded) doesn't
open every group's offset.
+ */
+ private CursorHolder makeMultiGroupClusteredCursorHolder(
+ CursorBuildSpec spec,
+ ClusterGroupQueryPlan plan
+ )
+ {
+ final List<TableClusterGroupSpec> matching = plan.survivingGroups();
+ // All matching specs share the same parent summary (they came out of one
segment); grab a reference for
+ // getOrdering() and clusteringColumns below.
+ final ClusteredValueGroupsBaseTableSchema clusterSummary =
matching.get(0).getSummary();
+ final RowSignature clusteringColumns =
clusterSummary.getClusteringColumns();
+ final List<Object[]> clusteringValuesByGroup = new
ArrayList<>(matching.size());
+ final List<Supplier<CursorHolder>> holderSuppliers = new
ArrayList<>(matching.size());
+ // lifecycle management closer for per-group CursorHolders
+ final Closer closer = Closer.create();
+ for (TableClusterGroupSpec valueGroup : matching) {
+ clusteringValuesByGroup.add(valueGroup.lookupClusteringValues());
+ final QueryableIndex groupIndex =
index.getClusterGroupQueryableIndex(valueGroup);
+ if (groupIndex == null) {
+ throw DruidException.defensive(
+ "No cluster-group sub-index resolvable for clustering values "
+ + Arrays.toString(valueGroup.lookupClusteringValues())
+ );
+ }
+ final CursorBuildSpec groupSpec = rebuildSpecForGroup(spec, plan,
valueGroup);
+ holderSuppliers.add(
+ Suppliers.memoize(
+ () -> closer.register(
+ new QueryableIndexCursorHolder(
+ groupIndex,
+ groupSpec,
+ QueryableIndexTimeBoundaryInspector.create(groupIndex)
+ )
+ )
+ )
+ );
+ }
+
+ // Initial wrapper state uses the first group's clustering values + a
throwing placeholder delegate. The
+ // ConcatenatingCursor immediately calls setDelegate on init (before any
selector is exposed). The vector
+ // wrapper carries the query-level max vector size from the build spec,
the placeholder delegate can't be
+ // queried for sizing, and the value is constant across groups anyway.
+ final int vectorSize = spec.getQueryContext().getVectorSize();
+ final ClusteringColumnSelectorFactory wrapperFactory = new
ClusteringColumnSelectorFactory(
+ UNINITIALIZED_DELEGATE,
+ clusteringColumns,
+ clusteringValuesByGroup.get(0)
+ );
+ final ClusteringVectorColumnSelectorFactory vectorWrapperFactory = new
ClusteringVectorColumnSelectorFactory(
+ UNINITIALIZED_VECTOR_DELEGATE,
+ clusteringColumns,
+ clusteringValuesByGroup.get(0),
+ vectorSize
+ );
+
+ final ConcatenatingCursor cursor = new ConcatenatingCursor(
+ holderSuppliers,
+ clusteringValuesByGroup,
+ wrapperFactory
+ );
+ final ConcatenatingVectorCursor vectorCursor = new
ConcatenatingVectorCursor(
+ holderSuppliers,
+ clusteringValuesByGroup,
+ vectorWrapperFactory
+ );
+
+ // canVectorize() is determined by the per-group holders. Probe the first
one (lazily, `Suppliers.memoize`
+ // means this opens it once and is reused by ConcatenatingVectorCursor).
+ final boolean canVectorize = holderSuppliers.get(0).get().canVectorize();
+
+ return new CursorHolder()
+ {
+ @Override
+ public Cursor asCursor()
+ {
+ return cursor;
+ }
+
+ @Override
+ public VectorCursor asVectorCursor()
+ {
+ return vectorCursor;
+ }
+
+ @Override
+ public boolean canVectorize()
+ {
+ return canVectorize;
+ }
+
+ @Override
+ public List<OrderBy> getOrdering()
+ {
+ // Cluster groups are written in clustering-value order
(writer-enforced; see ClusteredValueGroupsBaseTableSchema),
+ // and within each group rows are sorted by the segment ordering's
tail (clustering prefix dropped). So
+ // back-to-back walking yields rows in the full segment ordering; the
writer-side contract makes the
+ // concatenation order-preserving without any merge work at read time.
+ return clusterSummary.getOrdering();
+ }
+
+ @Override
+ public void close()
+ {
+ try {
+ closer.close();
+ }
+ catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ };
+ }
+
+ /**
+ * Placeholder delegate for the {@link ClusteringColumnSelectorFactory}
constructed by
+ * {@link #makeMultiGroupClusteredCursorHolder}. Throws on any access;
replaced by the concatenating cursor's
+ * lazy init before the wrapper is exposed to the caller.
+ */
+ private static final ColumnSelectorFactory UNINITIALIZED_DELEGATE = new
ColumnSelectorFactory()
+ {
+ @Override
+ public DimensionSelector makeDimensionSelector(DimensionSpec dimensionSpec)
+ {
+ throw DruidException.defensive("ConcatenatingCursor delegate accessed
before initialization");
+ }
+
+ @Override
+ public ColumnValueSelector makeColumnValueSelector(String columnName)
+ {
+ throw DruidException.defensive("ConcatenatingCursor delegate accessed
before initialization");
+ }
+
+ @Nullable
+ @Override
+ public ColumnCapabilities getColumnCapabilities(String column)
+ {
+ return null;
+ }
+ };
+
+ /**
+ * Vector counterpart of {@link #UNINITIALIZED_DELEGATE}. Replaced by
+ * {@link ConcatenatingVectorCursor}'s lazy init before the wrapper is
exposed.
+ */
+ private static final VectorColumnSelectorFactory
UNINITIALIZED_VECTOR_DELEGATE = new VectorColumnSelectorFactory()
+ {
+ @Override
+ public ReadableVectorInspector getReadableVectorInspector()
+ {
+ throw DruidException.defensive("ConcatenatingVectorCursor delegate
accessed before initialization");
+ }
+
+ @Override
+ public SingleValueDimensionVectorSelector
makeSingleValueDimensionSelector(DimensionSpec dimensionSpec)
+ {
+ throw DruidException.defensive("ConcatenatingVectorCursor delegate
accessed before initialization");
+ }
+
+ @Override
+ public MultiValueDimensionVectorSelector
makeMultiValueDimensionSelector(DimensionSpec dimensionSpec)
+ {
+ throw DruidException.defensive("ConcatenatingVectorCursor delegate
accessed before initialization");
+ }
+
+ @Override
+ public VectorValueSelector makeValueSelector(String column)
+ {
+ throw DruidException.defensive("ConcatenatingVectorCursor delegate
accessed before initialization");
+ }
+
+ @Override
+ public VectorObjectSelector makeObjectSelector(String column)
+ {
+ throw DruidException.defensive("ConcatenatingVectorCursor delegate
accessed before initialization");
+ }
+
+ @Nullable
+ @Override
+ public ColumnCapabilities getColumnCapabilities(String column)
+ {
+ return null;
+ }
+ };
+
+ /**
+ * CursorHolder that yields no rows. Used when {@link
Projections#planClusterGroupQuery} excludes every cluster
+ * group, so the filter is provably unsatisfiable on this segment.
+ */
+ private static final class EmptyClusteredCursorHolder implements CursorHolder
+ {
+ static final EmptyClusteredCursorHolder INSTANCE = new
EmptyClusteredCursorHolder();
+
+ @Override
+ public Cursor asCursor()
+ {
+ return new Cursor()
+ {
+ @Override
+ public ColumnSelectorFactory getColumnSelectorFactory()
Review Comment:
[P1] Empty pruned cursor must expose selectors
When every cluster group is pruned, this holder returns a cursor that is already
done but whose `getColumnSelectorFactory()` always throws. Query engines commonly
obtain the selector factory immediately after receiving a non-null cursor, even
when that cursor is already done. A filter that matches no group, such as
`tenant = 'missing'`, can therefore fail the query instead of returning an empty
result. The empty holder should either return null from `asCursor()`, or return a
cursor whose selector factory is harmless to call.
##########
processing/src/main/java/org/apache/druid/segment/IndexIO.java:
##########
@@ -1080,6 +1126,63 @@ private Map<String, Supplier<BaseColumnHolder>>
readProjectionColumns(
return projectionColumns;
}
+ /**
+ * Read the per-column data for cluster group {@code groupIndex}. Mirrors
{@link #readProjectionColumns} but
+ * with the dictionary-id-tuple smoosh prefix {@code
__base$<id0>_<id1>...<idK>/<col>}; the column set
+ * excludes clustering columns (constants, injected at query time).
+ */
+ private Map<String, Supplier<BaseColumnHolder>> readClusterGroupColumns(
+ SegmentFileMetadata metadata,
+ ClusteredValueGroupsBaseTableSchema summary,
+ int groupIndex,
+ SegmentFileMapper segmentFileMapper,
+ long intervalStartMillis,
+ boolean lazy,
+ SegmentLazyLoadFailCallback loadFailed
+ ) throws IOException
+ {
+ final TableClusterGroupSpec spec =
summary.getClusterGroups().get(groupIndex);
+ final List<Integer> clusteringValueIds = spec.getClusteringValueIds();
+ final String timeColumnName = summary.getTimeColumnName();
+ final boolean renameTime =
!ColumnHolder.TIME_COLUMN_NAME.equals(timeColumnName);
+ final Map<String, Supplier<BaseColumnHolder>> groupColumns = new
LinkedHashMap<>();
+
+ for (String column : summary.getGroupColumnNames()) {
+ final String smooshName =
Projections.getClusterGroupSegmentInternalFileName(clusteringValueIds, column);
+ final ByteBuffer colBuffer = segmentFileMapper.mapFile(smooshName);
Review Comment:
[P1] Check descriptor before mapping sparse group column
The new clustered-group reader calls `segmentFileMapper.mapFile(smooshName)` before
checking whether the column descriptor exists. In a sparse clustered segment, a
column can be listed in the summary yet have no file in an individual group. The
existing null-descriptor check shows this case is expected. As written, however,
`mapFile(smooshName)` fails before that check runs and before the code can skip the
missing column. Loading such a segment therefore throws, instead of exposing the
column as absent (null) for that group. Look up the descriptor first and only map
the file when the descriptor is present.
##########
processing/src/main/java/org/apache/druid/segment/QueryableIndexCursorFactory.java:
##########
@@ -110,9 +145,354 @@ public List<AggregatorFactory>
getAggregatorsForPreAggregated()
};
}
+ private CursorHolder makeClusteredCursorHolder(CursorBuildSpec spec,
ClusteredValueGroupsBaseTableSchema clusterSummary)
+ {
+ final ClusterGroupQueryPlan plan = Projections.planClusterGroupQuery(
+ new ArrayList<>(index.getClusterGroupSchemas()),
+ spec
+ );
+
+ if (plan.survivingGroups().isEmpty()) {
+ return EmptyClusteredCursorHolder.INSTANCE;
+ }
+
+ if (plan.survivingGroups().size() == 1) {
+ return makeSingleGroupClusteredCursorHolder(spec, plan,
plan.survivingGroups().get(0));
+ }
+ return makeMultiGroupClusteredCursorHolder(spec, plan);
+ }
+
+ /**
+ * Rebuild {@code spec} for the per-group cursor holder of {@code
valueGroup}, swapping in the plan's per-group
+ * filter rewrite: clustering-column leaves become {@link
org.apache.druid.segment.filter.TrueFilter} /
+ * {@link org.apache.druid.segment.filter.FalseFilter} per the group's
constant clustering tuple and fold through
+ * AND / OR / NOT, so the per-group {@link QueryableIndex}'s filter
machinery never tries to look up indexes for
+ * clustering columns it doesn't physically carry. Selector-side access to
clustering columns (SELECT / GROUP BY)
+ * is still served by {@link ClusteringColumnSelectorFactory} below.
+ */
+ private static CursorBuildSpec rebuildSpecForGroup(
+ CursorBuildSpec spec,
+ ClusterGroupQueryPlan plan,
+ TableClusterGroupSpec valueGroup
+ )
+ {
+ if (spec.getFilter() == null) {
+ return spec;
+ }
+ final Filter rewritten = plan.rewriteFor(valueGroup);
+ if (rewritten == spec.getFilter()) {
+ return spec;
+ }
+ return CursorBuildSpec.builder(spec).setFilter(rewritten).build();
+ }
+
+ private CursorHolder makeSingleGroupClusteredCursorHolder(
+ CursorBuildSpec spec,
+ ClusterGroupQueryPlan plan,
+ TableClusterGroupSpec valueGroup
+ )
+ {
+ final QueryableIndex groupIndex =
index.getClusterGroupQueryableIndex(valueGroup);
+ if (groupIndex == null) {
+ throw DruidException.defensive(
+ "No cluster-group sub-index resolvable for clustering values "
+ + Arrays.toString(valueGroup.lookupClusteringValues())
+ );
+ }
+
+ return new QueryableIndexCursorHolder(
+ groupIndex,
+ rebuildSpecForGroup(spec, plan, valueGroup),
+ QueryableIndexTimeBoundaryInspector.create(groupIndex)
+ )
+ {
+ @Override
+ protected ColumnSelectorFactory makeColumnSelectorFactoryForOffset(
+ ColumnCache columnCache,
+ Offset baseOffset
+ )
+ {
+ return new ClusteringColumnSelectorFactory(
+ super.makeColumnSelectorFactoryForOffset(columnCache, baseOffset),
+ valueGroup.getSummary().getClusteringColumns(),
+ valueGroup.lookupClusteringValues()
+ );
+ }
+
+ @Override
+ protected VectorColumnSelectorFactory
makeVectorColumnSelectorFactoryForOffset(
+ ColumnCache columnCache,
+ VectorOffset baseOffset
+ )
+ {
+ return new ClusteringVectorColumnSelectorFactory(
+ super.makeVectorColumnSelectorFactoryForOffset(columnCache,
baseOffset),
+ valueGroup.getSummary().getClusteringColumns(),
+ valueGroup.lookupClusteringValues()
+ );
+ }
+ };
+ }
+
+ /**
+ * Build a cursor holder that walks multiple matching cluster groups
back-to-back via
+ * {@link ConcatenatingCursor}. Each per-group {@link CursorHolder} is built
lazily inside the cursor's group
+ * transition, so a query that finishes early (e.g., LIMIT-bounded) doesn't
open every group's offset.
+ */
+ private CursorHolder makeMultiGroupClusteredCursorHolder(
+ CursorBuildSpec spec,
+ ClusterGroupQueryPlan plan
+ )
+ {
+ final List<TableClusterGroupSpec> matching = plan.survivingGroups();
+ // All matching specs share the same parent summary (they came out of one
segment); grab a reference for
+ // getOrdering() and clusteringColumns below.
+ final ClusteredValueGroupsBaseTableSchema clusterSummary =
matching.get(0).getSummary();
+ final RowSignature clusteringColumns =
clusterSummary.getClusteringColumns();
+ final List<Object[]> clusteringValuesByGroup = new
ArrayList<>(matching.size());
+ final List<Supplier<CursorHolder>> holderSuppliers = new
ArrayList<>(matching.size());
+ // lifecycle management closer for per-group CursorHolders
+ final Closer closer = Closer.create();
+ for (TableClusterGroupSpec valueGroup : matching) {
+ clusteringValuesByGroup.add(valueGroup.lookupClusteringValues());
+ final QueryableIndex groupIndex =
index.getClusterGroupQueryableIndex(valueGroup);
+ if (groupIndex == null) {
+ throw DruidException.defensive(
+ "No cluster-group sub-index resolvable for clustering values "
+ + Arrays.toString(valueGroup.lookupClusteringValues())
+ );
+ }
+ final CursorBuildSpec groupSpec = rebuildSpecForGroup(spec, plan,
valueGroup);
+ holderSuppliers.add(
+ Suppliers.memoize(
+ () -> closer.register(
+ new QueryableIndexCursorHolder(
+ groupIndex,
+ groupSpec,
+ QueryableIndexTimeBoundaryInspector.create(groupIndex)
+ )
+ )
+ )
+ );
+ }
+
+ // Initial wrapper state uses the first group's clustering values + a
throwing placeholder delegate. The
+ // ConcatenatingCursor immediately calls setDelegate on init (before any
selector is exposed). The vector
+ // wrapper carries the query-level max vector size from the build spec,
the placeholder delegate can't be
+ // queried for sizing, and the value is constant across groups anyway.
+ final int vectorSize = spec.getQueryContext().getVectorSize();
+ final ClusteringColumnSelectorFactory wrapperFactory = new
ClusteringColumnSelectorFactory(
+ UNINITIALIZED_DELEGATE,
+ clusteringColumns,
+ clusteringValuesByGroup.get(0)
+ );
+ final ClusteringVectorColumnSelectorFactory vectorWrapperFactory = new
ClusteringVectorColumnSelectorFactory(
+ UNINITIALIZED_VECTOR_DELEGATE,
+ clusteringColumns,
+ clusteringValuesByGroup.get(0),
+ vectorSize
+ );
+
+ final ConcatenatingCursor cursor = new ConcatenatingCursor(
+ holderSuppliers,
+ clusteringValuesByGroup,
+ wrapperFactory
+ );
+ final ConcatenatingVectorCursor vectorCursor = new
ConcatenatingVectorCursor(
+ holderSuppliers,
+ clusteringValuesByGroup,
+ vectorWrapperFactory
+ );
+
+ // canVectorize() is determined by the per-group holders. Probe the first
one (lazily, `Suppliers.memoize`
+ // means this opens it once and is reused by ConcatenatingVectorCursor).
+ final boolean canVectorize = holderSuppliers.get(0).get().canVectorize();
Review Comment:
[P2] Vectorization decision only checks first group
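The multi-group clustered holder reports `canVectorize()` based only on the first
per-group holder, but each group receives a different rewritten filter. The first
group's filter may fold to `TrueFilter` and vectorize, while a later group's residual
filter cannot. In that case the engine chooses the vector path, and
`ConcatenatingVectorCursor` later calls `asVectorCursor()` on the non-vectorizable
holder, which throws. The combined holder should report vectorizable only if every
surviving group can vectorize; otherwise it should disable vectorization.

Note the trade-off: probing every group's holder up front opens all of them eagerly.
That defeats the lazy per-group opening promised in the method's Javadoc, so the fix
should either accept that cost or decide vectorizability without opening each
holder.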
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]