This is an automated email from the ASF dual-hosted git repository.

abhishekrb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 47d6f34643c Emit missing `ingest/merge/time`, `ingest/merge/cpu` and 
`ingest/persists/cpu` metrics (#18866)
47d6f34643c is described below

commit 47d6f34643c42e469d4d49dfab08e2ca0e8b0412
Author: Abhishek Radhakrishnan <[email protected]>
AuthorDate: Fri Dec 26 12:22:31 2025 -0500

    Emit missing `ingest/merge/time`, `ingest/merge/cpu` and 
`ingest/persists/cpu` metrics (#18866)
    
    The ingest/merge/time, ingest/merge/cpu and ingest/persists/cpu metrics 
have been documented but were previously reported as zero because they were 
not set in the streaming and batch appenderators (it probably regressed in a 
refactor).
    
    This patch now correctly reports ingest/merge/time, ingest/merge/cpu and 
ingest/persists/cpu metrics for streaming and batch ingestion tasks.
    
    Also, cleaned up KafkaIndexTaskTest and KinesisIndexTaskTest by 
initializing a StubServiceEmitter as a non-static member in the base class 
SeekableStreamIndexTaskTestBase so it can be used by each unit test as needed.
---
 .../druid/indexing/kafka/KafkaIndexTaskTest.java   | 60 ++++++++++++++++------
 .../indexing/kinesis/KinesisIndexTaskTest.java     | 33 +++++++-----
 .../SeekableStreamIndexTaskTestBase.java           | 31 +++++++++++
 .../segment/realtime/SegmentGenerationMetrics.java | 15 ++++++
 .../realtime/appenderator/BatchAppenderator.java   | 31 +++++++----
 .../realtime/appenderator/StreamAppenderator.java  | 29 +++++++----
 .../appenderator/BatchAppenderatorTest.java        | 22 ++++++++
 .../appenderator/StreamAppenderatorTest.java       | 12 +++++
 .../appenderator/StreamAppenderatorTester.java     | 18 +++++--
 9 files changed, 197 insertions(+), 54 deletions(-)

diff --git 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
index cb359896ee2..46b101055c5 100644
--- 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -88,9 +88,6 @@ import 
org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.guava.Sequences;
 import org.apache.druid.java.util.common.parsers.CloseableIterator;
 import org.apache.druid.java.util.common.parsers.ParseException;
-import org.apache.druid.java.util.emitter.EmittingLogger;
-import org.apache.druid.java.util.emitter.core.NoopEmitter;
-import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.math.expr.ExprMacroTable;
 import org.apache.druid.query.DefaultGenericQueryMetricsFactory;
 import org.apache.druid.query.DefaultQueryRunnerFactoryConglomerate;
@@ -118,6 +115,7 @@ import 
org.apache.druid.segment.incremental.RowIngestionMeters;
 import org.apache.druid.segment.incremental.RowIngestionMetersTotals;
 import org.apache.druid.segment.incremental.RowMeters;
 import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.realtime.SegmentGenerationMetrics;
 import org.apache.druid.segment.transform.ExpressionTransform;
 import org.apache.druid.segment.transform.TransformSpec;
 import org.apache.druid.server.coordination.BroadcastDatasourceLoadingSpec;
@@ -196,9 +194,7 @@ public class KafkaIndexTaskTest extends 
SeekableStreamIndexTaskTestBase
 
   private static TestingCluster zkServer;
   private static TestBroker kafkaServer;
-  private static ServiceEmitter emitter;
   private static int topicPostfix;
-
   static final Module TEST_MODULE = new 
SimpleModule("kafkaTestModule").registerSubtypes(
       new NamedType(TestKafkaInputFormat.class, "testKafkaInputFormat"),
       new NamedType(TestKafkaFormatWithMalformedDataDetection.class, 
"testKafkaFormatWithMalformedDataDetection")
@@ -288,14 +284,6 @@ public class KafkaIndexTaskTest extends 
SeekableStreamIndexTaskTestBase
   @BeforeClass
   public static void setupClass() throws Exception
   {
-    emitter = new ServiceEmitter(
-        "service",
-        "host",
-        new NoopEmitter()
-    );
-    emitter.start();
-    EmittingLogger.registerEmitter(emitter);
-
     zkServer = new TestingCluster(1);
     zkServer.start();
 
@@ -354,8 +342,6 @@ public class KafkaIndexTaskTest extends 
SeekableStreamIndexTaskTestBase
 
     zkServer.stop();
     zkServer = null;
-
-    emitter.close();
   }
 
   @Test(timeout = 60_000L)
@@ -388,7 +374,6 @@ public class KafkaIndexTaskTest extends 
SeekableStreamIndexTaskTestBase
     // Wait for task to exit
     Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
     verifyTaskMetrics(task, RowMeters.with().bytes(getTotalSizeOfRecords(2, 
5)).totalProcessed(3));
-    
Assert.assertTrue(task.getRunner().getSegmentGenerationMetrics().isProcessingDone());
 
     // Check published metadata and segments in deep storage
     assertEqualsExceptVersion(
@@ -402,6 +387,12 @@ public class KafkaIndexTaskTest extends 
SeekableStreamIndexTaskTestBase
         new KafkaDataSourceMetadata(new 
SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(new 
KafkaTopicPartition(false, topic, 0), 5L))),
         newDataSchemaMetadata()
     );
+
+    final SegmentGenerationMetrics observedSegmentGenerationMetrics = 
task.getRunner().getSegmentGenerationMetrics();
+    Assert.assertTrue(observedSegmentGenerationMetrics.isProcessingDone());
+    Assert.assertEquals(3, observedSegmentGenerationMetrics.rowOutput());
+    Assert.assertEquals(2, observedSegmentGenerationMetrics.handOffCount());
+    
verifyPersistAndMergeTimeMetricsArePositive(observedSegmentGenerationMetrics);
   }
 
   @Test(timeout = 60_000L)
@@ -450,6 +441,12 @@ public class KafkaIndexTaskTest extends 
SeekableStreamIndexTaskTestBase
         Assert.assertEquals(dimensionsSpec.getDimensionNames().get(i), 
segment.getDimensions().get(i));
       }
     }
