This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 47bcc3f9df1 Add new metric `ingest/rows/published` to fix flaky `KinesisFaultToleranceTest` (#19177)
47bcc3f9df1 is described below

commit 47bcc3f9df129aaeeab7975724a5835f0fbb5647
Author: Kashif Faraz <[email protected]>
AuthorDate: Fri Mar 20 21:05:25 2026 +0530

    Add new metric `ingest/rows/published` to fix flaky `KinesisFaultToleranceTest` (#19177)
---
 docs/operations/metrics.md                         |   1 +
 .../embedded/indexing/StreamIndexTestBase.java     |   7 +-
 .../src/main/resources/defaultMetrics.json         |   1 +
 .../main/resources/defaultMetricDimensions.json    |   1 +
 .../druid/indexing/common/task/IndexTask.java      |   5 +
 .../druid/indexing/common/task/IndexTaskUtils.java |  17 ++
 .../parallel/ParallelIndexSupervisorTask.java      |   2 +-
 .../SeekableStreamIndexTaskRunner.java             |   4 +
 .../SeekableStreamIndexTaskRunnerTest.java         | 144 +++++++++++-
 .../org/apache/druid/msq/exec/ControllerImpl.java  |   4 +
 processing/src/main/resources/defaultMetrics.json  |   1 +
 processing/src/test/resources/defaultMetrics.json  | 251 ++++++++++++++++++++-
 12 files changed, 425 insertions(+), 13 deletions(-)

diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md
index 582a8d60877..e5715e821ae 100644
--- a/docs/operations/metrics.md
+++ b/docs/operations/metrics.md
@@ -222,6 +222,7 @@ If SQL is enabled, the Broker will emit the following metrics for SQL.
 |------|-----------|----------|------------|
 |`ingest/count`|Count of `1` every time an ingestion job runs (includes compaction jobs). Aggregate using dimensions. | `dataSource`, `taskId`, `taskType`, `groupId`, `taskIngestionMode`, `tags` |Always `1`.|
 |`ingest/segments/count`|Count of final segments created by job (includes tombstones). | `dataSource`, `taskId`, `taskType`, `groupId`, `taskIngestionMode`, `tags` |At least `1`.|
+|`ingest/rows/published`|Number of rows successfully published by the job. | `dataSource`, `taskId`, `taskType`, `groupId`, `taskIngestionMode`, `tags` |At least `1`.|
 |`ingest/tombstones/count`|Count of tombstones created by job. | `dataSource`, `taskId`, `taskType`, `groupId`, `taskIngestionMode`, `tags` |Zero or more for replace. Always zero for non-replace tasks (always zero for legacy replace, see below).|
 
 The `taskIngestionMode` dimension includes the following modes:
diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/StreamIndexTestBase.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/StreamIndexTestBase.java
index c9ec82e60e5..772d1bcc9b0 100644
--- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/StreamIndexTestBase.java
+++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/StreamIndexTestBase.java
@@ -129,19 +129,20 @@ public abstract class StreamIndexTestBase extends EmbeddedClusterTestBase
   }
 
   /**
-   * Waits until number of processed events matches {@code expectedRowCount}.
+   * Waits until the total row count of successfully published segments matches
+   * {@code expectedRowCount}.
    */
   protected void waitUntilPublishedRecordsAreIngested(int expectedRowCount)
   {
     indexer.latchableEmitter().waitForEventAggregate(
-        event -> event.hasMetricName("ingest/events/processed")
+        event -> event.hasMetricName("ingest/rows/published")
                       .hasDimension(DruidMetrics.DATASOURCE, dataSource),
         agg -> agg.hasSumAtLeast(expectedRowCount)
     );
 
     final int totalEventsProcessed = indexer
         .latchableEmitter()
-        .getMetricValues("ingest/events/processed", Map.of(DruidMetrics.DATASOURCE, dataSource))
+        .getMetricValues("ingest/rows/published", Map.of(DruidMetrics.DATASOURCE, dataSource))
         .stream()
         .mapToInt(Number::intValue)
         .sum();
diff --git a/extensions-contrib/prometheus-emitter/src/main/resources/defaultMetrics.json b/extensions-contrib/prometheus-emitter/src/main/resources/defaultMetrics.json
index 79be4527447..247686a838b 100644
--- a/extensions-contrib/prometheus-emitter/src/main/resources/defaultMetrics.json
+++ b/extensions-contrib/prometheus-emitter/src/main/resources/defaultMetrics.json
@@ -101,6 +101,7 @@
   "ingest/count" : { "dimensions" : ["dataSource", "taskType"], "type" : 
"count", "help": "Count of 1 every time an ingestion job runs (includes 
compaction jobs). Aggregate using dimensions." },
   "ingest/segments/count" : { "dimensions" : ["dataSource", "taskType"], 
"type" : "count", "help": "Count of final segments created by job (includes 
tombstones)." },
   "ingest/tombstones/count" : { "dimensions" : ["dataSource", "taskType"], 
"type" : "count", "help": "Count of tombstones created by job." },
+  "ingest/rows/published": { "dimensions" : ["dataSource", "taskType"], "type" 
: "count", "help": "Number of rows successfully published by the job." },
 
   "ingest/kafka/lag" : { "dimensions" : ["dataSource", "stream"], "type" : 
"gauge", "help": "Total lag between the offsets consumed by the Kafka indexing 
tasks and latest offsets in Kafka brokers across all partitions. Minimum 
emission period for this metric is a minute."},
   "ingest/kafka/maxLag" : { "dimensions" : ["dataSource", "stream"], "type" : 
"gauge", "help": "Max lag between the offsets consumed by the Kafka indexing 
tasks and latest offsets in Kafka brokers across all partitions. Minimum 
emission period for this metric is a minute."},
diff --git a/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json b/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json
index d8074cb2909..828031cfb11 100644
--- a/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json
+++ b/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json
@@ -54,6 +54,7 @@
   "ingest/merge/time" : { "dimensions" : ["dataSource"], "type" : "timer" },
   "ingest/merge/cpu" : { "dimensions" : ["dataSource"], "type" : "timer" },
   "ingest/segments/count" : { "dimensions" : ["dataSource"], "type" : "count" 
},
+  "ingest/rows/published": { "dimensions" : ["dataSource"], "type" : "count" },
 
   "ingest/kafka/lag" : { "dimensions" : ["dataSource", "stream"], "type" : 
"gauge" },
   "ingest/kafka/maxLag" : { "dimensions" : ["dataSource", "stream"], "type" : 
"gauge" },
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
index abbbf26400f..eb3b5c0e84f 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
@@ -986,6 +986,11 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler, Pe
         emitMetric(toolbox.getEmitter(), "ingest/segments/count",
                    published.getSegments().size() + tombStones.size()
         );
+        emitMetric(
+            toolbox.getEmitter(),
+            "ingest/rows/published",
+            IndexTaskUtils.getTotalRowCount(published.getSegments())
+        );
 
         log.debugSegments(published.getSegments(), "Published segments");
 
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTaskUtils.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTaskUtils.java
index e08c096d393..29692e1aa98 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTaskUtils.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTaskUtils.java
@@ -28,7 +28,9 @@ import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
 import org.apache.druid.query.DruidMetrics;
 import org.apache.druid.timeline.DataSegment;
 
+import java.util.Collection;
 import java.util.Map;
+import java.util.Objects;
 
 public class IndexTaskUtils
 {
@@ -102,4 +104,19 @@ public class IndexTaskUtils
       toolbox.getEmitter().emit(metricBuilder.setMetric("segment/txn/failure", 1));
     }
   }
+
+  /**
+   * Gets total row count of the given segments. Legacy segments do not have the
+   * row count populated in the metadata and thus do not contribute to the row
+   * count.
+   */
+  public static long getTotalRowCount(Collection<DataSegment> segments)
+  {
+    return segments
+        .stream()
+        .map(DataSegment::getTotalRows)
+        .filter(Objects::nonNull)
+        .mapToLong(Integer::longValue)
+        .sum();
+  }
 }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
index c4f37192ee1..8d5e9a85e95 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
@@ -1220,7 +1220,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask
       // segment metrics:
       emitMetric(toolbox.getEmitter(), "ingest/tombstones/count", tombStones.size());
       emitMetric(toolbox.getEmitter(), "ingest/segments/count", newSegments.size());
-
+      emitMetric(toolbox.getEmitter(), "ingest/rows/published", IndexTaskUtils.getTotalRowCount(newSegments));
     } else {
       throw new ISE("Failed to publish segments");
     }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
index 0ab9f613cbe..0f986dfc9c4 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
@@ -65,6 +65,7 @@ import org.apache.druid.indexing.common.actions.SegmentLockAcquireAction;
 import org.apache.druid.indexing.common.actions.TaskLocks;
 import org.apache.druid.indexing.common.actions.TimeChunkLockAcquireAction;
 import org.apache.druid.indexing.common.stats.TaskRealtimeMetricsMonitor;
+import org.apache.druid.indexing.common.task.IndexTaskUtils;
 import org.apache.druid.indexing.input.InputRowSchemas;
 import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
 import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
@@ -1094,15 +1095,18 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
             );
             // emit segment count metric:
             int segmentCount = 0;
