This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 4b082a53db0 MSQ: Get row count for counters before mapping. (#19186)
4b082a53db0 is described below

commit 4b082a53db0db5edd0d916a598a110de8b05d3ab
Author: Gian Merlino <[email protected]>
AuthorDate: Fri Mar 20 08:32:03 2026 -0700

    MSQ: Get row count for counters before mapping. (#19186)
    
    Mapped segments generally do not have a PhysicalSegmentInspector, so
    getting the segment row count after mapping often leads to zero.
---
 .../druid/msq/querykit/BaseLeafFrameProcessor.java | 38 ++++++++++++++++++----
 .../groupby/GroupByPreShuffleFrameProcessor.java   | 20 ++----------
 .../msq/querykit/scan/ScanQueryFrameProcessor.java | 19 ++---------
 3 files changed, 35 insertions(+), 42 deletions(-)

diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessor.java b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessor.java
index fdde51a59c2..788da66072d 100644
--- a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessor.java
+++ b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessor.java
@@ -30,7 +30,9 @@ import org.apache.druid.frame.processor.ReturnOrAwait;
 import org.apache.druid.frame.read.FrameReader;
 import org.apache.druid.frame.write.FrameWriterFactory;
 import org.apache.druid.java.util.common.Unit;
+import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.msq.counters.ChannelCounters;
 import org.apache.druid.msq.exec.DataServerQueryHandler;
 import org.apache.druid.msq.input.table.SegmentsInputSlice;
 import org.apache.druid.segment.PhysicalSegmentInspector;
@@ -38,8 +40,8 @@ import org.apache.druid.segment.ReferenceCountedSegmentProvider;
 import org.apache.druid.segment.Segment;
 import org.apache.druid.segment.SegmentMapFunction;
 import org.apache.druid.segment.SegmentReference;
+import org.apache.druid.utils.CloseableUtils;
 
-import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
@@ -139,16 +141,38 @@ public abstract class BaseLeafFrameProcessor implements FrameProcessor<Object>
 
   /**
    * Helper intended to be used by subclasses. Applies {@link #segmentMapFn}, which applies broadcast joins
-   * if applicable to this query.
+   * and other mappings applicable to this query. Registers the {@link SegmentReference} with the provided
+   * {@link Closer}.
    */
-  @Nullable
-  protected SegmentReference mapSegment(@Nullable final SegmentReference segmentReference)
+  protected Segment mapSegment(
+      final SegmentReferenceHolder segmentHolder,
+      final Closer closer
+  )
   {
+    final SegmentReference segmentReference = segmentHolder.getSegmentReferenceOnce();
     if (segmentReference == null) {
-      return null;
+      throw DruidException.defensive("Missing segmentReference[%s]", segmentHolder.getDescriptor());
+    }
+
+    try {
+      final ChannelCounters counters = segmentHolder.getInputCounters();
+      if (counters != null) {
+        // Attach a counters.addFile call to the closer, to ensure input metrics are updated.
+        // Get row count prior to mapping, because mapped segments often do not provide PhysicalSegmentInspector.
+        final int rowCount = getSegmentRowCount(segmentReference);
+        closer.register(() -> counters.addFile(rowCount, 0));
+      }
+    }
+    catch (Throwable e) {
+      throw CloseableUtils.closeAndWrapInCatch(e, segmentReference);
+    }
+
+    final Segment segment = closer.register(segmentReference.map(segmentMapFn)).getSegmentReference().orElse(null);
+    if (segment == null) {
+      throw DruidException.defensive("Missing segment[%s]", segmentHolder.getDescriptor());
     }
 
-    return segmentReference.map(segmentMapFn);
+    return segment;
   }
 
   /**
@@ -168,7 +192,7 @@ public abstract class BaseLeafFrameProcessor implements FrameProcessor<Object>
    * Helper to get the number of rows for a segment, using a {@link PhysicalSegmentInspector}. Returns 0 when the
    * number is unknown.
    */
-  protected int getSegmentRowCount(final SegmentReference segmentReference)
+  private int getSegmentRowCount(final SegmentReference segmentReference)
   {
     return segmentReference
         .getSegmentReference()
diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleFrameProcessor.java b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleFrameProcessor.java
index 5e8defe68aa..4435ae6f8e6 100644
--- a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleFrameProcessor.java
+++ b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleFrameProcessor.java
@@ -24,7 +24,6 @@ import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.druid.collections.NonBlockingPool;
 import org.apache.druid.collections.ResourceHolder;
 import org.apache.druid.common.guava.FutureUtils;
-import org.apache.druid.error.DruidException;
 import org.apache.druid.frame.Frame;
 import org.apache.druid.frame.channel.ReadableFrameChannel;
 import org.apache.druid.frame.channel.WritableFrameChannel;
@@ -59,7 +58,6 @@ import org.apache.druid.segment.ColumnSelectorFactory;
 import org.apache.druid.segment.CursorFactory;
 import org.apache.druid.segment.Segment;
 import org.apache.druid.segment.SegmentMapFunction;
-import org.apache.druid.segment.SegmentReference;
 import org.apache.druid.segment.TimeBoundaryInspector;
 import org.apache.druid.segment.column.RowSignature;
 
@@ -169,24 +167,10 @@ public class GroupByPreShuffleFrameProcessor extends BaseLeafFrameProcessor
   protected ReturnOrAwait<Unit> runWithSegment(final SegmentReferenceHolder segmentHolder) throws IOException
   {
     if (resultYielder == null) {
-      final SegmentReference segmentReference = closer.register(mapSegment(segmentHolder.getSegmentReferenceOnce()));
-      if (segmentReference == null) {
-        throw DruidException.defensive("Missing segmentReference for[%s]", segmentHolder.getDescriptor());
-      }
-
-      final Segment segment = segmentReference.getSegmentReference().orElse(null);
-      if (segment == null) {
-        throw DruidException.defensive("Missing segment for[%s]", segmentHolder.getDescriptor());
-      }
-
-      if (segmentHolder.getInputCounters() != null) {
-        final int rowCount = getSegmentRowCount(segmentReference);
-        closer.register(() -> segmentHolder.getInputCounters().addFile(rowCount, 0));
-      }
-
+      final Segment segment = mapSegment(segmentHolder, closer);
       final TimeBoundaryInspector tbi = segment.as(TimeBoundaryInspector.class);
-
       final Sequence<ResultRow> rowSequence;
+
       if (GroupByTimeBoundaryUtils.canUseTimeBoundaryInspector(query, tbi, segmentHolder.getDescriptor())) {
         // Resolve this query using the TimeBoundaryInspector, no need for a cursor.
         rowSequence = Sequences.simple(List.of(GroupByTimeBoundaryUtils.computeTimeBoundaryResult(query, tbi)));
diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java
index ec24a9fc12c..22fe6d2dfcd 100644
--- a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java
+++ b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java
@@ -73,7 +73,6 @@ import org.apache.druid.segment.CursorFactory;
 import org.apache.druid.segment.CursorHolder;
 import org.apache.druid.segment.Segment;
 import org.apache.druid.segment.SegmentMapFunction;
-import org.apache.druid.segment.SegmentReference;
 import org.apache.druid.segment.SimpleAscendingOffset;
 import org.apache.druid.segment.SimpleSettableOffset;
 import org.apache.druid.segment.VirtualColumn;
@@ -292,28 +291,14 @@ public class ScanQueryFrameProcessor extends BaseLeafFrameProcessor
   protected ReturnOrAwait<Unit> runWithSegment(final SegmentReferenceHolder segmentHolder) throws IOException
   {
     if (cursor == null) {
-      final SegmentReference segmentReference = closer.register(mapSegment(segmentHolder.getSegmentReferenceOnce()));
-      if (segmentReference == null) {
-        throw DruidException.defensive("Missing segmentReference for[%s]", segmentHolder.getDescriptor());
-      }
-
-      final Segment segment = segmentReference.getSegmentReference().orElse(null);
-      if (segment == null) {
-        throw DruidException.defensive("Missing segment for[%s]", segmentHolder.getDescriptor());
-      }
-
+      final Segment segment = mapSegment(segmentHolder, closer);
       final CursorFactory cursorFactory = segment.as(CursorFactory.class);
       if (cursorFactory == null) {
-        throw new ISE(
+        throw DruidException.defensive(
             "Null cursor factory found. Probably trying to issue a query against a segment being memory unmapped."
         );
       }
 
-      if (segmentHolder.getInputCounters() != null) {
-        final int rowCount = getSegmentRowCount(segmentReference);
-        closer.register(() -> segmentHolder.getInputCounters().addFile(rowCount, 0));
-      }
-
       final CursorHolder nextCursorHolder =
           cursorFactory.makeCursorHolder(
               ScanQueryEngine.makeCursorBuildSpec(


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to