+
+    final SegmentGenerationMetrics observedSegmentGenerationMetrics = 
task.getRunner().getSegmentGenerationMetrics();
+    Assert.assertTrue(observedSegmentGenerationMetrics.isProcessingDone());
+    Assert.assertEquals(3, observedSegmentGenerationMetrics.rowOutput());
+    Assert.assertEquals(2, observedSegmentGenerationMetrics.handOffCount());
+    
verifyPersistAndMergeTimeMetricsArePositive(observedSegmentGenerationMetrics);
   }
 
   @Test(timeout = 60_000L)
@@ -739,6 +736,13 @@ public class KafkaIndexTaskTest extends 
SeekableStreamIndexTaskTestBase
         ),
         newDataSchemaMetadata()
     );
+
+    final SegmentGenerationMetrics observedSegmentGenerationMetrics = 
task.getRunner().getSegmentGenerationMetrics();
+    Assert.assertTrue(observedSegmentGenerationMetrics.isProcessingDone());
+    Assert.assertEquals(8, observedSegmentGenerationMetrics.rowOutput());
+    Assert.assertEquals(7, observedSegmentGenerationMetrics.handOffCount());
+    Assert.assertEquals(4, observedSegmentGenerationMetrics.numPersists());
+    
verifyPersistAndMergeTimeMetricsArePositive(observedSegmentGenerationMetrics);
   }
 
   @Test(timeout = 60_000L)
@@ -1698,6 +1702,14 @@ public class KafkaIndexTaskTest extends 
SeekableStreamIndexTaskTestBase
         "{timestamp=246140482-04-24T15:36:27.903Z, dim1=x, dim2=z, dimLong=10, 
dimFloat=20.0, met1=1.0}"
     );
     Assert.assertEquals(expectedInputs, parseExceptionReport.getInputs());
+
+    emitter.verifyValue("ingest/segments/count", 4);
+
+    final SegmentGenerationMetrics observedSegmentGenerationMetrics = 
task.getRunner().getSegmentGenerationMetrics();
+    Assert.assertTrue(observedSegmentGenerationMetrics.isProcessingDone());
+    Assert.assertEquals(7, observedSegmentGenerationMetrics.rowOutput());
+    Assert.assertEquals(4, observedSegmentGenerationMetrics.handOffCount());
+    
verifyPersistAndMergeTimeMetricsArePositive(observedSegmentGenerationMetrics);
   }
 
   @Test(timeout = 60_000L)
@@ -1775,6 +1787,14 @@ public class KafkaIndexTaskTest extends 
SeekableStreamIndexTaskTestBase
 
     List<String> expectedInputs = Arrays.asList("", "unparseable");
     Assert.assertEquals(expectedInputs, parseExceptionReport.getInputs());
+
+    emitter.verifyNotEmitted("ingest/segments/count");
+
+    final SegmentGenerationMetrics observedSegmentGenerationMetrics = 
task.getRunner().getSegmentGenerationMetrics();
+    Assert.assertTrue(observedSegmentGenerationMetrics.isProcessingDone());
+    Assert.assertEquals(3, observedSegmentGenerationMetrics.rowOutput());
+    Assert.assertEquals(1, observedSegmentGenerationMetrics.numPersists());
+    Assert.assertEquals(0, observedSegmentGenerationMetrics.handOffCount());
   }
 
   @Test(timeout = 60_000L)
