This is an automated email from the ASF dual-hosted git repository.

abhishekrb pushed a commit to branch missing_appendator_metrics
in repository https://gitbox.apache.org/repos/asf/druid.git

commit a1fda96a2d64fd55a9ea9f5140ec912fe5e50711
Author: Abhishek Balaji Radhakrishnan <[email protected]>
AuthorDate: Tue Dec 23 09:32:03 2025 -0500

    Batch appenderator changes
---
 .../realtime/appenderator/BatchAppenderator.java   | 31 ++++++++++++++--------
 .../realtime/appenderator/StreamAppenderator.java  |  2 ++
 .../appenderator/BatchAppenderatorTest.java        | 22 +++++++++++++++
 .../appenderator/StreamAppenderatorTest.java       |  9 +++----
 4 files changed, 47 insertions(+), 17 deletions(-)

diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderator.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderator.java
index 4caa5c90c50..cdded12a4d4 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderator.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderator.java
@@ -22,7 +22,6 @@ package org.apache.druid.segment.realtime.appenderator;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import com.google.common.base.Stopwatch;
 import com.google.common.base.Supplier;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
@@ -41,6 +40,7 @@ import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.RE;
+import org.apache.druid.java.util.common.Stopwatch;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.io.Closer;
@@ -69,6 +69,7 @@ import org.apache.druid.segment.realtime.FireHydrant;
 import org.apache.druid.segment.realtime.SegmentGenerationMetrics;
 import org.apache.druid.segment.realtime.sink.Sink;
 import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.utils.JvmUtils;
 import org.joda.time.Interval;
 
 import javax.annotation.Nullable;
@@ -586,6 +587,7 @@ public class BatchAppenderator implements Appenderator
             log.info("No indexes will be persisted");
           }
           final Stopwatch persistStopwatch = Stopwatch.createStarted();
