This is an automated email from the ASF dual-hosted git repository.

gianm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 630ca9f7024 fix: Emit publish metrics only when tasks actually 
publish. (#19395)
630ca9f7024 is described below

commit 630ca9f702482ddf1ad357f5838f780c9c47e1a1
Author: Gian Merlino <[email protected]>
AuthorDate: Tue May 12 15:08:10 2026 -0700

    fix: Emit publish metrics only when tasks actually publish. (#19395)
    
    With task replicas, this avoids double-emitting of metrics.
---
 .../SeekableStreamIndexTaskRunner.java             | 21 ++++------
 .../SeekableStreamIndexTaskRunnerTest.java         |  3 +-
 .../appenderator/BaseAppenderatorDriver.java       |  6 ++-
 .../appenderator/SegmentsAndCommitMetadata.java    | 49 +++++++++++++++++++---
 .../appenderator/StreamAppenderatorDriver.java     |  4 +-
 .../SegmentsAndCommitMetadataTest.java             | 35 ++++++++++++++++
 6 files changed, 95 insertions(+), 23 deletions(-)

diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
index 515bafc5054..38de3dc49fe 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
@@ -1079,20 +1079,15 @@ public abstract class 
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
                 },
                 MoreExecutors.directExecutor()
             );
-            // emit segment count metric:
-            int segmentCount = 0;
-            long totalRowCount = 0;
-            if (publishedSegmentsAndCommitMetadata != null
-                && publishedSegmentsAndCommitMetadata.getSegments() != null) {
-              segmentCount = 
publishedSegmentsAndCommitMetadata.getSegments().size();
-              totalRowCount = 
IndexTaskUtils.getTotalRowCount(publishedSegmentsAndCommitMetadata.getSegments());
+
+            // Emit publish metrics only when this task actually committed the 
segments.
+            if (publishedSegmentsAndCommitMetadata.wasPublished()) {
+              final int segmentCount = 
publishedSegmentsAndCommitMetadata.getSegments().size();
+              final long totalRowCount =
+                  
IndexTaskUtils.getTotalRowCount(publishedSegmentsAndCommitMetadata.getSegments());
+              task.emitMetric(toolbox.getEmitter(), "ingest/segments/count", 
segmentCount);
+              task.emitMetric(toolbox.getEmitter(), "ingest/rows/published", 
totalRowCount);
             }
-            task.emitMetric(
-                toolbox.getEmitter(),
-                "ingest/segments/count",
-                segmentCount
-            );
-            task.emitMetric(toolbox.getEmitter(), "ingest/rows/published", 
totalRowCount);
           }
 
           @Override
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerTest.java
index 797cbffd937..f08ee84d89d 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerTest.java
@@ -314,7 +314,8 @@ public class SeekableStreamIndexTaskRunnerTest
         .withNumPartitions(10)
         .withNumRows(1_000)
         .eachOfSizeInMb(500);
-    final SegmentsAndCommitMetadata commitMetadata = new 
SegmentsAndCommitMetadata(segment, "offset-100");
+    final SegmentsAndCommitMetadata commitMetadata =
+        new SegmentsAndCommitMetadata(segment, 
"offset-100").withWasPublished(true);
 
     final StreamAppenderatorDriver driver = 
Mockito.mock(StreamAppenderatorDriver.class);
     Mockito.when(task.newDriver(any(), any(), any()))
diff --git 
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
 
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
index 7ca7fd74c2d..257b1db7278 100644
--- 
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
+++ 
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
@@ -566,7 +566,7 @@ public abstract class BaseAppenderatorDriver implements 
Closeable
               metadata == null ? null : ((AppenderatorDriverMetadata) 
metadata).getCallerMetadata(),
               segmentsAndCommitMetadata.getSegmentSchemaMapping(),
               segmentsAndCommitMetadata.getUpgradedSegments()
-          );
+          ).withWasPublished(segmentsAndCommitMetadata.wasPublished());
         },
         MoreExecutors.directExecutor()
     );