@@ -3457,6 +3477,14 @@ public class KafkaIndexTaskTest extends 
SeekableStreamIndexTaskTestBase
     // Check segments in deep storage
     Assert.assertEquals(ImmutableList.of("b"), readSegmentColumn("dim1", 
publishedDescriptors.get(0)));
     Assert.assertEquals(ImmutableList.of("bb"), readSegmentColumn("dim1t", 
publishedDescriptors.get(0)));
+
+    emitter.verifyValue("ingest/segments/count", 1);
+
+    final SegmentGenerationMetrics observedSegmentGenerationMetrics = 
task.getRunner().getSegmentGenerationMetrics();
+    Assert.assertTrue(observedSegmentGenerationMetrics.isProcessingDone());
+    Assert.assertEquals(1, observedSegmentGenerationMetrics.rowOutput());
+    Assert.assertEquals(1, observedSegmentGenerationMetrics.handOffCount());
+    
verifyPersistAndMergeTimeMetricsArePositive(observedSegmentGenerationMetrics);
   }
 
   public static class TestKafkaInputFormat implements InputFormat
diff --git 
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
 
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
index bd2a8aa2712..69764a5d800 100644
--- 
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
+++ 
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
@@ -68,9 +68,6 @@ import 
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervi
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.concurrent.Execs;
-import org.apache.druid.java.util.emitter.EmittingLogger;
-import org.apache.druid.java.util.emitter.core.NoopEmitter;
-import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.math.expr.ExprMacroTable;
 import org.apache.druid.query.DefaultQueryRunnerFactoryConglomerate;
 import org.apache.druid.query.DruidProcessingConfigTest;
@@ -86,6 +83,7 @@ import 
org.apache.druid.segment.incremental.InputRowFilterResult;
 import org.apache.druid.segment.incremental.RowIngestionMeters;
 import org.apache.druid.segment.incremental.RowMeters;
 import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.realtime.SegmentGenerationMetrics;
 import org.apache.druid.segment.transform.ExpressionTransform;
 import org.apache.druid.segment.transform.TransformSpec;
 import org.apache.druid.timeline.DataSegment;
@@ -168,7 +166,6 @@ public class KinesisIndexTaskTest extends 
SeekableStreamIndexTaskTestBase
   );
 
   private static KinesisRecordSupplier recordSupplier;
-  private static ServiceEmitter emitter;
 
   @Parameterized.Parameters(name = "{0}")
   public static Iterable<Object[]> constructorFeeder()
@@ -194,13 +191,6 @@ public class KinesisIndexTaskTest extends 
SeekableStreamIndexTaskTestBase
   @BeforeClass
   public static void setupClass()
   {
-    emitter = new ServiceEmitter(
-        "service",
-        "host",
-        new NoopEmitter()
-    );
-    emitter.start();
-    EmittingLogger.registerEmitter(emitter);
     taskExec = MoreExecutors.listeningDecorator(
         Executors.newCachedThreadPool(
             Execs.makeThreadFactory("kinesis-task-test-%d")
@@ -252,7 +242,6 @@ public class KinesisIndexTaskTest extends 
SeekableStreamIndexTaskTestBase
   {
     taskExec.shutdown();
     taskExec.awaitTermination(20, TimeUnit.MINUTES);
-    emitter.close();
   }
 
   private void waitUntil(KinesisIndexTask task, Predicate<KinesisIndexTask> 
predicate)
@@ -356,6 +345,13 @@ public class KinesisIndexTaskTest extends 
SeekableStreamIndexTaskTestBase
         ),
         newDataSchemaMetadata()
     );
+
+    final SegmentGenerationMetrics observedSegmentGenerationMetrics = 
task.getRunner().getSegmentGenerationMetrics();
+    Assert.assertTrue(observedSegmentGenerationMetrics.isProcessingDone());
+    Assert.assertEquals(3, observedSegmentGenerationMetrics.rowOutput());
+    Assert.assertEquals(2, observedSegmentGenerationMetrics.handOffCount());
+    Assert.assertEquals(2, observedSegmentGenerationMetrics.numPersists());
+    
verifyPersistAndMergeTimeMetricsArePositive(observedSegmentGenerationMetrics);
   }
 
   @Test(timeout = 120_000L)
@@ -753,6 +749,13 @@ public class KinesisIndexTaskTest extends 
SeekableStreamIndexTaskTestBase
         new KinesisDataSourceMetadata(new 
SeekableStreamEndSequenceNumbers<>(STREAM, endOffsets)),
         newDataSchemaMetadata()
     );