+            long totalRowCount = 0;
             if (publishedSegmentsAndCommitMetadata != null
                 && publishedSegmentsAndCommitMetadata.getSegments() != null) {
               segmentCount = publishedSegmentsAndCommitMetadata.getSegments().size();
+              totalRowCount = IndexTaskUtils.getTotalRowCount(publishedSegmentsAndCommitMetadata.getSegments());
             }
             task.emitMetric(
                 toolbox.getEmitter(),
                 "ingest/segments/count",
                 segmentCount
             );
+            task.emitMetric(toolbox.getEmitter(), "ingest/rows/published", totalRowCount);
           }
 
           @Override
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerTest.java
index 29dcc9ea37b..e1968a457c4 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerTest.java
@@ -22,45 +22,86 @@ package org.apache.druid.indexing.seekablestream;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.Futures;
+import org.apache.druid.client.DruidServer;
 import org.apache.druid.data.input.InputRow;
 import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.data.input.impl.JsonInputFormat;
 import org.apache.druid.data.input.impl.StringDimensionSchema;
 import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.discovery.DataNodeService;
+import org.apache.druid.discovery.DiscoveryDruidNode;
+import org.apache.druid.discovery.DruidNodeAnnouncer;
+import org.apache.druid.discovery.LookupNodeService;
 import org.apache.druid.indexer.granularity.UniformGranularitySpec;
 import org.apache.druid.indexing.common.LockGranularity;
 import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.TestUtils;