+          final long startPersistCpuNanos = JvmUtils.safeGetThreadCpuTime();
           try {
             for (Pair<FireHydrant, SegmentIdWithShardSpec> pair : 
indexesToPersist) {
               metrics.incrementRowOutputCount(persistHydrant(pair.lhs, 
pair.rhs));
@@ -616,9 +618,10 @@ public class BatchAppenderator implements Appenderator
           }
           finally {
             metrics.incrementNumPersists();
-            long persistMillis = 
persistStopwatch.elapsed(TimeUnit.MILLISECONDS);
-            metrics.incrementPersistTimeMillis(persistMillis);
+            final long persistMillis = persistStopwatch.millisElapsed();
             persistStopwatch.stop();
+            metrics.incrementPersistTimeMillis(persistMillis);
+            metrics.setPersistCpuTime(JvmUtils.safeGetThreadCpuTime() - 
startPersistCpuNanos);
             // make sure no push can start while persisting:
             log.info("Persisted rows[%,d] and bytes[%,d] and removed all sinks 
& hydrants from memory in[%d] millis",
                      numPersistedRows, bytesPersisted, persistMillis
@@ -629,7 +632,7 @@ public class BatchAppenderator implements Appenderator
         }
     );
 
-    final long startDelay = runExecStopwatch.elapsed(TimeUnit.MILLISECONDS);
+    final long startDelay = runExecStopwatch.millisElapsed();
     metrics.incrementPersistBackPressureMillis(startDelay);
     if (startDelay > PERSIST_WARN_DELAY) {
       log.warn("Ingestion was throttled for [%,d] millis because persists were 
pending.", startDelay);
@@ -794,8 +797,9 @@ public class BatchAppenderator implements Appenderator
       }
 
       final File mergedFile;
-      final long mergeFinishTime;
-      final long startTime = System.nanoTime();
+      final Stopwatch stopwatch = Stopwatch.createStarted();
+      final long mergeTimeMillis;
+      final long startMergeCpuNanos = JvmUtils.safeGetThreadCpuTime();
       List<QueryableIndex> indexes = new ArrayList<>();
       long rowsinMergedSegment = 0L;
       Closer closer = Closer.create();
@@ -824,10 +828,14 @@ public class BatchAppenderator implements Appenderator
             tuningConfig.getMaxColumnsToMerge()
         );
 
-        mergeFinishTime = System.nanoTime();
         metrics.incrementMergedRows(rowsinMergedSegment);
 
-        log.debug("Segment[%s] built in %,dms.", identifier, (mergeFinishTime 
- startTime) / 1000000);
+        mergeTimeMillis = stopwatch.millisElapsed();
+        stopwatch.restart();
+        metrics.setMergeTime(mergeTimeMillis);
+        metrics.setMergeCpuTime(JvmUtils.safeGetThreadCpuTime() - 
startMergeCpuNanos);
+
+        log.debug("Segment[%s] built in %,dms.", identifier, mergeTimeMillis);
       }
       catch (Throwable t) {
         throw closer.rethrow(t);
@@ -864,7 +872,8 @@ public class BatchAppenderator implements Appenderator
       // cleanup, sink no longer needed
       removeDirectory(computePersistDir(identifier));
 
-      final long pushFinishTime = System.nanoTime();
+      final long pushTimeMillis = stopwatch.millisElapsed();
+      stopwatch.stop();
       metrics.incrementPushedRows(rowsinMergedSegment);
 
       log.info(
@@ -875,8 +884,8 @@ public class BatchAppenderator implements Appenderator
           identifier,
           segment.getSize(),
           indexes.size(),
-          (mergeFinishTime - startTime) / 1000000,
-          (pushFinishTime - mergeFinishTime) / 1000000,
+          mergeTimeMillis,
+          pushTimeMillis,
           objectMapper.writeValueAsString(segment.getLoadSpec())
       );
 
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java
index 09a940deacb..d8b748d9d6c 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java
@@ -969,6 +969,7 @@ public class StreamAppenderator implements Appenderator
         stopwatch.restart();
         metrics.setMergeTime(mergeTimeMillis);
         metrics.setMergeCpuTime(JvmUtils.safeGetThreadCpuTime() - 
startMergeCpuNanos);
+
         log.debug("Segment[%s] built in %,dms.", identifier, mergeTimeMillis);
       }
       catch (Throwable t) {
@@ -986,6 +987,7 @@ public class StreamAppenderator implements Appenderator
       final DataSegment segment = dataSegmentPusher.push(mergedFile, 
segmentToPush, useUniquePath);
 
       final long pushTimeMillis = stopwatch.millisElapsed();
+      stopwatch.stop();
       objectMapper.writeValue(descriptorFile, segment);
 
       log.info(
diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorTest.java
index b80e33dfa18..0db9bc65321 100644
--- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorTest.java
+++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorTest.java
@@ -29,6 +29,7 @@ import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.segment.incremental.RowIngestionMeters;
 import org.apache.druid.segment.incremental.SimpleRowIngestionMeters;
+import org.apache.druid.segment.realtime.SegmentGenerationMetrics;
 import org.apache.druid.testing.InitializedNullHandlingTest;
 import org.apache.druid.timeline.partition.LinearShardSpec;
 import org.hamcrest.CoreMatchers;
@@ -127,6 +128,15 @@ public class BatchAppenderatorTest extends InitializedNullHandlingTest
           
segmentsAndCommitMetadata.getSegments().stream().sorted().collect(Collectors.toList())
       );
 
+      SegmentGenerationMetrics segmentGenerationMetrics = tester.getMetrics();
+      Assert.assertEquals(2, segmentGenerationMetrics.numPersists());
+      Assert.assertEquals(4, segmentGenerationMetrics.rowOutput());
+      Assert.assertTrue(segmentGenerationMetrics.persistTimeMillis() > 0);
+      Assert.assertTrue(segmentGenerationMetrics.persistCpuTime() > 0);
+
+      Assert.assertTrue(segmentGenerationMetrics.mergeTimeMillis() > 0);
+      Assert.assertTrue(segmentGenerationMetrics.mergeCpuTime() > 0);
+
       appenderator.close();
       Assert.assertTrue(appenderator.getSegments().isEmpty());
     }
@@ -207,6 +217,9 @@ public class BatchAppenderatorTest extends InitializedNullHandlingTest
           