+
+    final SegmentGenerationMetrics observedSegmentGenerationMetrics = 
task.getRunner().getSegmentGenerationMetrics();
+    Assert.assertTrue(observedSegmentGenerationMetrics.isProcessingDone());
+    Assert.assertEquals(7, observedSegmentGenerationMetrics.rowOutput());
+    Assert.assertEquals(6, observedSegmentGenerationMetrics.handOffCount());
+    Assert.assertEquals(5, observedSegmentGenerationMetrics.numPersists());
+    
verifyPersistAndMergeTimeMetricsArePositive(observedSegmentGenerationMetrics);
   }
 
 
@@ -1032,6 +1035,12 @@ public class KinesisIndexTaskTest extends 
SeekableStreamIndexTaskTestBase
         ),
         newDataSchemaMetadata()
     );
+    final SegmentGenerationMetrics observedSegmentGenerationMetrics = 
task.getRunner().getSegmentGenerationMetrics();
+    Assert.assertTrue(observedSegmentGenerationMetrics.isProcessingDone());
+    Assert.assertEquals(3, observedSegmentGenerationMetrics.rowOutput());
+    Assert.assertEquals(2, observedSegmentGenerationMetrics.handOffCount());
+    Assert.assertEquals(2, observedSegmentGenerationMetrics.numPersists());
+    
verifyPersistAndMergeTimeMetricsArePositive(observedSegmentGenerationMetrics);
   }
 
 
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java
index 19ff6ec4d0b..f0c273baa90 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java
@@ -83,8 +83,10 @@ import 
org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.guava.Comparators;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.java.util.common.parsers.JSONPathSpec;
+import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.java.util.metrics.MonitorScheduler;
+import org.apache.druid.java.util.metrics.StubServiceEmitter;
 import org.apache.druid.metadata.DerbyMetadataStorageActionHandlerFactory;
 import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator;
 import org.apache.druid.metadata.TestDerbyConnector;
@@ -120,6 +122,7 @@ import 
org.apache.druid.segment.loading.LocalDataSegmentPusherConfig;
 import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
 import org.apache.druid.segment.metadata.SegmentSchemaManager;
 import org.apache.druid.segment.realtime.NoopChatHandlerProvider;
+import org.apache.druid.segment.realtime.SegmentGenerationMetrics;
 import org.apache.druid.segment.realtime.appenderator.StreamAppenderator;
 import org.apache.druid.server.DruidNode;
 import org.apache.druid.server.coordination.DataSegmentServerAnnouncer;
@@ -134,7 +137,9 @@ import org.assertj.core.api.Assertions;
 import org.easymock.EasyMock;
 import org.easymock.EasyMockSupport;
 import org.joda.time.Interval;
+import org.junit.After;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Rule;
 import org.junit.rules.TemporaryFolder;
 
@@ -198,6 +203,8 @@ public abstract class SeekableStreamIndexTaskTestBase 
extends EasyMockSupport
 
   protected final List<Task> runningTasks = new ArrayList<>();
   protected final LockGranularity lockGranularity;
+
+  protected StubServiceEmitter emitter;
   protected File directory;
   protected File reportsFile;
   protected TaskToolboxFactory toolboxFactory;
@@ -251,6 +258,20 @@ public abstract class SeekableStreamIndexTaskTestBase 
extends EasyMockSupport
     this.lockGranularity = lockGranularity;
   }
 
+  @Before
+  public void setupBase()
+  {
+    emitter = new StubServiceEmitter();
+    emitter.start();
+    EmittingLogger.registerEmitter(emitter);
+  }
+
+  @After
+  public void tearDownBase() throws IOException
+  {
+    emitter.close();
+  }
+
   protected static ByteEntity jb(
       String timestamp,
       String dim1,
@@ -408,6 +429,16 @@ public abstract class SeekableStreamIndexTaskTestBase 
extends EasyMockSupport
     return new SegmentDescriptor(interval, "fakeVersion", partitionNum);
   }
 
+  protected void 
verifyPersistAndMergeTimeMetricsArePositive(SegmentGenerationMetrics 
observedSegmentGenerationMetrics)
+  {
+    Assert.assertNotNull(observedSegmentGenerationMetrics);
+    Assert.assertTrue(observedSegmentGenerationMetrics.persistTimeMillis() > 
0);
+    Assert.assertTrue(observedSegmentGenerationMetrics.persistCpuTime() > 0);
+
+    Assert.assertTrue(observedSegmentGenerationMetrics.mergeTimeMillis() > 0);
+    Assert.assertTrue(observedSegmentGenerationMetrics.mergeCpuTime() > 0);
+  }
+
   protected void assertEqualsExceptVersion(
       List<SegmentDescriptorAndExpectedDim1Values> expectedDescriptors,
       List<SegmentDescriptor> actualDescriptors
diff --git 
a/server/src/main/java/org/apache/druid/segment/realtime/SegmentGenerationMetrics.java
 
b/server/src/main/java/org/apache/druid/segment/realtime/SegmentGenerationMetrics.java
index 2afc2b72831..97d347b0e3a 100644
--- 
a/server/src/main/java/org/apache/druid/segment/realtime/SegmentGenerationMetrics.java
+++ 
b/server/src/main/java/org/apache/druid/segment/realtime/SegmentGenerationMetrics.java
@@ -186,6 +186,21 @@ public class SegmentGenerationMetrics
     }
   }
 
