This is an automated email from the ASF dual-hosted git repository.
abhishekrb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 47d6f34643c Emit missing `ingest/merge/time`, `ingest/merge/cpu` and
`ingest/persists/cpu` metrics (#18866)
47d6f34643c is described below
commit 47d6f34643c42e469d4d49dfab08e2ca0e8b0412
Author: Abhishek Radhakrishnan <[email protected]>
AuthorDate: Fri Dec 26 12:22:31 2025 -0500
Emit missing `ingest/merge/time`, `ingest/merge/cpu` and
`ingest/persists/cpu` metrics (#18866)
The ingest/merge/time, ingest/merge/cpu and ingest/persists/cpu metrics have
been documented but were previously reported as zero because they were not set
in the streaming and batch appenderators (this probably regressed in a
refactor).
This patch now correctly reports ingest/merge/time, ingest/merge/cpu and
ingest/persists/cpu metrics for streaming and batch ingestion tasks.
Also, cleaned up KafkaIndexTaskTest and KinesisIndexTaskTest by
initializing a StubServiceEmitter as a non-static member in the base class
SeekableStreamIndexTaskTestBase so it can be used by each unit test as needed.
---
.../druid/indexing/kafka/KafkaIndexTaskTest.java | 60 ++++++++++++++++------
.../indexing/kinesis/KinesisIndexTaskTest.java | 33 +++++++-----
.../SeekableStreamIndexTaskTestBase.java | 31 +++++++++++
.../segment/realtime/SegmentGenerationMetrics.java | 15 ++++++
.../realtime/appenderator/BatchAppenderator.java | 31 +++++++----
.../realtime/appenderator/StreamAppenderator.java | 29 +++++++----
.../appenderator/BatchAppenderatorTest.java | 22 ++++++++
.../appenderator/StreamAppenderatorTest.java | 12 +++++
.../appenderator/StreamAppenderatorTester.java | 18 +++++--
9 files changed, 197 insertions(+), 54 deletions(-)
diff --git
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
index cb359896ee2..46b101055c5 100644
---
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
+++
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -88,9 +88,6 @@ import
org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.java.util.common.parsers.ParseException;
-import org.apache.druid.java.util.emitter.EmittingLogger;
-import org.apache.druid.java.util.emitter.core.NoopEmitter;
-import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.DefaultGenericQueryMetricsFactory;
import org.apache.druid.query.DefaultQueryRunnerFactoryConglomerate;
@@ -118,6 +115,7 @@ import
org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.incremental.RowIngestionMetersTotals;
import org.apache.druid.segment.incremental.RowMeters;
import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.realtime.SegmentGenerationMetrics;
import org.apache.druid.segment.transform.ExpressionTransform;
import org.apache.druid.segment.transform.TransformSpec;
import org.apache.druid.server.coordination.BroadcastDatasourceLoadingSpec;
@@ -196,9 +194,7 @@ public class KafkaIndexTaskTest extends
SeekableStreamIndexTaskTestBase
private static TestingCluster zkServer;
private static TestBroker kafkaServer;
- private static ServiceEmitter emitter;
private static int topicPostfix;
-
static final Module TEST_MODULE = new
SimpleModule("kafkaTestModule").registerSubtypes(
new NamedType(TestKafkaInputFormat.class, "testKafkaInputFormat"),
new NamedType(TestKafkaFormatWithMalformedDataDetection.class,
"testKafkaFormatWithMalformedDataDetection")
@@ -288,14 +284,6 @@ public class KafkaIndexTaskTest extends
SeekableStreamIndexTaskTestBase
@BeforeClass
public static void setupClass() throws Exception
{
- emitter = new ServiceEmitter(
- "service",
- "host",
- new NoopEmitter()
- );
- emitter.start();
- EmittingLogger.registerEmitter(emitter);
-
zkServer = new TestingCluster(1);
zkServer.start();
@@ -354,8 +342,6 @@ public class KafkaIndexTaskTest extends
SeekableStreamIndexTaskTestBase
zkServer.stop();
zkServer = null;
-
- emitter.close();
}
@Test(timeout = 60_000L)
@@ -388,7 +374,6 @@ public class KafkaIndexTaskTest extends
SeekableStreamIndexTaskTestBase
// Wait for task to exit
Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
verifyTaskMetrics(task, RowMeters.with().bytes(getTotalSizeOfRecords(2,
5)).totalProcessed(3));
-
Assert.assertTrue(task.getRunner().getSegmentGenerationMetrics().isProcessingDone());
// Check published metadata and segments in deep storage
assertEqualsExceptVersion(
@@ -402,6 +387,12 @@ public class KafkaIndexTaskTest extends
SeekableStreamIndexTaskTestBase
new KafkaDataSourceMetadata(new
SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(new
KafkaTopicPartition(false, topic, 0), 5L))),
newDataSchemaMetadata()
);
+
+ final SegmentGenerationMetrics observedSegmentGenerationMetrics =
task.getRunner().getSegmentGenerationMetrics();
+ Assert.assertTrue(observedSegmentGenerationMetrics.isProcessingDone());
+ Assert.assertEquals(3, observedSegmentGenerationMetrics.rowOutput());
+ Assert.assertEquals(2, observedSegmentGenerationMetrics.handOffCount());
+
verifyPersistAndMergeTimeMetricsArePositive(observedSegmentGenerationMetrics);
}
@Test(timeout = 60_000L)
@@ -450,6 +441,12 @@ public class KafkaIndexTaskTest extends
SeekableStreamIndexTaskTestBase
Assert.assertEquals(dimensionsSpec.getDimensionNames().get(i),
segment.getDimensions().get(i));
}
}
+
+ final SegmentGenerationMetrics observedSegmentGenerationMetrics =
task.getRunner().getSegmentGenerationMetrics();
+ Assert.assertTrue(observedSegmentGenerationMetrics.isProcessingDone());
+ Assert.assertEquals(3, observedSegmentGenerationMetrics.rowOutput());
+ Assert.assertEquals(2, observedSegmentGenerationMetrics.handOffCount());
+
verifyPersistAndMergeTimeMetricsArePositive(observedSegmentGenerationMetrics);
}
@Test(timeout = 60_000L)
@@ -739,6 +736,13 @@ public class KafkaIndexTaskTest extends
SeekableStreamIndexTaskTestBase
),
newDataSchemaMetadata()
);
+
+ final SegmentGenerationMetrics observedSegmentGenerationMetrics =
task.getRunner().getSegmentGenerationMetrics();
+ Assert.assertTrue(observedSegmentGenerationMetrics.isProcessingDone());
+ Assert.assertEquals(8, observedSegmentGenerationMetrics.rowOutput());
+ Assert.assertEquals(7, observedSegmentGenerationMetrics.handOffCount());
+ Assert.assertEquals(4, observedSegmentGenerationMetrics.numPersists());
+
verifyPersistAndMergeTimeMetricsArePositive(observedSegmentGenerationMetrics);
}
@Test(timeout = 60_000L)
@@ -1698,6 +1702,14 @@ public class KafkaIndexTaskTest extends
SeekableStreamIndexTaskTestBase
"{timestamp=246140482-04-24T15:36:27.903Z, dim1=x, dim2=z, dimLong=10,
dimFloat=20.0, met1=1.0}"
);
Assert.assertEquals(expectedInputs, parseExceptionReport.getInputs());
+
+ emitter.verifyValue("ingest/segments/count", 4);
+
+ final SegmentGenerationMetrics observedSegmentGenerationMetrics =
task.getRunner().getSegmentGenerationMetrics();
+ Assert.assertTrue(observedSegmentGenerationMetrics.isProcessingDone());
+ Assert.assertEquals(7, observedSegmentGenerationMetrics.rowOutput());
+ Assert.assertEquals(4, observedSegmentGenerationMetrics.handOffCount());
+
verifyPersistAndMergeTimeMetricsArePositive(observedSegmentGenerationMetrics);
}
@Test(timeout = 60_000L)
@@ -1775,6 +1787,14 @@ public class KafkaIndexTaskTest extends
SeekableStreamIndexTaskTestBase
List<String> expectedInputs = Arrays.asList("", "unparseable");
Assert.assertEquals(expectedInputs, parseExceptionReport.getInputs());
+
+ emitter.verifyNotEmitted("ingest/segments/count");
+
+ final SegmentGenerationMetrics observedSegmentGenerationMetrics =
task.getRunner().getSegmentGenerationMetrics();
+ Assert.assertTrue(observedSegmentGenerationMetrics.isProcessingDone());
+ Assert.assertEquals(3, observedSegmentGenerationMetrics.rowOutput());
+ Assert.assertEquals(1, observedSegmentGenerationMetrics.numPersists());
+ Assert.assertEquals(0, observedSegmentGenerationMetrics.handOffCount());
}
@Test(timeout = 60_000L)
@@ -3457,6 +3477,14 @@ public class KafkaIndexTaskTest extends
SeekableStreamIndexTaskTestBase
// Check segments in deep storage
Assert.assertEquals(ImmutableList.of("b"), readSegmentColumn("dim1",
publishedDescriptors.get(0)));
Assert.assertEquals(ImmutableList.of("bb"), readSegmentColumn("dim1t",
publishedDescriptors.get(0)));
+
+ emitter.verifyValue("ingest/segments/count", 1);
+
+ final SegmentGenerationMetrics observedSegmentGenerationMetrics =
task.getRunner().getSegmentGenerationMetrics();
+ Assert.assertTrue(observedSegmentGenerationMetrics.isProcessingDone());
+ Assert.assertEquals(1, observedSegmentGenerationMetrics.rowOutput());
+ Assert.assertEquals(1, observedSegmentGenerationMetrics.handOffCount());
+
verifyPersistAndMergeTimeMetricsArePositive(observedSegmentGenerationMetrics);
}
public static class TestKafkaInputFormat implements InputFormat
diff --git
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
index bd2a8aa2712..69764a5d800 100644
---
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
+++
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
@@ -68,9 +68,6 @@ import
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervi
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
-import org.apache.druid.java.util.emitter.EmittingLogger;
-import org.apache.druid.java.util.emitter.core.NoopEmitter;
-import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.DefaultQueryRunnerFactoryConglomerate;
import org.apache.druid.query.DruidProcessingConfigTest;
@@ -86,6 +83,7 @@ import
org.apache.druid.segment.incremental.InputRowFilterResult;
import org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.incremental.RowMeters;
import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.realtime.SegmentGenerationMetrics;
import org.apache.druid.segment.transform.ExpressionTransform;
import org.apache.druid.segment.transform.TransformSpec;
import org.apache.druid.timeline.DataSegment;
@@ -168,7 +166,6 @@ public class KinesisIndexTaskTest extends
SeekableStreamIndexTaskTestBase
);
private static KinesisRecordSupplier recordSupplier;
- private static ServiceEmitter emitter;
@Parameterized.Parameters(name = "{0}")
public static Iterable<Object[]> constructorFeeder()
@@ -194,13 +191,6 @@ public class KinesisIndexTaskTest extends
SeekableStreamIndexTaskTestBase
@BeforeClass
public static void setupClass()
{
- emitter = new ServiceEmitter(
- "service",
- "host",
- new NoopEmitter()
- );
- emitter.start();
- EmittingLogger.registerEmitter(emitter);
taskExec = MoreExecutors.listeningDecorator(
Executors.newCachedThreadPool(
Execs.makeThreadFactory("kinesis-task-test-%d")
@@ -252,7 +242,6 @@ public class KinesisIndexTaskTest extends
SeekableStreamIndexTaskTestBase
{
taskExec.shutdown();
taskExec.awaitTermination(20, TimeUnit.MINUTES);
- emitter.close();
}
private void waitUntil(KinesisIndexTask task, Predicate<KinesisIndexTask>
predicate)
@@ -356,6 +345,13 @@ public class KinesisIndexTaskTest extends
SeekableStreamIndexTaskTestBase
),
newDataSchemaMetadata()
);
+
+ final SegmentGenerationMetrics observedSegmentGenerationMetrics =
task.getRunner().getSegmentGenerationMetrics();
+ Assert.assertTrue(observedSegmentGenerationMetrics.isProcessingDone());
+ Assert.assertEquals(3, observedSegmentGenerationMetrics.rowOutput());
+ Assert.assertEquals(2, observedSegmentGenerationMetrics.handOffCount());
+ Assert.assertEquals(2, observedSegmentGenerationMetrics.numPersists());
+
verifyPersistAndMergeTimeMetricsArePositive(observedSegmentGenerationMetrics);
}
@Test(timeout = 120_000L)
@@ -753,6 +749,13 @@ public class KinesisIndexTaskTest extends
SeekableStreamIndexTaskTestBase
new KinesisDataSourceMetadata(new
SeekableStreamEndSequenceNumbers<>(STREAM, endOffsets)),
newDataSchemaMetadata()
);
+
+ final SegmentGenerationMetrics observedSegmentGenerationMetrics =
task.getRunner().getSegmentGenerationMetrics();
+ Assert.assertTrue(observedSegmentGenerationMetrics.isProcessingDone());
+ Assert.assertEquals(7, observedSegmentGenerationMetrics.rowOutput());
+ Assert.assertEquals(6, observedSegmentGenerationMetrics.handOffCount());
+ Assert.assertEquals(5, observedSegmentGenerationMetrics.numPersists());
+
verifyPersistAndMergeTimeMetricsArePositive(observedSegmentGenerationMetrics);
}
@@ -1032,6 +1035,12 @@ public class KinesisIndexTaskTest extends
SeekableStreamIndexTaskTestBase
),
newDataSchemaMetadata()
);
+ final SegmentGenerationMetrics observedSegmentGenerationMetrics =
task.getRunner().getSegmentGenerationMetrics();
+ Assert.assertTrue(observedSegmentGenerationMetrics.isProcessingDone());
+ Assert.assertEquals(3, observedSegmentGenerationMetrics.rowOutput());
+ Assert.assertEquals(2, observedSegmentGenerationMetrics.handOffCount());
+ Assert.assertEquals(2, observedSegmentGenerationMetrics.numPersists());
+
verifyPersistAndMergeTimeMetricsArePositive(observedSegmentGenerationMetrics);
}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java
index 19ff6ec4d0b..f0c273baa90 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java
@@ -83,8 +83,10 @@ import
org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
+import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.metrics.MonitorScheduler;
+import org.apache.druid.java.util.metrics.StubServiceEmitter;
import org.apache.druid.metadata.DerbyMetadataStorageActionHandlerFactory;
import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator;
import org.apache.druid.metadata.TestDerbyConnector;
@@ -120,6 +122,7 @@ import
org.apache.druid.segment.loading.LocalDataSegmentPusherConfig;
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
import org.apache.druid.segment.metadata.SegmentSchemaManager;
import org.apache.druid.segment.realtime.NoopChatHandlerProvider;
+import org.apache.druid.segment.realtime.SegmentGenerationMetrics;
import org.apache.druid.segment.realtime.appenderator.StreamAppenderator;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.coordination.DataSegmentServerAnnouncer;
@@ -134,7 +137,9 @@ import org.assertj.core.api.Assertions;
import org.easymock.EasyMock;
import org.easymock.EasyMockSupport;
import org.joda.time.Interval;
+import org.junit.After;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.Rule;
import org.junit.rules.TemporaryFolder;
@@ -198,6 +203,8 @@ public abstract class SeekableStreamIndexTaskTestBase
extends EasyMockSupport
protected final List<Task> runningTasks = new ArrayList<>();
protected final LockGranularity lockGranularity;
+
+ protected StubServiceEmitter emitter;
protected File directory;
protected File reportsFile;
protected TaskToolboxFactory toolboxFactory;
@@ -251,6 +258,20 @@ public abstract class SeekableStreamIndexTaskTestBase
extends EasyMockSupport
this.lockGranularity = lockGranularity;
}
+ @Before
+ public void setupBase()
+ {
+ emitter = new StubServiceEmitter();
+ emitter.start();
+ EmittingLogger.registerEmitter(emitter);
+ }
+
+ @After
+ public void tearDownBase() throws IOException
+ {
+ emitter.close();
+ }
+
protected static ByteEntity jb(
String timestamp,
String dim1,
@@ -408,6 +429,16 @@ public abstract class SeekableStreamIndexTaskTestBase
extends EasyMockSupport
return new SegmentDescriptor(interval, "fakeVersion", partitionNum);
}
+ protected void
verifyPersistAndMergeTimeMetricsArePositive(SegmentGenerationMetrics
observedSegmentGenerationMetrics)
+ {
+ Assert.assertNotNull(observedSegmentGenerationMetrics);
+ Assert.assertTrue(observedSegmentGenerationMetrics.persistTimeMillis() >
0);
+ Assert.assertTrue(observedSegmentGenerationMetrics.persistCpuTime() > 0);
+
+ Assert.assertTrue(observedSegmentGenerationMetrics.mergeTimeMillis() > 0);
+ Assert.assertTrue(observedSegmentGenerationMetrics.mergeCpuTime() > 0);
+ }
+
protected void assertEqualsExceptVersion(
List<SegmentDescriptorAndExpectedDim1Values> expectedDescriptors,
List<SegmentDescriptor> actualDescriptors
diff --git
a/server/src/main/java/org/apache/druid/segment/realtime/SegmentGenerationMetrics.java
b/server/src/main/java/org/apache/druid/segment/realtime/SegmentGenerationMetrics.java
index 2afc2b72831..97d347b0e3a 100644
---
a/server/src/main/java/org/apache/druid/segment/realtime/SegmentGenerationMetrics.java
+++
b/server/src/main/java/org/apache/druid/segment/realtime/SegmentGenerationMetrics.java
@@ -186,6 +186,21 @@ public class SegmentGenerationMetrics
}
}
+ public void setMergeTime(long elapsedMergeTimeMillis)
+ {
+ mergeTimeMillis.set(elapsedMergeTimeMillis);
+ }
+
+ public void setMergeCpuTime(long elapsedCpuTimeNanos)
+ {
+ mergeCpuTime.set(elapsedCpuTimeNanos);
+ }
+
+ public void setPersistCpuTime(long elpasedCpuTimeNanos)
+ {
+ persistCpuTime.set(elpasedCpuTimeNanos);
+ }
+
public void markProcessingDone()
{
this.processingDone.set(true);
diff --git
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderator.java
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderator.java
index 4caa5c90c50..852b79ce036 100644
---
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderator.java
+++
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderator.java
@@ -22,7 +22,6 @@ package org.apache.druid.segment.realtime.appenderator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
-import com.google.common.base.Stopwatch;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
@@ -41,6 +40,7 @@ import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.RE;
+import org.apache.druid.java.util.common.Stopwatch;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.io.Closer;
@@ -69,6 +69,7 @@ import org.apache.druid.segment.realtime.FireHydrant;
import org.apache.druid.segment.realtime.SegmentGenerationMetrics;
import org.apache.druid.segment.realtime.sink.Sink;
import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.utils.JvmUtils;
import org.joda.time.Interval;
import javax.annotation.Nullable;
@@ -586,6 +587,7 @@ public class BatchAppenderator implements Appenderator
log.info("No indexes will be persisted");
}
final Stopwatch persistStopwatch = Stopwatch.createStarted();
+ final long startPersistCpuNanos = JvmUtils.safeGetThreadCpuTime();
try {
for (Pair<FireHydrant, SegmentIdWithShardSpec> pair :
indexesToPersist) {
metrics.incrementRowOutputCount(persistHydrant(pair.lhs,
pair.rhs));
@@ -615,9 +617,10 @@ public class BatchAppenderator implements Appenderator
throw e;
}
finally {
- metrics.incrementNumPersists();
- long persistMillis =
persistStopwatch.elapsed(TimeUnit.MILLISECONDS);
+ metrics.setPersistCpuTime(JvmUtils.safeGetThreadCpuTime() -
startPersistCpuNanos);
+ final long persistMillis = persistStopwatch.millisElapsed();
metrics.incrementPersistTimeMillis(persistMillis);
+ metrics.incrementNumPersists();
persistStopwatch.stop();
// make sure no push can start while persisting:
log.info("Persisted rows[%,d] and bytes[%,d] and removed all sinks
& hydrants from memory in[%d] millis",
@@ -629,7 +632,7 @@ public class BatchAppenderator implements Appenderator
}
);
- final long startDelay = runExecStopwatch.elapsed(TimeUnit.MILLISECONDS);
+ final long startDelay = runExecStopwatch.millisElapsed();
metrics.incrementPersistBackPressureMillis(startDelay);
if (startDelay > PERSIST_WARN_DELAY) {
log.warn("Ingestion was throttled for [%,d] millis because persists were
pending.", startDelay);
@@ -794,8 +797,9 @@ public class BatchAppenderator implements Appenderator
}
final File mergedFile;
- final long mergeFinishTime;
- final long startTime = System.nanoTime();
+ final Stopwatch mergeStopwatch = Stopwatch.createStarted();
+ final long mergeTimeMillis;
+ final long startMergeCpuNanos = JvmUtils.safeGetThreadCpuTime();
List<QueryableIndex> indexes = new ArrayList<>();
long rowsinMergedSegment = 0L;
Closer closer = Closer.create();
@@ -824,10 +828,13 @@ public class BatchAppenderator implements Appenderator
tuningConfig.getMaxColumnsToMerge()
);
- mergeFinishTime = System.nanoTime();
metrics.incrementMergedRows(rowsinMergedSegment);
- log.debug("Segment[%s] built in %,dms.", identifier, (mergeFinishTime
- startTime) / 1000000);
+ metrics.setMergeCpuTime(JvmUtils.safeGetThreadCpuTime() -
startMergeCpuNanos);
+ mergeTimeMillis = mergeStopwatch.millisElapsed();
+ metrics.setMergeTime(mergeTimeMillis);
+
+ log.debug("Segment[%s] built in %,dms.", identifier, mergeTimeMillis);
}
catch (Throwable t) {
throw closer.rethrow(t);
@@ -836,6 +843,8 @@ public class BatchAppenderator implements Appenderator
closer.close();
}
+ final Stopwatch pushStopwatch = Stopwatch.createStarted();
+
// dataSegmentPusher retries internally when appropriate; no need for
retries here.
final DataSegment segment = dataSegmentPusher.push(
mergedFile,
@@ -864,7 +873,7 @@ public class BatchAppenderator implements Appenderator
// cleanup, sink no longer needed
removeDirectory(computePersistDir(identifier));
- final long pushFinishTime = System.nanoTime();
+ final long pushTimeMillis = pushStopwatch.millisElapsed();
metrics.incrementPushedRows(rowsinMergedSegment);
log.info(
@@ -875,8 +884,8 @@ public class BatchAppenderator implements Appenderator
identifier,
segment.getSize(),
indexes.size(),
- (mergeFinishTime - startTime) / 1000000,
- (pushFinishTime - mergeFinishTime) / 1000000,
+ mergeTimeMillis,
+ pushTimeMillis,
objectMapper.writeValueAsString(segment.getLoadSpec())
);
diff --git
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java
index 578b1f32da2..c95434cbab6 100644
---
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java
+++
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java
@@ -24,7 +24,6 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
-import com.google.common.base.Stopwatch;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
@@ -46,6 +45,7 @@ import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.RE;
+import org.apache.druid.java.util.common.Stopwatch;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
@@ -81,6 +81,7 @@ import org.apache.druid.segment.realtime.sink.Sink;
import org.apache.druid.server.coordination.DataSegmentAnnouncer;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
+import org.apache.druid.utils.JvmUtils;
import org.joda.time.Interval;
import javax.annotation.Nullable;
@@ -682,6 +683,7 @@ public class StreamAppenderator implements Appenderator
@Override
public Object call() throws IOException
{
+ final long startPersistCpuNanos = JvmUtils.safeGetThreadCpuTime();
try {
setTaskThreadContext();
for (Pair<FireHydrant, SegmentIdWithShardSpec> pair :
indexesToPersist) {
@@ -747,15 +749,16 @@ public class StreamAppenderator implements Appenderator
throw e;
}
finally {
+ metrics.setPersistCpuTime(JvmUtils.safeGetThreadCpuTime() -
startPersistCpuNanos);
+
metrics.incrementPersistTimeMillis(persistStopwatch.millisElapsed());
metrics.incrementNumPersists();
-
metrics.incrementPersistTimeMillis(persistStopwatch.elapsed(TimeUnit.MILLISECONDS));
persistStopwatch.stop();
}
}
}
);
- final long startDelay = runExecStopwatch.elapsed(TimeUnit.MILLISECONDS);
+ final long startDelay = runExecStopwatch.millisElapsed();
metrics.incrementPersistBackPressureMillis(startDelay);
if (startDelay > WARN_DELAY) {
log.warn("Ingestion was throttled for [%,d] millis because persists were
pending.", startDelay);
@@ -935,8 +938,9 @@ public class StreamAppenderator implements Appenderator
}
final File mergedFile;
- final long mergeFinishTime;
- final long startTime = System.nanoTime();
+ final Stopwatch mergeStopwatch = Stopwatch.createStarted();
+ final long mergeTimeMillis;
+ final long startMergeCpuNanos = JvmUtils.safeGetThreadCpuTime();
List<QueryableIndex> indexes = new ArrayList<>();
Closer closer = Closer.create();
try {
@@ -961,9 +965,11 @@ public class StreamAppenderator implements Appenderator
tuningConfig.getMaxColumnsToMerge()
);
- mergeFinishTime = System.nanoTime();
+ metrics.setMergeCpuTime(JvmUtils.safeGetThreadCpuTime() -
startMergeCpuNanos);
+ mergeTimeMillis = mergeStopwatch.millisElapsed();
+ metrics.setMergeTime(mergeTimeMillis);
- log.debug("Segment[%s] built in %,dms.", identifier, (mergeFinishTime
- startTime) / 1000000);
+ log.debug("Segment[%s] built in %,dms.", identifier, mergeTimeMillis);
}
catch (Throwable t) {
throw closer.rethrow(t);
@@ -972,6 +978,8 @@ public class StreamAppenderator implements Appenderator
closer.close();
}
+ final Stopwatch pushStopwatch = Stopwatch.createStarted();
+
final DataSegment segmentToPush = sink.getSegment().withDimensions(
IndexMerger.getMergedDimensionsFromQueryableIndexes(indexes,
schema.getDimensionsSpec())
);
@@ -979,8 +987,7 @@ public class StreamAppenderator implements Appenderator
// dataSegmentPusher retries internally when appropriate; no need for
retries here.
final DataSegment segment = dataSegmentPusher.push(mergedFile,
segmentToPush, useUniquePath);
- final long pushFinishTime = System.nanoTime();
-
+ final long pushTimeMillis = pushStopwatch.millisElapsed();
objectMapper.writeValue(descriptorFile, segment);
log.info(
@@ -991,8 +998,8 @@ public class StreamAppenderator implements Appenderator
identifier,
segment.getSize(),
indexes.size(),
- (mergeFinishTime - startTime) / 1000000,
- (pushFinishTime - mergeFinishTime) / 1000000,
+ mergeTimeMillis,
+ pushTimeMillis,
objectMapper.writeValueAsString(segment.getLoadSpec())
);
diff --git
a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorTest.java
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorTest.java
index b80e33dfa18..0db9bc65321 100644
---
a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorTest.java
+++
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorTest.java
@@ -29,6 +29,7 @@ import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.incremental.SimpleRowIngestionMeters;
+import org.apache.druid.segment.realtime.SegmentGenerationMetrics;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.apache.druid.timeline.partition.LinearShardSpec;
import org.hamcrest.CoreMatchers;
@@ -127,6 +128,15 @@ public class BatchAppenderatorTest extends
InitializedNullHandlingTest
segmentsAndCommitMetadata.getSegments().stream().sorted().collect(Collectors.toList())
);
+ SegmentGenerationMetrics segmentGenerationMetrics = tester.getMetrics();
+ Assert.assertEquals(2, segmentGenerationMetrics.numPersists());
+ Assert.assertEquals(4, segmentGenerationMetrics.rowOutput());
+ Assert.assertTrue(segmentGenerationMetrics.persistTimeMillis() > 0);
+ Assert.assertTrue(segmentGenerationMetrics.persistCpuTime() > 0);
+
+ Assert.assertTrue(segmentGenerationMetrics.mergeTimeMillis() > 0);
+ Assert.assertTrue(segmentGenerationMetrics.mergeCpuTime() > 0);
+
appenderator.close();
Assert.assertTrue(appenderator.getSegments().isEmpty());
}
@@ -207,6 +217,9 @@ public class BatchAppenderatorTest extends
InitializedNullHandlingTest
ThrowableCauseMatcher.hasCause(ThrowableCauseMatcher.hasCause(ThrowableMessageMatcher.hasMessage(
CoreMatchers.startsWith("Push failure test"))))
);
+
+ SegmentGenerationMetrics segmentGenerationMetrics = tester.getMetrics();
+ Assert.assertEquals(1, segmentGenerationMetrics.failedHandoffs());
}
}
@@ -707,6 +720,15 @@ public class BatchAppenderatorTest extends
InitializedNullHandlingTest
).stream().sorted().collect(Collectors.toList())
);
Assert.assertEquals(0, ((BatchAppenderator)
appenderator).getRowsInMemory());
+
+ SegmentGenerationMetrics segmentGenerationMetrics = tester.getMetrics();
+ Assert.assertEquals(4, segmentGenerationMetrics.numPersists());
+ Assert.assertEquals(3, segmentGenerationMetrics.rowOutput());
+ Assert.assertTrue(segmentGenerationMetrics.persistTimeMillis() > 0);
+ Assert.assertTrue(segmentGenerationMetrics.persistCpuTime() > 0);
+
+ Assert.assertTrue(segmentGenerationMetrics.mergeTimeMillis() > 0);
+ Assert.assertTrue(segmentGenerationMetrics.mergeCpuTime() > 0);
appenderator.close();
}
}
diff --git
a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java
index 22074f4f78f..91d8074c908 100644
---
a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java
+++
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java
@@ -56,6 +56,7 @@ import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.incremental.SimpleRowIngestionMeters;
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
+import org.apache.druid.segment.realtime.SegmentGenerationMetrics;
import org.apache.druid.segment.realtime.sink.Committers;
import org.apache.druid.server.coordination.DataSegmentAnnouncer;
import org.apache.druid.testing.InitializedNullHandlingTest;
@@ -183,6 +184,15 @@ public class StreamAppenderatorTest extends
InitializedNullHandlingTest
);
Assert.assertEquals(sorted(tester.getPushedSegments()),
sorted(segmentsAndCommitMetadata.getSegments()));
+ SegmentGenerationMetrics segmentGenerationMetrics = tester.getMetrics();
+ Assert.assertEquals(2, segmentGenerationMetrics.numPersists());
+ Assert.assertEquals(3, segmentGenerationMetrics.rowOutput());
+ Assert.assertTrue(segmentGenerationMetrics.persistTimeMillis() > 0);
+ Assert.assertTrue(segmentGenerationMetrics.persistCpuTime() > 0);
+
+ Assert.assertTrue(segmentGenerationMetrics.mergeTimeMillis() > 0);
+ Assert.assertTrue(segmentGenerationMetrics.mergeCpuTime() > 0);
+
// clear
appenderator.clear();
Assert.assertTrue(appenderator.getSegments().isEmpty());
@@ -268,6 +278,8 @@ public class StreamAppenderatorTest extends
InitializedNullHandlingTest
ThrowableCauseMatcher.hasCause(ThrowableCauseMatcher.hasCause(ThrowableMessageMatcher.hasMessage(
CoreMatchers.startsWith("Push failure test"))))
);
+ SegmentGenerationMetrics segmentGenerationMetrics = tester.getMetrics();
+ Assert.assertEquals(1, segmentGenerationMetrics.failedHandoffs());
}
}
diff --git
a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTester.java
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTester.java
index 06745e98a04..b8c1065812b 100644
---
a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTester.java
+++
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTester.java
@@ -115,7 +115,8 @@ public class StreamAppenderatorTester implements
AutoCloseable
final ServiceEmitter serviceEmitter,
final PolicyEnforcer policyEnforcer,
final boolean releaseLocksOnHandoff,
- final TaskIntervalUnlocker taskIntervalUnlocker
+ final TaskIntervalUnlocker taskIntervalUnlocker,
+ final SegmentGenerationMetrics segmentGenerationMetrics
)
{
objectMapper = new DefaultObjectMapper();
@@ -165,7 +166,7 @@ public class StreamAppenderatorTester implements
AutoCloseable
releaseLocksOnHandoff
);
- metrics = new SegmentGenerationMetrics();
+ metrics = segmentGenerationMetrics == null ? new
SegmentGenerationMetrics() : segmentGenerationMetrics;
queryExecutor = Execs.singleThreaded("queryExecutor(%d)");
IndexIO indexIO = new IndexIO(
@@ -364,6 +365,7 @@ public class StreamAppenderatorTester implements
AutoCloseable
private PolicyEnforcer policyEnforcer = NoopPolicyEnforcer.instance();
private boolean releaseLocksOnHandoff;
private TaskIntervalUnlocker taskIntervalUnlocker = interval -> {};
+ private SegmentGenerationMetrics segmentGenerationMetrics;
public Builder maxRowsInMemory(final int maxRowsInMemory)
{
@@ -389,6 +391,12 @@ public class StreamAppenderatorTester implements
AutoCloseable
return this;
}
+ public Builder segmentGenerationMetrics(final SegmentGenerationMetrics
segmentGenerationMetrics)
+ {
+ this.segmentGenerationMetrics = segmentGenerationMetrics;
+ return this;
+ }
+
public Builder rowIngestionMeters(final RowIngestionMeters
rowIngestionMeters)
{
this.rowIngestionMeters = rowIngestionMeters;
@@ -446,7 +454,8 @@ public class StreamAppenderatorTester implements
AutoCloseable
serviceEmitter,
policyEnforcer,
releaseLocksOnHandoff,
- taskIntervalUnlocker
+ taskIntervalUnlocker,
+ segmentGenerationMetrics
);
}
@@ -468,7 +477,8 @@ public class StreamAppenderatorTester implements
AutoCloseable
serviceEmitter,
policyEnforcer,
releaseLocksOnHandoff,
- taskIntervalUnlocker
+ taskIntervalUnlocker,
+ segmentGenerationMetrics
);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]