ThrowableCauseMatcher.hasCause(ThrowableCauseMatcher.hasCause(ThrowableMessageMatcher.hasMessage(
               CoreMatchers.startsWith("Push failure test"))))
       );
+
+      SegmentGenerationMetrics segmentGenerationMetrics = tester.getMetrics();
+      Assert.assertEquals(1, segmentGenerationMetrics.failedHandoffs());
     }
   }
 
@@ -707,6 +720,15 @@ public class BatchAppenderatorTest extends InitializedNullHandlingTest
           ).stream().sorted().collect(Collectors.toList())
       );
       Assert.assertEquals(0, ((BatchAppenderator) 
appenderator).getRowsInMemory());
+
+      SegmentGenerationMetrics segmentGenerationMetrics = tester.getMetrics();
+      Assert.assertEquals(4, segmentGenerationMetrics.numPersists());
+      Assert.assertEquals(3, segmentGenerationMetrics.rowOutput());
+      Assert.assertTrue(segmentGenerationMetrics.persistTimeMillis() > 0);
+      Assert.assertTrue(segmentGenerationMetrics.persistCpuTime() > 0);
+
+      Assert.assertTrue(segmentGenerationMetrics.mergeTimeMillis() > 0);
+      Assert.assertTrue(segmentGenerationMetrics.mergeCpuTime() > 0);
       appenderator.close();
     }
   }
diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java
index bf5e8bf0be3..91d8074c908 100644
--- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java
+++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java
@@ -103,10 +103,8 @@ public class StreamAppenderatorTest extends InitializedNullHandlingTest
   @Test
   public void testSimpleIngestion() throws Exception
   {
-    final SegmentGenerationMetrics segmentGenerationMetrics = new 
SegmentGenerationMetrics();
     try (final StreamAppenderatorTester tester =
              new StreamAppenderatorTester.Builder().maxRowsInMemory(2)
-                                                   
.segmentGenerationMetrics(segmentGenerationMetrics)
                                                    
.basePersistDirectory(temporaryFolder.newFolder())
                                                    .build()) {
       final Appenderator appenderator = tester.getAppenderator();
@@ -186,6 +184,7 @@ public class StreamAppenderatorTest extends InitializedNullHandlingTest
       );
       Assert.assertEquals(sorted(tester.getPushedSegments()), 
sorted(segmentsAndCommitMetadata.getSegments()));
 
+      SegmentGenerationMetrics segmentGenerationMetrics = tester.getMetrics();
       Assert.assertEquals(2, segmentGenerationMetrics.numPersists());
       Assert.assertEquals(3, segmentGenerationMetrics.rowOutput());
       Assert.assertTrue(segmentGenerationMetrics.persistTimeMillis() > 0);
@@ -203,12 +202,10 @@ public class StreamAppenderatorTest extends InitializedNullHandlingTest
   @Test
   public void testPushFailure() throws Exception
   {
-    final SegmentGenerationMetrics segmentGenerationMetrics = new 
SegmentGenerationMetrics();
     try (final StreamAppenderatorTester tester =
              new StreamAppenderatorTester.Builder().maxRowsInMemory(2)
                                                    
.basePersistDirectory(temporaryFolder.newFolder())
                                                    .enablePushFailure(true)
-                                                   
.segmentGenerationMetrics(segmentGenerationMetrics)
                                                    .build()) {
       final Appenderator appenderator = tester.getAppenderator();
       boolean thrown;
@@ -281,9 +278,9 @@ public class StreamAppenderatorTest extends InitializedNullHandlingTest
           
ThrowableCauseMatcher.hasCause(ThrowableCauseMatcher.hasCause(ThrowableMessageMatcher.hasMessage(
               CoreMatchers.startsWith("Push failure test"))))
       );
+      SegmentGenerationMetrics segmentGenerationMetrics = tester.getMetrics();
+      Assert.assertEquals(1, segmentGenerationMetrics.failedHandoffs());
     }
-
-    Assert.assertEquals(1, segmentGenerationMetrics.failedHandoffs());
   }
 
   @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to