+  public void setMergeTime(long elapsedMergeTimeMillis)
+  {
+    mergeTimeMillis.set(elapsedMergeTimeMillis);
+  }
+
+  public void setMergeCpuTime(long elapsedCpuTimeNanos)
+  {
+    mergeCpuTime.set(elapsedCpuTimeNanos);
+  }
+
+  public void setPersistCpuTime(long elpasedCpuTimeNanos)
+  {
+    persistCpuTime.set(elpasedCpuTimeNanos);
+  }
+
   public void markProcessingDone()
   {
     this.processingDone.set(true);
diff --git 
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderator.java
 
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderator.java
index 4caa5c90c50..852b79ce036 100644
--- 
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderator.java
+++ 
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderator.java
@@ -22,7 +22,6 @@ package org.apache.druid.segment.realtime.appenderator;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import com.google.common.base.Stopwatch;
 import com.google.common.base.Supplier;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
@@ -41,6 +40,7 @@ import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.RE;
+import org.apache.druid.java.util.common.Stopwatch;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.io.Closer;
@@ -69,6 +69,7 @@ import org.apache.druid.segment.realtime.FireHydrant;
 import org.apache.druid.segment.realtime.SegmentGenerationMetrics;
 import org.apache.druid.segment.realtime.sink.Sink;
 import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.utils.JvmUtils;
 import org.joda.time.Interval;
 
 import javax.annotation.Nullable;
@@ -586,6 +587,7 @@ public class BatchAppenderator implements Appenderator
             log.info("No indexes will be persisted");
           }
           final Stopwatch persistStopwatch = Stopwatch.createStarted();
+          final long startPersistCpuNanos = JvmUtils.safeGetThreadCpuTime();
           try {
             for (Pair<FireHydrant, SegmentIdWithShardSpec> pair : 
indexesToPersist) {
               metrics.incrementRowOutputCount(persistHydrant(pair.lhs, 
pair.rhs));
@@ -615,9 +617,10 @@ public class BatchAppenderator implements Appenderator
             throw e;
           }
           finally {
-            metrics.incrementNumPersists();
-            long persistMillis = 
persistStopwatch.elapsed(TimeUnit.MILLISECONDS);
+            metrics.setPersistCpuTime(JvmUtils.safeGetThreadCpuTime() - 
startPersistCpuNanos);
+            final long persistMillis = persistStopwatch.millisElapsed();
             metrics.incrementPersistTimeMillis(persistMillis);
+            metrics.incrementNumPersists();
             persistStopwatch.stop();
             // make sure no push can start while persisting:
             log.info("Persisted rows[%,d] and bytes[%,d] and removed all sinks 
& hydrants from memory in[%d] millis",
@@ -629,7 +632,7 @@ public class BatchAppenderator implements Appenderator
         }
     );
 
-    final long startDelay = runExecStopwatch.elapsed(TimeUnit.MILLISECONDS);
+    final long startDelay = runExecStopwatch.millisElapsed();
     metrics.incrementPersistBackPressureMillis(startDelay);
     if (startDelay > PERSIST_WARN_DELAY) {
       log.warn("Ingestion was throttled for [%,d] millis because persists were 
pending.", startDelay);
@@ -794,8 +797,9 @@ public class BatchAppenderator implements Appenderator
       }
 
       final File mergedFile;
-      final long mergeFinishTime;
-      final long startTime = System.nanoTime();
+      final Stopwatch mergeStopwatch = Stopwatch.createStarted();
+      final long mergeTimeMillis;
+      final long startMergeCpuNanos = JvmUtils.safeGetThreadCpuTime();
       List<QueryableIndex> indexes = new ArrayList<>();
       long rowsinMergedSegment = 0L;
       Closer closer = Closer.create();
@@ -824,10 +828,13 @@ public class BatchAppenderator implements Appenderator
             tuningConfig.getMaxColumnsToMerge()
         );
 
-        mergeFinishTime = System.nanoTime();
         metrics.incrementMergedRows(rowsinMergedSegment);
 
-        log.debug("Segment[%s] built in %,dms.", identifier, (mergeFinishTime 
- startTime) / 1000000);
+        metrics.setMergeCpuTime(JvmUtils.safeGetThreadCpuTime() - 
startMergeCpuNanos);
+        mergeTimeMillis = mergeStopwatch.millisElapsed();
+        metrics.setMergeTime(mergeTimeMillis);
+
+        log.debug("Segment[%s] built in %,dms.", identifier, mergeTimeMillis);
       }
       catch (Throwable t) {
         throw closer.rethrow(t);
@@ -836,6 +843,8 @@ public class BatchAppenderator implements Appenderator
         closer.close();
       }
 