@@ -648,7 +648,9 @@ public abstract class BaseAppenderatorDriver implements 
Closeable
                   }
 
                   log.info("Published segment schemas[%s].", 
segmentsAndCommitMetadata.getSegmentSchemaMapping());
-                  return 
segmentsAndCommitMetadata.withUpgradedSegments(upgradedSegments);
+                  return segmentsAndCommitMetadata
+                      .withUpgradedSegments(upgradedSegments)
+                      .withWasPublished(true);
                 } else {
                   // Publishing didn't affirmatively succeed. However, segments
                   // with these IDs may have already been published:
diff --git 
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SegmentsAndCommitMetadata.java
 
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SegmentsAndCommitMetadata.java
index dfbfa621a23..dac9b1b8f34 100644
--- 
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SegmentsAndCommitMetadata.java
+++ 
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SegmentsAndCommitMetadata.java
@@ -38,12 +38,17 @@ public class SegmentsAndCommitMetadata
 
   private final ImmutableSet<DataSegment> upgradedSegments;
 
+  /**
+   * Whether this object represents segments that were published to the 
metadata store.
+   */
+  private final boolean wasPublished;
+
   public SegmentsAndCommitMetadata(
       List<DataSegment> segments,
       Object commitMetadata
   )
   {
-    this(segments, commitMetadata, null, null);
+    this(segments, commitMetadata, null, null, false);
   }
 
   public SegmentsAndCommitMetadata(
@@ -52,7 +57,7 @@ public class SegmentsAndCommitMetadata
       SegmentSchemaMapping segmentSchemaMapping
   )
   {
-    this(segments, commitMetadata, segmentSchemaMapping, null);
+    this(segments, commitMetadata, segmentSchemaMapping, null, false);
   }
 
   public SegmentsAndCommitMetadata(
@@ -61,11 +66,23 @@ public class SegmentsAndCommitMetadata
       @Nullable SegmentSchemaMapping segmentSchemaMapping,
       @Nullable Set<DataSegment> upgradedSegments
   )
+  {
+    this(segments, commitMetadata, segmentSchemaMapping, upgradedSegments, 
false);
+  }
+
+  private SegmentsAndCommitMetadata(
+      List<DataSegment> segments,
+      @Nullable Object commitMetadata,
+      @Nullable SegmentSchemaMapping segmentSchemaMapping,
+      @Nullable Set<DataSegment> upgradedSegments,
+      boolean wasPublished
+  )
   {
     this.segments = ImmutableList.copyOf(segments);
     this.commitMetadata = commitMetadata;
     this.upgradedSegments = upgradedSegments == null ? null : 
ImmutableSet.copyOf(upgradedSegments);
     this.segmentSchemaMapping = segmentSchemaMapping;
+    this.wasPublished = wasPublished;
   }
 
   public SegmentsAndCommitMetadata withUpgradedSegments(Set<DataSegment> 
upgradedSegments)
@@ -74,7 +91,19 @@ public class SegmentsAndCommitMetadata
         this.segments,
         this.commitMetadata,
         this.segmentSchemaMapping,
-        upgradedSegments
+        upgradedSegments,
+        this.wasPublished
+    );
+  }
+
+  public SegmentsAndCommitMetadata withWasPublished(boolean wasPublished)
+  {
+    return new SegmentsAndCommitMetadata(
+        this.segments,
+        this.commitMetadata,
+        this.segmentSchemaMapping,
+        this.upgradedSegments,
+        wasPublished
     );
   }
 
@@ -103,6 +132,14 @@ public class SegmentsAndCommitMetadata
     return segmentSchemaMapping;
   }
 
+  /**
+   * @see #wasPublished
+   */
+  public boolean wasPublished()
+  {
+    return wasPublished;
+  }
+
   @Override
   public boolean equals(Object o)
   {
@@ -113,7 +150,8 @@ public class SegmentsAndCommitMetadata
       return false;
     }
     SegmentsAndCommitMetadata that = (SegmentsAndCommitMetadata) o;
-    return Objects.equals(commitMetadata, that.commitMetadata) &&
+    return wasPublished == that.wasPublished &&
+           Objects.equals(commitMetadata, that.commitMetadata) &&
            Objects.equals(upgradedSegments, that.upgradedSegments) &&
            Objects.equals(segmentSchemaMapping, that.segmentSchemaMapping) &&
            Objects.equals(segments, that.segments);
@@ -122,7 +160,7 @@ public class SegmentsAndCommitMetadata
   @Override
   public int hashCode()
   {
-    return Objects.hash(commitMetadata, segments, upgradedSegments, 
segmentSchemaMapping);
+    return Objects.hash(commitMetadata, segments, upgradedSegments, 
segmentSchemaMapping, wasPublished);
   }
 
   @Override
@@ -133,6 +171,7 @@ public class SegmentsAndCommitMetadata
            ", segments=" + SegmentUtils.commaSeparatedIdentifiers(segments) +
            ", upgradedSegments=" + 
SegmentUtils.commaSeparatedIdentifiers(upgradedSegments) +
            ", segmentSchemaMapping=" + segmentSchemaMapping +
+           ", wasPublished=" + wasPublished +
            '}';
   }
 }
diff --git 
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java
 
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java
index 20430294d3b..53b0fcbba3b 100644
--- 
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java
+++ 
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java
@@ -341,7 +341,7 @@ public class StreamAppenderatorDriver extends 
BaseAppenderatorDriver
                 ((AppenderatorDriverMetadata) metadata).getCallerMetadata(),
                 segmentsAndCommitMetadata.getSegmentSchemaMapping(),
                 segmentsAndCommitMetadata.getUpgradedSegments()
-            )
+            ).withWasPublished(segmentsAndCommitMetadata.wasPublished())
         );
       }
 
@@ -385,7 +385,7 @@ public class StreamAppenderatorDriver extends 
BaseAppenderatorDriver
                                 ((AppenderatorDriverMetadata) 
metadata).getCallerMetadata(),
                                 
segmentsAndCommitMetadata.getSegmentSchemaMapping(),
                                 segmentsAndCommitMetadata.getUpgradedSegments()
-                            )
+                            
).withWasPublished(segmentsAndCommitMetadata.wasPublished())
                         );
                       }
                     }
diff --git 
a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/SegmentsAndCommitMetadataTest.java
 
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/SegmentsAndCommitMetadataTest.java
new file mode 100644
index 00000000000..43df387598c
--- /dev/null
+++ 
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/SegmentsAndCommitMetadataTest.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.realtime.appenderator;
+
+import nl.jqno.equalsverifier.EqualsVerifier;
+import org.junit.Test;
+
+public class SegmentsAndCommitMetadataTest
+{
+  @Test
+  public void testEquals()
+  {
+    EqualsVerifier.forClass(SegmentsAndCommitMetadata.class)
+                  .withNonnullFields("segments")
+                  .usingGetClass()
+                  .verify();
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to