This is an automated email from the ASF dual-hosted git repository. abhishekrb pushed a commit to branch missing_appendator_metrics in repository https://gitbox.apache.org/repos/asf/druid.git
commit a1fda96a2d64fd55a9ea9f5140ec912fe5e50711 Author: Abhishek Balaji Radhakrishnan <[email protected]> AuthorDate: Tue Dec 23 09:32:03 2025 -0500 Batch appendator changes --- .../realtime/appenderator/BatchAppenderator.java | 31 ++++++++++++++-------- .../realtime/appenderator/StreamAppenderator.java | 2 ++ .../appenderator/BatchAppenderatorTest.java | 22 +++++++++++++++ .../appenderator/StreamAppenderatorTest.java | 9 +++---- 4 files changed, 47 insertions(+), 17 deletions(-) diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderator.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderator.java index 4caa5c90c50..cdded12a4d4 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderator.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderator.java @@ -22,7 +22,6 @@ package org.apache.druid.segment.realtime.appenderator; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; -import com.google.common.base.Stopwatch; import com.google.common.base.Supplier; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -41,6 +40,7 @@ import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.RE; +import org.apache.druid.java.util.common.Stopwatch; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.io.Closer; @@ -69,6 +69,7 @@ import org.apache.druid.segment.realtime.FireHydrant; import org.apache.druid.segment.realtime.SegmentGenerationMetrics; import org.apache.druid.segment.realtime.sink.Sink; import org.apache.druid.timeline.DataSegment; +import org.apache.druid.utils.JvmUtils; import org.joda.time.Interval; import javax.annotation.Nullable; @@ -586,6 +587,7 @@ public class BatchAppenderator implements Appenderator log.info("No indexes will be persisted"); } final Stopwatch persistStopwatch = Stopwatch.createStarted(); + final long startPersistCpuNanos = JvmUtils.safeGetThreadCpuTime(); try { for (Pair<FireHydrant, SegmentIdWithShardSpec> pair : indexesToPersist) { metrics.incrementRowOutputCount(persistHydrant(pair.lhs, pair.rhs)); @@ -616,9 +618,10 @@ public class BatchAppenderator implements Appenderator } finally { metrics.incrementNumPersists(); - long persistMillis = persistStopwatch.elapsed(TimeUnit.MILLISECONDS); - metrics.incrementPersistTimeMillis(persistMillis); + final long persistMillis = persistStopwatch.millisElapsed(); persistStopwatch.stop(); + metrics.incrementPersistTimeMillis(persistMillis); + metrics.setPersistCpuTime(JvmUtils.safeGetThreadCpuTime() - startPersistCpuNanos); // make sure no push can start while persisting: log.info("Persisted rows[%,d] and bytes[%,d] and removed all sinks & hydrants from memory in[%d] millis", numPersistedRows, bytesPersisted, persistMillis @@ -629,7 +632,7 @@ public class BatchAppenderator implements Appenderator } ); - final long startDelay = runExecStopwatch.elapsed(TimeUnit.MILLISECONDS); + final long startDelay = runExecStopwatch.millisElapsed(); metrics.incrementPersistBackPressureMillis(startDelay); if (startDelay > PERSIST_WARN_DELAY) { log.warn("Ingestion was throttled for [%,d] millis because persists were pending.", startDelay); @@ -794,8 +797,9 @@ public class BatchAppenderator implements Appenderator } final File mergedFile; - final long mergeFinishTime; - final long startTime = System.nanoTime(); + final Stopwatch stopwatch = Stopwatch.createStarted(); + final long mergeTimeMillis; + final long startMergeCpuNanos = JvmUtils.safeGetThreadCpuTime(); List<QueryableIndex> indexes = new ArrayList<>(); long rowsinMergedSegment = 0L; Closer closer = Closer.create(); @@ -824,10 +828,14 @@ public class BatchAppenderator implements Appenderator tuningConfig.getMaxColumnsToMerge() ); - mergeFinishTime = System.nanoTime(); metrics.incrementMergedRows(rowsinMergedSegment); - log.debug("Segment[%s] built in %,dms.", identifier, (mergeFinishTime - startTime) / 1000000); + mergeTimeMillis = stopwatch.millisElapsed(); + stopwatch.restart(); + metrics.setMergeTime(mergeTimeMillis); + metrics.setMergeCpuTime(JvmUtils.safeGetThreadCpuTime() - startMergeCpuNanos); + + log.debug("Segment[%s] built in %,dms.", identifier, mergeTimeMillis); } catch (Throwable t) { throw closer.rethrow(t); @@ -864,7 +872,8 @@ public class BatchAppenderator implements Appenderator // cleanup, sink no longer needed removeDirectory(computePersistDir(identifier)); - final long pushFinishTime = System.nanoTime(); + final long pushTimeMillis = stopwatch.millisElapsed(); + stopwatch.stop(); metrics.incrementPushedRows(rowsinMergedSegment); log.info( @@ -875,8 +884,8 @@ public class BatchAppenderator implements Appenderator identifier, segment.getSize(), indexes.size(), - (mergeFinishTime - startTime) / 1000000, - (pushFinishTime - mergeFinishTime) / 1000000, + mergeTimeMillis, + pushTimeMillis, objectMapper.writeValueAsString(segment.getLoadSpec()) ); diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java index 09a940deacb..d8b748d9d6c 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java @@ -969,6 +969,7 @@ public class StreamAppenderator implements Appenderator stopwatch.restart(); metrics.setMergeTime(mergeTimeMillis); metrics.setMergeCpuTime(JvmUtils.safeGetThreadCpuTime() - startMergeCpuNanos); + log.debug("Segment[%s] built in %,dms.", identifier, mergeTimeMillis); } catch (Throwable t) { @@ -986,6 +987,7 @@ public class StreamAppenderator implements Appenderator final DataSegment segment = dataSegmentPusher.push(mergedFile, segmentToPush, useUniquePath); final long pushTimeMillis = stopwatch.millisElapsed(); + stopwatch.stop(); objectMapper.writeValue(descriptorFile, segment); log.info( diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorTest.java index b80e33dfa18..0db9bc65321 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorTest.java @@ -29,6 +29,7 @@ import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.segment.incremental.RowIngestionMeters; import org.apache.druid.segment.incremental.SimpleRowIngestionMeters; +import org.apache.druid.segment.realtime.SegmentGenerationMetrics; import org.apache.druid.testing.InitializedNullHandlingTest; import org.apache.druid.timeline.partition.LinearShardSpec; import org.hamcrest.CoreMatchers; @@ -127,6 +128,15 @@ public class BatchAppenderatorTest extends InitializedNullHandlingTest segmentsAndCommitMetadata.getSegments().stream().sorted().collect(Collectors.toList()) ); + SegmentGenerationMetrics segmentGenerationMetrics = tester.getMetrics(); + Assert.assertEquals(2, segmentGenerationMetrics.numPersists()); + Assert.assertEquals(4, segmentGenerationMetrics.rowOutput()); + Assert.assertTrue(segmentGenerationMetrics.persistTimeMillis() > 0); + Assert.assertTrue(segmentGenerationMetrics.persistCpuTime() > 0); + + Assert.assertTrue(segmentGenerationMetrics.mergeTimeMillis() > 0); + Assert.assertTrue(segmentGenerationMetrics.mergeCpuTime() > 0); + appenderator.close(); Assert.assertTrue(appenderator.getSegments().isEmpty()); } @@ -207,6 +217,9 @@ public class BatchAppenderatorTest extends InitializedNullHandlingTest ThrowableCauseMatcher.hasCause(ThrowableCauseMatcher.hasCause(ThrowableMessageMatcher.hasMessage( CoreMatchers.startsWith("Push failure test")))) ); + + SegmentGenerationMetrics segmentGenerationMetrics = tester.getMetrics(); + Assert.assertEquals(1, segmentGenerationMetrics.failedHandoffs()); } } @@ -707,6 +720,15 @@ public class BatchAppenderatorTest extends InitializedNullHandlingTest ).stream().sorted().collect(Collectors.toList()) ); Assert.assertEquals(0, ((BatchAppenderator) appenderator).getRowsInMemory()); + + SegmentGenerationMetrics segmentGenerationMetrics = tester.getMetrics(); + Assert.assertEquals(4, segmentGenerationMetrics.numPersists()); + Assert.assertEquals(3, segmentGenerationMetrics.rowOutput()); + Assert.assertTrue(segmentGenerationMetrics.persistTimeMillis() > 0); + Assert.assertTrue(segmentGenerationMetrics.persistCpuTime() > 0); + + Assert.assertTrue(segmentGenerationMetrics.mergeTimeMillis() > 0); + Assert.assertTrue(segmentGenerationMetrics.mergeCpuTime() > 0); appenderator.close(); } } diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java index bf5e8bf0be3..91d8074c908 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java @@ -103,10 +103,8 @@ public class StreamAppenderatorTest extends InitializedNullHandlingTest @Test public void testSimpleIngestion() throws Exception { - final SegmentGenerationMetrics segmentGenerationMetrics = new SegmentGenerationMetrics(); try (final StreamAppenderatorTester tester = new StreamAppenderatorTester.Builder().maxRowsInMemory(2) - .segmentGenerationMetrics(segmentGenerationMetrics) .basePersistDirectory(temporaryFolder.newFolder()) .build()) { final Appenderator appenderator = tester.getAppenderator(); @@ -186,6 +184,7 @@ public class StreamAppenderatorTest extends InitializedNullHandlingTest ); Assert.assertEquals(sorted(tester.getPushedSegments()), sorted(segmentsAndCommitMetadata.getSegments())); + SegmentGenerationMetrics segmentGenerationMetrics = tester.getMetrics(); Assert.assertEquals(2, segmentGenerationMetrics.numPersists()); Assert.assertEquals(3, segmentGenerationMetrics.rowOutput()); Assert.assertTrue(segmentGenerationMetrics.persistTimeMillis() > 0); @@ -203,12 +202,10 @@ public class StreamAppenderatorTest extends InitializedNullHandlingTest @Test public void testPushFailure() throws Exception { - final SegmentGenerationMetrics segmentGenerationMetrics = new SegmentGenerationMetrics(); try (final StreamAppenderatorTester tester = new StreamAppenderatorTester.Builder().maxRowsInMemory(2) .basePersistDirectory(temporaryFolder.newFolder()) .enablePushFailure(true) - .segmentGenerationMetrics(segmentGenerationMetrics) .build()) { final Appenderator appenderator = tester.getAppenderator(); boolean thrown; @@ -281,9 +278,9 @@ public class StreamAppenderatorTest extends InitializedNullHandlingTest ThrowableCauseMatcher.hasCause(ThrowableCauseMatcher.hasCause(ThrowableMessageMatcher.hasMessage( CoreMatchers.startsWith("Push failure test")))) ); + SegmentGenerationMetrics segmentGenerationMetrics = tester.getMetrics(); + Assert.assertEquals(1, segmentGenerationMetrics.failedHandoffs()); } - - Assert.assertEquals(1, segmentGenerationMetrics.failedHandoffs()); } @Test --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