+      final Stopwatch pushStopwatch = Stopwatch.createStarted();
+
       // dataSegmentPusher retries internally when appropriate; no need for 
retries here.
       final DataSegment segment = dataSegmentPusher.push(
           mergedFile,
@@ -864,7 +873,7 @@ public class BatchAppenderator implements Appenderator
       // cleanup, sink no longer needed
       removeDirectory(computePersistDir(identifier));
 
-      final long pushFinishTime = System.nanoTime();
+      final long pushTimeMillis = pushStopwatch.millisElapsed();
       metrics.incrementPushedRows(rowsinMergedSegment);
 
       log.info(
@@ -875,8 +884,8 @@ public class BatchAppenderator implements Appenderator
           identifier,
           segment.getSize(),
           indexes.size(),
-          (mergeFinishTime - startTime) / 1000000,
-          (pushFinishTime - mergeFinishTime) / 1000000,
+          mergeTimeMillis,
+          pushTimeMillis,
           objectMapper.writeValueAsString(segment.getLoadSpec())
       );
 
diff --git 
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java
 
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java
index 578b1f32da2..c95434cbab6 100644
--- 
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java
+++ 
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java
@@ -24,7 +24,6 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
-import com.google.common.base.Stopwatch;
 import com.google.common.base.Supplier;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
@@ -46,6 +45,7 @@ import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.RE;
+import org.apache.druid.java.util.common.Stopwatch;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
@@ -81,6 +81,7 @@ import org.apache.druid.segment.realtime.sink.Sink;
 import org.apache.druid.server.coordination.DataSegmentAnnouncer;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.SegmentId;
+import org.apache.druid.utils.JvmUtils;
 import org.joda.time.Interval;
 
 import javax.annotation.Nullable;
@@ -682,6 +683,7 @@ public class StreamAppenderator implements Appenderator
           @Override
           public Object call() throws IOException
           {
+            final long startPersistCpuNanos = JvmUtils.safeGetThreadCpuTime();
             try {
               setTaskThreadContext();
               for (Pair<FireHydrant, SegmentIdWithShardSpec> pair : 
indexesToPersist) {
@@ -747,15 +749,16 @@ public class StreamAppenderator implements Appenderator
               throw e;
             }
             finally {
+              metrics.setPersistCpuTime(JvmUtils.safeGetThreadCpuTime() - 
startPersistCpuNanos);
+              
metrics.incrementPersistTimeMillis(persistStopwatch.millisElapsed());
               metrics.incrementNumPersists();
-              
metrics.incrementPersistTimeMillis(persistStopwatch.elapsed(TimeUnit.MILLISECONDS));
               persistStopwatch.stop();
             }
           }
         }
     );
 
-    final long startDelay = runExecStopwatch.elapsed(TimeUnit.MILLISECONDS);
+    final long startDelay = runExecStopwatch.millisElapsed();
     metrics.incrementPersistBackPressureMillis(startDelay);
     if (startDelay > WARN_DELAY) {
       log.warn("Ingestion was throttled for [%,d] millis because persists were 
pending.", startDelay);
@@ -935,8 +938,9 @@ public class StreamAppenderator implements Appenderator
       }
 
       final File mergedFile;
-      final long mergeFinishTime;
-      final long startTime = System.nanoTime();
+      final Stopwatch mergeStopwatch = Stopwatch.createStarted();
+      final long mergeTimeMillis;
+      final long startMergeCpuNanos = JvmUtils.safeGetThreadCpuTime();
       List<QueryableIndex> indexes = new ArrayList<>();
       Closer closer = Closer.create();
       try {
@@ -961,9 +965,11 @@ public class StreamAppenderator implements Appenderator
             tuningConfig.getMaxColumnsToMerge()
         );
 
-        mergeFinishTime = System.nanoTime();
+        metrics.setMergeCpuTime(JvmUtils.safeGetThreadCpuTime() - 
startMergeCpuNanos);
+        mergeTimeMillis = mergeStopwatch.millisElapsed();
+        metrics.setMergeTime(mergeTimeMillis);
 
-        log.debug("Segment[%s] built in %,dms.", identifier, (mergeFinishTime 
- startTime) / 1000000);
+        log.debug("Segment[%s] built in %,dms.", identifier, mergeTimeMillis);
       }
       catch (Throwable t) {
         throw closer.rethrow(t);
@@ -972,6 +978,8 @@ public class StreamAppenderator implements Appenderator
         closer.close();
       }
 