+import org.apache.druid.indexing.common.task.NoopTestTaskReportFileWriter;
+import org.apache.druid.indexing.common.task.TestAppenderatorsManager;
 import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
 import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
 import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
 import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.FileUtils;
 import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.java.util.metrics.StubServiceEmitter;
+import org.apache.druid.segment.TestHelper;
 import org.apache.druid.segment.incremental.InputRowFilterResult;
+import org.apache.druid.segment.incremental.NoopRowIngestionMeters;
 import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.realtime.NoopChatHandlerProvider;
+import org.apache.druid.segment.realtime.appenderator.SegmentsAndCommitMetadata;
+import org.apache.druid.segment.realtime.appenderator.StreamAppenderator;
+import org.apache.druid.segment.realtime.appenderator.StreamAppenderatorDriver;
+import org.apache.druid.server.coordination.DataSegmentServerAnnouncer;
+import org.apache.druid.server.coordination.ServerType;
+import org.apache.druid.server.coordinator.CreateDataSegments;
+import org.apache.druid.server.security.AuthTestUtils;
+import org.apache.druid.timeline.DataSegment;
 import org.joda.time.DateTime;
 import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.junit.MockitoJUnitRunner;
 
 import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
 
+import static org.mockito.ArgumentMatchers.any;
+
 @RunWith(MockitoJUnitRunner.class)
 public class SeekableStreamIndexTaskRunnerTest
 {
+  @Rule
+  public final TemporaryFolder temporaryFolder = new TemporaryFolder();
+
   @Mock
   private InputRow row;
 
   @Mock
   private SeekableStreamIndexTask task;
 
+  private StubServiceEmitter emitter;
+
+  @Before
+  public void setup()
+  {
+    emitter = new StubServiceEmitter();
+  }
+
   @Test
   public void testWithinMinMaxTime()
   {
@@ -101,7 +142,7 @@ public class SeekableStreamIndexTaskRunnerTest
     Mockito.when(task.getIOConfig()).thenReturn(ioConfig);
     Mockito.when(task.getTuningConfig()).thenReturn(tuningConfig);
 
-    TestasbleSeekableStreamIndexTaskRunner runner = new TestasbleSeekableStreamIndexTaskRunner(
+    TestSeekableStreamIndexTaskRunner runner = new TestSeekableStreamIndexTaskRunner(
         task,
         LockGranularity.TIME_CHUNK
     );
@@ -156,7 +197,7 @@ public class SeekableStreamIndexTaskRunnerTest
     Mockito.when(task.getDataSchema()).thenReturn(schema);
     Mockito.when(task.getIOConfig()).thenReturn(ioConfig);
     Mockito.when(task.getTuningConfig()).thenReturn(tuningConfig);
-    TestasbleSeekableStreamIndexTaskRunner runner = new TestasbleSeekableStreamIndexTaskRunner(
+    TestSeekableStreamIndexTaskRunner runner = new TestSeekableStreamIndexTaskRunner(
         task,
         LockGranularity.TIME_CHUNK
     );
@@ -209,7 +250,7 @@ public class SeekableStreamIndexTaskRunnerTest
     Mockito.when(task.getIOConfig()).thenReturn(ioConfig);
     Mockito.when(task.getTuningConfig()).thenReturn(tuningConfig);
 
-    TestasbleSeekableStreamIndexTaskRunner runner = new TestasbleSeekableStreamIndexTaskRunner(
+    TestSeekableStreamIndexTaskRunner runner = new TestSeekableStreamIndexTaskRunner(
         task,
         LockGranularity.TIME_CHUNK
     );
@@ -218,7 +259,7 @@ public class SeekableStreamIndexTaskRunnerTest
   }
 
   @Test
-  public void testGetSupervisorId()
+  public void test_run_emitsRowCountAndSegmentCount_onSuccessfulPublish()
   {
     DimensionsSpec dimensionsSpec = new DimensionsSpec(
         Arrays.asList(
@@ -252,18 +293,105 @@ public class SeekableStreamIndexTaskRunnerTest
     Mockito.when(task.getDataSchema()).thenReturn(schema);
     Mockito.when(task.getIOConfig()).thenReturn(ioConfig);
     Mockito.when(task.getTuningConfig()).thenReturn(tuningConfig);
-
+    Mockito.when(task.getId()).thenReturn("task1");
     Mockito.when(task.getSupervisorId()).thenReturn("supervisorId");
-    TestasbleSeekableStreamIndexTaskRunner runner = new TestasbleSeekableStreamIndexTaskRunner(
+    TestSeekableStreamIndexTaskRunner runner = new TestSeekableStreamIndexTaskRunner(
         task,
         LockGranularity.TIME_CHUNK
     );
     Assert.assertEquals("supervisorId", runner.getSupervisorId());
+
+    // Setup the task to return a RecordSupplier, StreamAppenderatorDriver, Appenderator
+    final RecordSupplier<?, ?, ?> recordSupplier = Mockito.mock(RecordSupplier.class);
+    Mockito.when(task.newTaskRecordSupplier(any()))
+           .thenReturn(recordSupplier);
+
+    final StreamAppenderator appenderator = Mockito.mock(StreamAppenderator.class);
+    Mockito.when(task.newAppenderator(any(), any(), any(), any()))
+           .thenReturn(appenderator);
+
+    final List<DataSegment> segment = CreateDataSegments
+        .ofDatasource(schema.getDataSource())
+        .withNumPartitions(10)
+        .withNumRows(1_000)
+        .eachOfSizeInMb(500);
+    final SegmentsAndCommitMetadata commitMetadata = new SegmentsAndCommitMetadata(segment, "offset-100");
+
+    final StreamAppenderatorDriver driver = Mockito.mock(StreamAppenderatorDriver.class);
+    Mockito.when(task.newDriver(any(), any(), any()))
+           .thenReturn(driver);
+    Mockito.when(driver.publish(any(), any(), any()))
+           .thenReturn(Futures.immediateFuture(commitMetadata));
+    Mockito.when(driver.registerHandoff(any()))
+           .thenReturn(Futures.immediateFuture(commitMetadata));
+
+    Mockito.doAnswer(invocation -> {
+      final String metricName = invocation.getArgument(1);
+      final Number value = invocation.getArgument(2);
+      emitter.emit(ServiceMetricEvent.builder().setMetric(metricName, value).build("test", "localhost"));
+      return null;
+    }).when(task).emitMetric(any(), any(), any());
+
+    runner.run(createTaskToolbox());
+    emitter.verifyValue("ingest/segments/count", 10);
+    emitter.verifyValue("ingest/rows/published", 10_000L);
+  }
+
+  private TaskToolbox createTaskToolbox()
+  {
+    final TestUtils testUtils = new TestUtils();
+    final File taskWorkDir = createTaskWorkDirectory();
+    return new TaskToolbox
+        .Builder()
+        .indexIO(TestHelper.getTestIndexIO())
+        .taskWorkDir(taskWorkDir)
+        .taskReportFileWriter(new NoopTestTaskReportFileWriter())
+        .authorizerMapper(AuthTestUtils.TEST_AUTHORIZER_MAPPER)
+        .rowIngestionMetersFactory(NoopRowIngestionMeters::new)
+        .indexMerger(testUtils.getIndexMergerV9Factory().create(true))
+        .chatHandlerProvider(new NoopChatHandlerProvider())
+        .dataNodeService(new DataNodeService(DruidServer.DEFAULT_TIER, 100L, null, ServerType.HISTORICAL, 1))
+        .lookupNodeService(new LookupNodeService(DruidServer.DEFAULT_TIER))
+        .appenderatorsManager(new TestAppenderatorsManager())
+        .serverAnnouncer(new DataSegmentServerAnnouncer.Noop())
+        .druidNodeAnnouncer(new NoopDruidNodeAnnouncer())
+        .jsonMapper(TestHelper.JSON_MAPPER)
+        .emitter(emitter)
+        .build();
+  }
+
+  private File createTaskWorkDirectory()
+  {
+    try {
+      final File taskWorkDir = temporaryFolder.newFolder();
+      FileUtils.mkdirp(taskWorkDir);
+      FileUtils.mkdirp(new File(taskWorkDir, "persist"));
+      return taskWorkDir;
+    }
+    catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private static class NoopDruidNodeAnnouncer implements DruidNodeAnnouncer
+  {
+
+    @Override
+    public void announce(DiscoveryDruidNode discoveryDruidNode)
+    {
+
+    }
+
+    @Override
+    public void unannounce(DiscoveryDruidNode discoveryDruidNode)
+    {
+
+    }
   }
 
-  static class TestasbleSeekableStreamIndexTaskRunner extends SeekableStreamIndexTaskRunner
+  static class TestSeekableStreamIndexTaskRunner extends SeekableStreamIndexTaskRunner
   {
-    public TestasbleSeekableStreamIndexTaskRunner(
+    public TestSeekableStreamIndexTaskRunner(
         SeekableStreamIndexTask task,
         LockGranularity lockGranularityToUse
     )
diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
index 6a064c82c5d..86223a20ac2 100644
--- a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
+++ b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
@@ -76,6 +76,7 @@ import org.apache.druid.indexing.common.actions.SegmentTransactionalReplaceActio
 import org.apache.druid.indexing.common.actions.TaskAction;
 import org.apache.druid.indexing.common.actions.TaskActionClient;
 import org.apache.druid.indexing.common.task.AbstractBatchIndexTask;
+import org.apache.druid.indexing.common.task.IndexTaskUtils;
 import org.apache.druid.indexing.common.task.Tasks;
 import org.apache.druid.indexing.common.task.batch.TooManyBucketsException;
 import org.apache.druid.indexing.common.task.batch.parallel.TombstoneHelper;
@@ -1595,6 +1596,9 @@ public class ControllerImpl implements Controller
     // Include tombstones in the reported segments count
     metricBuilder.setMetric("ingest/segments/count", segmentsWithTombstones.size());
     context.emitMetric(metricBuilder);
+
+    metricBuilder.setMetric("ingest/rows/published", IndexTaskUtils.getTotalRowCount(segmentsWithTombstones));
+    context.emitMetric(metricBuilder);
   }
 
   private static TaskAction<SegmentPublishResult> createAppendAction(
diff --git a/processing/src/main/resources/defaultMetrics.json b/processing/src/main/resources/defaultMetrics.json
index cad4dcf1bfd..c4647a459c1 100644
--- a/processing/src/main/resources/defaultMetrics.json
+++ b/processing/src/main/resources/defaultMetrics.json
@@ -37,6 +37,7 @@
     "ingest/persists/time": [],
     "ingest/rows/output": [],
     "ingest/segments/count": [],
+    "ingest/rows/published": [],
     "ingest/sink/count": [],
     "ingest/tombstones/count": [],
     "interval/compacted/count": [],
diff --git a/processing/src/test/resources/defaultMetrics.json b/processing/src/test/resources/defaultMetrics.json
index 22a9d0329fc..9ec51afff2d 100644
--- a/processing/src/test/resources/defaultMetrics.json
+++ b/processing/src/test/resources/defaultMetrics.json
@@ -1 +1,250 @@
-{"compact/segmentAnalyzer/fetchAndProcessMillis":[],"compact/task/count":[],"compactTask/availableSlot/count":[],"compactTask/maxSlot/count":[],"coordinator/global/time":[],"coordinator/time":[],"groupBy/maxMergeDictionarySize":[],"groupBy/maxSpilledBytes":[],"groupBy/mergeDictionarySize":[],"groupBy/spilledBytes":[],"groupBy/spilledQueries":[],"ingest/count":[],"ingest/events/duplicate":[],"ingest/events/messageGap":[],"ingest/events/processed":[],"ingest/events/processedWithError":[],"
 [...]
+{
+  "compact/segmentAnalyzer/fetchAndProcessMillis": [],
+  "compact/task/count": [],
+  "compactTask/availableSlot/count": [],
+  "compactTask/maxSlot/count": [],
+  "coordinator/global/time": [],
+  "coordinator/time": [],
+  "groupBy/maxMergeDictionarySize": [],
+  "groupBy/maxSpilledBytes": [],
+  "groupBy/mergeDictionarySize": [],
+  "groupBy/spilledBytes": [],
+  "groupBy/spilledQueries": [],
+  "ingest/count": [],
+  "ingest/events/duplicate": [],
+  "ingest/events/messageGap": [],
+  "ingest/events/processed": [],
+  "ingest/events/processedWithError": [],
+  "ingest/events/thrownAway": [],
+  "ingest/events/unparseable": [],
+  "ingest/handoff/count": [],
+  "ingest/handoff/failed": [],
+  "ingest/handoff/time": [],
+  "ingest/input/bytes": [],
+  "ingest/kafka/avgLag": [],
+  "ingest/kafka/lag": [],
+  "ingest/kafka/maxLag": [],
+  "ingest/kafka/partitionLag": [],
+  "ingest/merge/cpu": [],
+  "ingest/merge/time": [],
+  "ingest/notices/queueSize": [],
+  "ingest/notices/time": [],
+  "ingest/pause/time": [],
+  "ingest/persists/backPressure": [],
+  "ingest/persists/count": [],
+  "ingest/persists/cpu": [],
+  "ingest/persists/failed": [],
+  "ingest/persists/time": [],
+  "ingest/rows/output": [],
+  "ingest/segments/count": [],
+  "ingest/rows/published": [],
+  "ingest/sink/count": [],
+  "ingest/tombstones/count": [],
+  "interval/compacted/count": [],
+  "interval/skipCompact/count": [],
+  "interval/waitCompact/count": [],
+  "jetty/numOpenConnections": [],
+  "jetty/threadPool/busy": [],
+  "jetty/threadPool/idle": [],
+  "jetty/threadPool/isLowOnThreads": [],
+  "jetty/threadPool/max": [],
+  "jetty/threadPool/min": [],
+  "jetty/threadPool/queueSize": [],
+  "jetty/threadPool/ready": [],
+  "jetty/threadPool/total": [],
+  "jetty/threadPool/utilizationRate": [],
+  "jetty/threadPool/utilized": [],
+  "jvm/bufferpool/capacity": [],
+  "jvm/bufferpool/count": [],
+  "jvm/bufferpool/used": [],
+  "jvm/gc/count": [],
+  "jvm/gc/cpu": [],
+  "jvm/mem/committed": [],
+  "jvm/mem/init": [],
+  "jvm/mem/max": [],
+  "jvm/mem/used": [],
+  "jvm/pool/committed": [],
+  "jvm/pool/init": [],
+  "jvm/pool/max": [],
+  "jvm/pool/used": [],
+  "kafka/consumer/bytesConsumed": [],
+  "kafka/consumer/fetch": [],
+  "kafka/consumer/fetchLatencyAvg": [],
+  "kafka/consumer/fetchLatencyMax": [],
+  "kafka/consumer/fetchRate": [],
+  "kafka/consumer/fetchSizeAvg": [],
+  "kafka/consumer/fetchSizeMax": [],
+  "kafka/consumer/incomingBytes": [],
+  "kafka/consumer/outgoingBytes": [],
+  "kafka/consumer/recordsConsumed": [],
+  "kafka/consumer/recordsLag": [],
+  "kafka/consumer/recordsPerRequestAvg": [],
+  "kill/eligibleUnusedSegments/count": [],
+  "kill/pendingSegments/count": [],
+  "kill/task/count": [],
+  "killTask/availableSlot/count": [],
+  "killTask/maxSlot/count": [],
+  "mergeBuffer/acquisitionTimeNs": [],
+  "mergeBuffer/maxAcquisitionTimeNs": [],
+  "mergeBuffer/pendingRequests": [],
+  "mergeBuffer/queries": [],
+  "mergeBuffer/used": [],
+  "metadata/kill/audit/count": [],
+  "metadata/kill/compaction/count": [],
+  "metadata/kill/datasource/count": [],
+  "metadata/kill/rule/count": [],
+  "metadata/kill/supervisor/count": [],
+  "metadatacache/backfill/count": [],
+  "metadatacache/init/time": [],
+  "metadatacache/refresh/count": [],
+  "metadatacache/refresh/time": [],
+  "metadatacache/schemaPoll/count": [],
+  "metadatacache/schemaPoll/failed": [],
+  "metadatacache/schemaPoll/time": [],
+  "query/bytes": [],
+  "query/cache/delta/averageBytes": [],
+  "query/cache/delta/errors": [],
+  "query/cache/delta/evictions": [],
+  "query/cache/delta/hitRate": [],
+  "query/cache/delta/hits": [],
+  "query/cache/delta/misses": [],
+  "query/cache/delta/numEntries": [],
+  "query/cache/delta/put/error": [],
+  "query/cache/delta/put/ok": [],
+  "query/cache/delta/put/oversized": [],
+  "query/cache/delta/sizeBytes": [],
+  "query/cache/delta/timeouts": [],
+  "query/cache/total/averageBytes": [],
+  "query/cache/total/errors": [],
+  "query/cache/total/evictions": [],
+  "query/cache/total/hitRate": [],
+  "query/cache/total/hits": [],
+  "query/cache/total/misses": [],
+  "query/cache/total/numEntries": [],
+  "query/cache/total/put/error": [],
+  "query/cache/total/put/ok": [],
+  "query/cache/total/put/oversized": [],
+  "query/cache/total/sizeBytes": [],
+  "query/cache/total/timeouts": [],
+  "query/count": [],
+  "query/cpu/time": [],
+  "query/failed/count": [],
+  "query/interrupted/count": [],
+  "query/node/bytes": [],
+  "query/node/time": [],
+  "query/node/ttfb": [],
+  "query/segment/time": [],
+  "query/segmentAndCache/time": [],
+  "query/success/count": [],
+  "query/time": [],
+  "query/timeout/count": [],
+  "query/wait/time": [],
+  "schemacache/finalizedSchemaPayload/count": [],
+  "schemacache/finalizedSegmentMetadata/count": [],
+  "schemacache/inTransitSMQPublishedResults/count": [],
+  "schemacache/inTransitSMQResults/count": [],
+  "schemacache/realtime/count": [],
+  "segment/added/bytes": [],
+  "segment/assignSkipped/count": [],
+  "segment/assigned/count": [],
+  "segment/availableDeepStorageOnly/count": [],
+  "segment/compacted/bytes": [],
+  "segment/compacted/count": [],
+  "segment/count": [],
+  "segment/deleted/count": [],
+  "segment/dropQueue/count": [],
+  "segment/dropSkipped/count": [],
+  "segment/dropped/count": [],
+  "segment/loadQueue/assigned": [],
+  "segment/loadQueue/cancelled": [],
+  "segment/loadQueue/count": [],
+  "segment/loadQueue/failed": [],
+  "segment/loadQueue/size": [],
+  "segment/loadQueue/success": [],
+  "segment/max": [],
+  "segment/moveSkipped/count": [],
+  "segment/moved/bytes": [],
+  "segment/moved/count": [],
+  "segment/nuked/bytes": [],
+  "segment/overShadowed/count": [],
+  "segment/pendingDelete": [],
+  "segment/scan/active": [],
+  "segment/scan/pending": [],
+  "segment/size": [],
+  "segment/skipCompact/bytes": [],
+  "segment/skipCompact/count": [],
+  "segment/unavailable/count": [],
+  "segment/underReplicated/count": [],
+  "segment/unneeded/count": [],
+  "segment/unneededEternityTombstone/count": [],
+  "segment/used": [],
+  "segment/usedPercent": [],
+  "segment/waitCompact/bytes": [],
+  "segment/waitCompact/count": [],
+  "serverview/init/time": [],
+  "serverview/sync/healthy": [],
+  "serverview/sync/unstableTime": [],
+  "service/heartbeat": [],
+  "sqlQuery/bytes": [],
+  "sqlQuery/planningTimeMs": [],
+  "sqlQuery/time": [],
+  "sys/cpu": [],
+  "sys/disk/queue": [],
+  "sys/disk/read/count": [],
+  "sys/disk/read/size": [],
+  "sys/disk/transferTime": [],
+  "sys/disk/write/count": [],
+  "sys/disk/write/size": [],
+  "sys/fs/files/count": [],
+  "sys/fs/files/free": [],
+  "sys/fs/max": [],
+  "sys/fs/used": [],
+  "sys/mem/free": [],
+  "sys/mem/max": [],
+  "sys/mem/used": [],
+  "sys/net/read/dropped": [],
+  "sys/net/read/errors": [],
+  "sys/net/read/packets": [],
+  "sys/net/read/size": [],
+  "sys/net/write/collisions": [],
+  "sys/net/write/errors": [],
+  "sys/net/write/packets": [],
+  "sys/net/write/size": [],
+  "sys/storage/used": [],
+  "sys/swap/free": [],
+  "sys/swap/max": [],
+  "sys/swap/pageIn": [],
+  "sys/swap/pageOut": [],
+  "sys/tcpv4/activeOpens": [],
+  "sys/tcpv4/attemptFails": [],
+  "sys/tcpv4/estabResets": [],
+  "sys/tcpv4/in/errs": [],
+  "sys/tcpv4/in/segs": [],
+  "sys/tcpv4/out/rsts": [],
+  "sys/tcpv4/out/segs": [],
+  "sys/tcpv4/passiveOpens": [],
+  "sys/tcpv4/retrans/segs": [],
+  "sys/uptime": [],
+  "task/action/batch/attempts": [],
+  "task/action/batch/queueTime": [],
+  "task/action/batch/runTime": [],
+  "task/action/batch/size": [],
+  "task/action/failed/count": [],
+  "task/action/run/time": [],
+  "task/action/success/count": [],
+  "task/autoScaler/requiredCount": [],
+  "task/failed/count": [],
+  "task/pending/count": [],
+  "task/pending/time": [],
+  "task/run/time": [],
+  "task/running/count": [],
+  "task/segmentAvailability/wait/time": [],
+  "task/success/count": [],
+  "task/waiting/count": [],
+  "tier/historical/count": [],
+  "tier/replication/factor": [],
+  "tier/required/capacity": [],
+  "tier/total/capacity": [],
+  "zk/connected": [],
+  "zk/reconnect/time": []
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to