This is an automated email from the ASF dual-hosted git repository.
gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 4b082a53db0 MSQ: Get row count for counters before mapping. (#19186)
4b082a53db0 is described below
commit 4b082a53db0db5edd0d916a598a110de8b05d3ab
Author: Gian Merlino <[email protected]>
AuthorDate: Fri Mar 20 08:32:03 2026 -0700
MSQ: Get row count for counters before mapping. (#19186)
Mapped segments generally do not provide a PhysicalSegmentInspector, so
reading the segment row count after mapping often yields zero. Read the
row count from the unmapped segment reference instead.
---
.../druid/msq/querykit/BaseLeafFrameProcessor.java | 38 ++++++++++++++++++----
.../groupby/GroupByPreShuffleFrameProcessor.java | 20 ++----------
.../msq/querykit/scan/ScanQueryFrameProcessor.java | 19 ++---------
3 files changed, 35 insertions(+), 42 deletions(-)
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessor.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessor.java
index fdde51a59c2..788da66072d 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessor.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessor.java
@@ -30,7 +30,9 @@ import org.apache.druid.frame.processor.ReturnOrAwait;
import org.apache.druid.frame.read.FrameReader;
import org.apache.druid.frame.write.FrameWriterFactory;
import org.apache.druid.java.util.common.Unit;
+import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.msq.counters.ChannelCounters;
import org.apache.druid.msq.exec.DataServerQueryHandler;
import org.apache.druid.msq.input.table.SegmentsInputSlice;
import org.apache.druid.segment.PhysicalSegmentInspector;
@@ -38,8 +40,8 @@ import
org.apache.druid.segment.ReferenceCountedSegmentProvider;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.SegmentMapFunction;
import org.apache.druid.segment.SegmentReference;
+import org.apache.druid.utils.CloseableUtils;
-import javax.annotation.Nullable;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
@@ -139,16 +141,38 @@ public abstract class BaseLeafFrameProcessor implements
FrameProcessor<Object>
/**
* Helper intended to be used by subclasses. Applies {@link #segmentMapFn},
which applies broadcast joins
- * if applicable to this query.
+ * and other mappings applicable to this query. Registers the {@link
SegmentReference} with the provided
+ * {@link Closer}.
*/
- @Nullable
- protected SegmentReference mapSegment(@Nullable final SegmentReference
segmentReference)
+ protected Segment mapSegment(
+ final SegmentReferenceHolder segmentHolder,
+ final Closer closer
+ )
{
+ final SegmentReference segmentReference =
segmentHolder.getSegmentReferenceOnce();
if (segmentReference == null) {
- return null;
+ throw DruidException.defensive("Missing segmentReference[%s]",
segmentHolder.getDescriptor());
+ }
+
+ try {
+ final ChannelCounters counters = segmentHolder.getInputCounters();
+ if (counters != null) {
+ // Attach a counters.addFile call to the closer, to ensure input
metrics are updated.
+ // Get row count prior to mapping, because mapped segments often do
not provide PhysicalSegmentInspector.
+ final int rowCount = getSegmentRowCount(segmentReference);
+ closer.register(() -> counters.addFile(rowCount, 0));
+ }
+ }
+ catch (Throwable e) {
+ throw CloseableUtils.closeAndWrapInCatch(e, segmentReference);
+ }
+
+ final Segment segment =
closer.register(segmentReference.map(segmentMapFn)).getSegmentReference().orElse(null);
+ if (segment == null) {
+ throw DruidException.defensive("Missing segment[%s]",
segmentHolder.getDescriptor());
}
- return segmentReference.map(segmentMapFn);
+ return segment;
}
/**
@@ -168,7 +192,7 @@ public abstract class BaseLeafFrameProcessor implements
FrameProcessor<Object>
* Helper to get the number of rows for a segment, using a {@link
PhysicalSegmentInspector}. Returns 0 when the
* number is unknown.
*/
- protected int getSegmentRowCount(final SegmentReference segmentReference)
+ private int getSegmentRowCount(final SegmentReference segmentReference)
{
return segmentReference
.getSegmentReference()
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleFrameProcessor.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleFrameProcessor.java
index 5e8defe68aa..4435ae6f8e6 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleFrameProcessor.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleFrameProcessor.java
@@ -24,7 +24,6 @@ import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.collections.NonBlockingPool;
import org.apache.druid.collections.ResourceHolder;
import org.apache.druid.common.guava.FutureUtils;
-import org.apache.druid.error.DruidException;
import org.apache.druid.frame.Frame;
import org.apache.druid.frame.channel.ReadableFrameChannel;
import org.apache.druid.frame.channel.WritableFrameChannel;
@@ -59,7 +58,6 @@ import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.CursorFactory;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.SegmentMapFunction;
-import org.apache.druid.segment.SegmentReference;
import org.apache.druid.segment.TimeBoundaryInspector;
import org.apache.druid.segment.column.RowSignature;
@@ -169,24 +167,10 @@ public class GroupByPreShuffleFrameProcessor extends
BaseLeafFrameProcessor
protected ReturnOrAwait<Unit> runWithSegment(final SegmentReferenceHolder
segmentHolder) throws IOException
{
if (resultYielder == null) {
- final SegmentReference segmentReference =
closer.register(mapSegment(segmentHolder.getSegmentReferenceOnce()));
- if (segmentReference == null) {
- throw DruidException.defensive("Missing segmentReference for[%s]",
segmentHolder.getDescriptor());
- }
-
- final Segment segment =
segmentReference.getSegmentReference().orElse(null);
- if (segment == null) {
- throw DruidException.defensive("Missing segment for[%s]",
segmentHolder.getDescriptor());
- }
-
- if (segmentHolder.getInputCounters() != null) {
- final int rowCount = getSegmentRowCount(segmentReference);
- closer.register(() ->
segmentHolder.getInputCounters().addFile(rowCount, 0));
- }
-
+ final Segment segment = mapSegment(segmentHolder, closer);
final TimeBoundaryInspector tbi =
segment.as(TimeBoundaryInspector.class);
-
final Sequence<ResultRow> rowSequence;
+
if (GroupByTimeBoundaryUtils.canUseTimeBoundaryInspector(query, tbi,
segmentHolder.getDescriptor())) {
// Resolve this query using the TimeBoundaryInspector, no need for a
cursor.
rowSequence =
Sequences.simple(List.of(GroupByTimeBoundaryUtils.computeTimeBoundaryResult(query,
tbi)));
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java
index ec24a9fc12c..22fe6d2dfcd 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java
@@ -73,7 +73,6 @@ import org.apache.druid.segment.CursorFactory;
import org.apache.druid.segment.CursorHolder;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.SegmentMapFunction;
-import org.apache.druid.segment.SegmentReference;
import org.apache.druid.segment.SimpleAscendingOffset;
import org.apache.druid.segment.SimpleSettableOffset;
import org.apache.druid.segment.VirtualColumn;
@@ -292,28 +291,14 @@ public class ScanQueryFrameProcessor extends
BaseLeafFrameProcessor
protected ReturnOrAwait<Unit> runWithSegment(final SegmentReferenceHolder
segmentHolder) throws IOException
{
if (cursor == null) {
- final SegmentReference segmentReference =
closer.register(mapSegment(segmentHolder.getSegmentReferenceOnce()));
- if (segmentReference == null) {
- throw DruidException.defensive("Missing segmentReference for[%s]",
segmentHolder.getDescriptor());
- }
-
- final Segment segment =
segmentReference.getSegmentReference().orElse(null);
- if (segment == null) {
- throw DruidException.defensive("Missing segment for[%s]",
segmentHolder.getDescriptor());
- }
-
+ final Segment segment = mapSegment(segmentHolder, closer);
final CursorFactory cursorFactory = segment.as(CursorFactory.class);
if (cursorFactory == null) {
- throw new ISE(
+ throw DruidException.defensive(
"Null cursor factory found. Probably trying to issue a query
against a segment being memory unmapped."
);
}
- if (segmentHolder.getInputCounters() != null) {
- final int rowCount = getSegmentRowCount(segmentReference);
- closer.register(() ->
segmentHolder.getInputCounters().addFile(rowCount, 0));
- }
-
final CursorHolder nextCursorHolder =
cursorFactory.makeCursorHolder(
ScanQueryEngine.makeCursorBuildSpec(
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]