+      final Stopwatch pushStopwatch = Stopwatch.createStarted();
+
       final DataSegment segmentToPush = sink.getSegment().withDimensions(
           IndexMerger.getMergedDimensionsFromQueryableIndexes(indexes, 
schema.getDimensionsSpec())
       );
@@ -979,8 +987,7 @@ public class StreamAppenderator implements Appenderator
       // dataSegmentPusher retries internally when appropriate; no need for 
retries here.
       final DataSegment segment = dataSegmentPusher.push(mergedFile, 
segmentToPush, useUniquePath);
 
-      final long pushFinishTime = System.nanoTime();
-
+      final long pushTimeMillis = pushStopwatch.millisElapsed();
       objectMapper.writeValue(descriptorFile, segment);
 
       log.info(
@@ -991,8 +998,8 @@ public class StreamAppenderator implements Appenderator
           identifier,
           segment.getSize(),
           indexes.size(),
-          (mergeFinishTime - startTime) / 1000000,
-          (pushFinishTime - mergeFinishTime) / 1000000,
+          mergeTimeMillis,
+          pushTimeMillis,
           objectMapper.writeValueAsString(segment.getLoadSpec())
       );
 
diff --git 
a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorTest.java
 
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorTest.java
index b80e33dfa18..0db9bc65321 100644
--- 
a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorTest.java
+++ 
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorTest.java
@@ -29,6 +29,7 @@ import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.segment.incremental.RowIngestionMeters;
 import org.apache.druid.segment.incremental.SimpleRowIngestionMeters;
+import org.apache.druid.segment.realtime.SegmentGenerationMetrics;
 import org.apache.druid.testing.InitializedNullHandlingTest;
 import org.apache.druid.timeline.partition.LinearShardSpec;
 import org.hamcrest.CoreMatchers;
@@ -127,6 +128,15 @@ public class BatchAppenderatorTest extends 
InitializedNullHandlingTest
           
segmentsAndCommitMetadata.getSegments().stream().sorted().collect(Collectors.toList())
       );
 
+      SegmentGenerationMetrics segmentGenerationMetrics = tester.getMetrics();
+      Assert.assertEquals(2, segmentGenerationMetrics.numPersists());
+      Assert.assertEquals(4, segmentGenerationMetrics.rowOutput());
+      Assert.assertTrue(segmentGenerationMetrics.persistTimeMillis() > 0);
+      Assert.assertTrue(segmentGenerationMetrics.persistCpuTime() > 0);
+
+      Assert.assertTrue(segmentGenerationMetrics.mergeTimeMillis() > 0);
+      Assert.assertTrue(segmentGenerationMetrics.mergeCpuTime() > 0);
+
       appenderator.close();
       Assert.assertTrue(appenderator.getSegments().isEmpty());
     }
@@ -207,6 +217,9 @@ public class BatchAppenderatorTest extends 
InitializedNullHandlingTest
           
ThrowableCauseMatcher.hasCause(ThrowableCauseMatcher.hasCause(ThrowableMessageMatcher.hasMessage(
               CoreMatchers.startsWith("Push failure test"))))
       );
+
+      SegmentGenerationMetrics segmentGenerationMetrics = tester.getMetrics();
+      Assert.assertEquals(1, segmentGenerationMetrics.failedHandoffs());
     }
   }
 
@@ -707,6 +720,15 @@ public class BatchAppenderatorTest extends 
InitializedNullHandlingTest
           ).stream().sorted().collect(Collectors.toList())
       );
       Assert.assertEquals(0, ((BatchAppenderator) appenderator).getRowsInMemory());
+
+      SegmentGenerationMetrics segmentGenerationMetrics = tester.getMetrics();
+      Assert.assertEquals(4, segmentGenerationMetrics.numPersists());
+      Assert.assertEquals(3, segmentGenerationMetrics.rowOutput());
+      Assert.assertTrue(segmentGenerationMetrics.persistTimeMillis() > 0);
+      Assert.assertTrue(segmentGenerationMetrics.persistCpuTime() > 0);
+
+      Assert.assertTrue(segmentGenerationMetrics.mergeTimeMillis() > 0);
+      Assert.assertTrue(segmentGenerationMetrics.mergeCpuTime() > 0);
       appenderator.close();
     }
   }
diff --git 
a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java
 
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java
index 22074f4f78f..91d8074c908 100644
--- 
a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java
+++ 
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java
@@ -56,6 +56,7 @@ import org.apache.druid.segment.column.ColumnType;
 import org.apache.druid.segment.incremental.RowIngestionMeters;
 import org.apache.druid.segment.incremental.SimpleRowIngestionMeters;
 import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
+import org.apache.druid.segment.realtime.SegmentGenerationMetrics;
 import org.apache.druid.segment.realtime.sink.Committers;
 import org.apache.druid.server.coordination.DataSegmentAnnouncer;
 import org.apache.druid.testing.InitializedNullHandlingTest;
@@ -183,6 +184,15 @@ public class StreamAppenderatorTest extends 
InitializedNullHandlingTest
       );
       Assert.assertEquals(sorted(tester.getPushedSegments()), sorted(segmentsAndCommitMetadata.getSegments()));
 
+      SegmentGenerationMetrics segmentGenerationMetrics = tester.getMetrics();
+      Assert.assertEquals(2, segmentGenerationMetrics.numPersists());
+      Assert.assertEquals(3, segmentGenerationMetrics.rowOutput());
+      Assert.assertTrue(segmentGenerationMetrics.persistTimeMillis() > 0);
+      Assert.assertTrue(segmentGenerationMetrics.persistCpuTime() > 0);
+
+      Assert.assertTrue(segmentGenerationMetrics.mergeTimeMillis() > 0);
+      Assert.assertTrue(segmentGenerationMetrics.mergeCpuTime() > 0);
+
       // clear
       appenderator.clear();
       Assert.assertTrue(appenderator.getSegments().isEmpty());
@@ -268,6 +278,8 @@ public class StreamAppenderatorTest extends 
InitializedNullHandlingTest
           
ThrowableCauseMatcher.hasCause(ThrowableCauseMatcher.hasCause(ThrowableMessageMatcher.hasMessage(
               CoreMatchers.startsWith("Push failure test"))))
       );
+      SegmentGenerationMetrics segmentGenerationMetrics = tester.getMetrics();
+      Assert.assertEquals(1, segmentGenerationMetrics.failedHandoffs());
     }
   }
 
diff --git 
a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTester.java
 
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTester.java
index 06745e98a04..b8c1065812b 100644
--- 
a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTester.java
+++ 
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTester.java
@@ -115,7 +115,8 @@ public class StreamAppenderatorTester implements 
AutoCloseable
       final ServiceEmitter serviceEmitter,
       final PolicyEnforcer policyEnforcer,
       final boolean releaseLocksOnHandoff,
-      final TaskIntervalUnlocker taskIntervalUnlocker
+      final TaskIntervalUnlocker taskIntervalUnlocker,
+      final SegmentGenerationMetrics segmentGenerationMetrics
   )
   {
     objectMapper = new DefaultObjectMapper();
@@ -165,7 +166,7 @@ public class StreamAppenderatorTester implements 
AutoCloseable
         releaseLocksOnHandoff
     );
 
-    metrics = new SegmentGenerationMetrics();
+    metrics = segmentGenerationMetrics == null ? new SegmentGenerationMetrics() : segmentGenerationMetrics;
     queryExecutor = Execs.singleThreaded("queryExecutor(%d)");
 
     IndexIO indexIO = new IndexIO(
@@ -364,6 +365,7 @@ public class StreamAppenderatorTester implements 
AutoCloseable
     private PolicyEnforcer policyEnforcer = NoopPolicyEnforcer.instance();
     private boolean releaseLocksOnHandoff;
     private TaskIntervalUnlocker taskIntervalUnlocker = interval -> {};
+    private SegmentGenerationMetrics segmentGenerationMetrics;
 
     public Builder maxRowsInMemory(final int maxRowsInMemory)
     {
@@ -389,6 +391,12 @@ public class StreamAppenderatorTester implements 
AutoCloseable
       return this;
     }
 
+    public Builder segmentGenerationMetrics(final SegmentGenerationMetrics segmentGenerationMetrics)
+    {
+      this.segmentGenerationMetrics = segmentGenerationMetrics;
+      return this;
+    }
+
     public Builder rowIngestionMeters(final RowIngestionMeters rowIngestionMeters)
     {
       this.rowIngestionMeters = rowIngestionMeters;
@@ -446,7 +454,8 @@ public class StreamAppenderatorTester implements 
AutoCloseable
           serviceEmitter,
           policyEnforcer,
           releaseLocksOnHandoff,
-          taskIntervalUnlocker
+          taskIntervalUnlocker,
+          segmentGenerationMetrics
       );
     }
 
@@ -468,7 +477,8 @@ public class StreamAppenderatorTester implements 
AutoCloseable
           serviceEmitter,
           policyEnforcer,
           releaseLocksOnHandoff,
-          taskIntervalUnlocker
+          taskIntervalUnlocker,
+          segmentGenerationMetrics
       );
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to