This is an automated email from the ASF dual-hosted git repository. abhishekrb pushed a commit to branch missing_appendator_metrics in repository https://gitbox.apache.org/repos/asf/druid.git
commit f48f51867d24378c444279fff47838de8c084d47 Author: Abhishek Balaji Radhakrishnan <[email protected]> AuthorDate: Tue Dec 23 09:15:56 2025 -0500 Test extensions for kafka and kinesis Init ServiceEmitter as non-static to reuse individually for each test Add a helper in the base class for asserting segment generation metrics --- .../druid/indexing/kafka/KafkaIndexTaskTest.java | 60 ++++++++++++++++------ .../indexing/kinesis/KinesisIndexTaskTest.java | 33 +++++++----- .../SeekableStreamIndexTaskTestBase.java | 31 +++++++++++ 3 files changed, 96 insertions(+), 28 deletions(-) diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java index 96dd04d42cf..14eec079574 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -88,9 +88,6 @@ import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.java.util.common.parsers.CloseableIterator; import org.apache.druid.java.util.common.parsers.ParseException; -import org.apache.druid.java.util.emitter.EmittingLogger; -import org.apache.druid.java.util.emitter.core.NoopEmitter; -import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.math.expr.ExprMacroTable; import org.apache.druid.query.DefaultGenericQueryMetricsFactory; import org.apache.druid.query.DefaultQueryRunnerFactoryConglomerate; @@ -117,6 +114,7 @@ import org.apache.druid.segment.incremental.RowIngestionMeters; import org.apache.druid.segment.incremental.RowIngestionMetersTotals; import org.apache.druid.segment.incremental.RowMeters; import org.apache.druid.segment.indexing.DataSchema; +import org.apache.druid.segment.realtime.SegmentGenerationMetrics; import org.apache.druid.segment.transform.ExpressionTransform; import org.apache.druid.segment.transform.TransformSpec; import org.apache.druid.server.coordination.BroadcastDatasourceLoadingSpec; @@ -195,9 +193,7 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase private static TestingCluster zkServer; private static TestBroker kafkaServer; - private static ServiceEmitter emitter; private static int topicPostfix; - static final Module TEST_MODULE = new SimpleModule("kafkaTestModule").registerSubtypes( new NamedType(TestKafkaInputFormat.class, "testKafkaInputFormat"), new NamedType(TestKafkaFormatWithMalformedDataDetection.class, "testKafkaFormatWithMalformedDataDetection") @@ -287,14 +283,6 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase @BeforeClass public static void setupClass() throws Exception { - emitter = new ServiceEmitter( - "service", - "host", - new NoopEmitter() - ); - emitter.start(); - EmittingLogger.registerEmitter(emitter); - zkServer = new TestingCluster(1); zkServer.start(); @@ -353,8 +341,6 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase zkServer.stop(); zkServer = null; - - emitter.close(); } @Test(timeout = 60_000L) @@ -387,7 +373,6 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase // Wait for task to exit Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode()); verifyTaskMetrics(task, RowMeters.with().bytes(getTotalSizeOfRecords(2, 5)).totalProcessed(3)); - Assert.assertTrue(task.getRunner().getSegmentGenerationMetrics().isProcessingDone()); // Check published metadata and segments in deep storage assertEqualsExceptVersion( @@ -401,6 +386,12 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase new KafkaDataSourceMetadata(new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(new KafkaTopicPartition(false, topic, 0), 5L))), newDataSchemaMetadata() ); + + final SegmentGenerationMetrics observedSegmentGenerationMetrics = task.getRunner().getSegmentGenerationMetrics(); + Assert.assertTrue(observedSegmentGenerationMetrics.isProcessingDone()); + Assert.assertEquals(3, observedSegmentGenerationMetrics.rowOutput()); + Assert.assertEquals(2, observedSegmentGenerationMetrics.handOffCount()); + verifySegmentGenerationTimeMetricsArePositive(observedSegmentGenerationMetrics); } @Test(timeout = 60_000L) @@ -449,6 +440,12 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase Assert.assertEquals(dimensionsSpec.getDimensionNames().get(i), segment.getDimensions().get(i)); } } + + final SegmentGenerationMetrics observedSegmentGenerationMetrics = task.getRunner().getSegmentGenerationMetrics(); + Assert.assertTrue(observedSegmentGenerationMetrics.isProcessingDone()); + Assert.assertEquals(3, observedSegmentGenerationMetrics.rowOutput()); + Assert.assertEquals(2, observedSegmentGenerationMetrics.handOffCount()); + verifySegmentGenerationTimeMetricsArePositive(observedSegmentGenerationMetrics); } @Test(timeout = 60_000L) @@ -738,6 +735,13 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase ), newDataSchemaMetadata() ); + + final SegmentGenerationMetrics observedSegmentGenerationMetrics = task.getRunner().getSegmentGenerationMetrics(); + Assert.assertTrue(observedSegmentGenerationMetrics.isProcessingDone()); + Assert.assertEquals(8, observedSegmentGenerationMetrics.rowOutput()); + Assert.assertEquals(7, observedSegmentGenerationMetrics.handOffCount()); + Assert.assertEquals(4, observedSegmentGenerationMetrics.numPersists()); + verifySegmentGenerationTimeMetricsArePositive(observedSegmentGenerationMetrics); } @Test(timeout = 60_000L) @@ -1694,6 +1698,14 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase "{timestamp=246140482-04-24T15:36:27.903Z, dim1=x, dim2=z, dimLong=10, dimFloat=20.0, met1=1.0}" ); Assert.assertEquals(expectedInputs, parseExceptionReport.getInputs()); + + emitter.verifyValue("ingest/segments/count", 4); + + final SegmentGenerationMetrics observedSegmentGenerationMetrics = task.getRunner().getSegmentGenerationMetrics(); + Assert.assertTrue(observedSegmentGenerationMetrics.isProcessingDone()); + Assert.assertEquals(7, observedSegmentGenerationMetrics.rowOutput()); + Assert.assertEquals(4, observedSegmentGenerationMetrics.handOffCount()); + verifySegmentGenerationTimeMetricsArePositive(observedSegmentGenerationMetrics); } @Test(timeout = 60_000L) @@ -1768,6 +1780,14 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase List<String> expectedInputs = Arrays.asList("", "unparseable"); Assert.assertEquals(expectedInputs, parseExceptionReport.getInputs()); + + emitter.verifyNotEmitted("ingest/segments/count"); + + final SegmentGenerationMetrics observedSegmentGenerationMetrics = task.getRunner().getSegmentGenerationMetrics(); + Assert.assertTrue(observedSegmentGenerationMetrics.isProcessingDone()); + Assert.assertEquals(3, observedSegmentGenerationMetrics.rowOutput()); + Assert.assertEquals(1, observedSegmentGenerationMetrics.numPersists()); + Assert.assertEquals(0, observedSegmentGenerationMetrics.handOffCount()); } @Test(timeout = 60_000L) @@ -3450,6 +3470,14 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase // Check segments in deep storage Assert.assertEquals(ImmutableList.of("b"), readSegmentColumn("dim1", publishedDescriptors.get(0))); Assert.assertEquals(ImmutableList.of("bb"), readSegmentColumn("dim1t", publishedDescriptors.get(0))); + + emitter.verifyValue("ingest/segments/count", 1); + + final SegmentGenerationMetrics observedSegmentGenerationMetrics = task.getRunner().getSegmentGenerationMetrics(); + Assert.assertTrue(observedSegmentGenerationMetrics.isProcessingDone()); + Assert.assertEquals(1, observedSegmentGenerationMetrics.rowOutput()); + Assert.assertEquals(1, observedSegmentGenerationMetrics.handOffCount()); + verifySegmentGenerationTimeMetricsArePositive(observedSegmentGenerationMetrics); } public static class TestKafkaInputFormat implements InputFormat diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java index 98dfea4333c..f37f856282f 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java @@ -68,9 +68,6 @@ import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervi import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.concurrent.Execs; -import org.apache.druid.java.util.emitter.EmittingLogger; -import org.apache.druid.java.util.emitter.core.NoopEmitter; -import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.math.expr.ExprMacroTable; import org.apache.druid.query.DefaultQueryRunnerFactoryConglomerate; import org.apache.druid.query.DruidProcessingConfigTest; @@ -85,6 +82,7 @@ import org.apache.druid.segment.TestHelper; import org.apache.druid.segment.incremental.RowIngestionMeters; import org.apache.druid.segment.incremental.RowMeters; import org.apache.druid.segment.indexing.DataSchema; +import org.apache.druid.segment.realtime.SegmentGenerationMetrics; import org.apache.druid.segment.transform.ExpressionTransform; import org.apache.druid.segment.transform.TransformSpec; import org.apache.druid.timeline.DataSegment; @@ -167,7 +165,6 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase ); private static KinesisRecordSupplier recordSupplier; - private static ServiceEmitter emitter; @Parameterized.Parameters(name = "{0}") public static Iterable<Object[]> constructorFeeder() @@ -193,13 +190,6 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase @BeforeClass public static void setupClass() { - emitter = new ServiceEmitter( - "service", - "host", - new NoopEmitter() - ); - emitter.start(); - EmittingLogger.registerEmitter(emitter); taskExec = MoreExecutors.listeningDecorator( Executors.newCachedThreadPool( Execs.makeThreadFactory("kinesis-task-test-%d") @@ -251,7 +241,6 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase { taskExec.shutdown(); taskExec.awaitTermination(20, TimeUnit.MINUTES); - emitter.close(); } private void waitUntil(KinesisIndexTask task, Predicate<KinesisIndexTask> predicate) @@ -355,6 +344,13 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase ), newDataSchemaMetadata() ); + + final SegmentGenerationMetrics observedSegmentGenerationMetrics = task.getRunner().getSegmentGenerationMetrics(); + Assert.assertTrue(observedSegmentGenerationMetrics.isProcessingDone()); + Assert.assertEquals(3, observedSegmentGenerationMetrics.rowOutput()); + Assert.assertEquals(2, observedSegmentGenerationMetrics.handOffCount()); + Assert.assertEquals(2, observedSegmentGenerationMetrics.numPersists()); + verifySegmentGenerationTimeMetricsArePositive(observedSegmentGenerationMetrics); } @Test(timeout = 120_000L) @@ -752,6 +748,13 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase new KinesisDataSourceMetadata(new SeekableStreamEndSequenceNumbers<>(STREAM, endOffsets)), newDataSchemaMetadata() ); + + final SegmentGenerationMetrics observedSegmentGenerationMetrics = task.getRunner().getSegmentGenerationMetrics(); + Assert.assertTrue(observedSegmentGenerationMetrics.isProcessingDone()); + Assert.assertEquals(7, observedSegmentGenerationMetrics.rowOutput()); + Assert.assertEquals(6, observedSegmentGenerationMetrics.handOffCount()); + Assert.assertEquals(5, observedSegmentGenerationMetrics.numPersists()); + verifySegmentGenerationTimeMetricsArePositive(observedSegmentGenerationMetrics); } @@ -1031,6 +1034,12 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase ), newDataSchemaMetadata() ); + final SegmentGenerationMetrics observedSegmentGenerationMetrics = task.getRunner().getSegmentGenerationMetrics(); + Assert.assertTrue(observedSegmentGenerationMetrics.isProcessingDone()); + Assert.assertEquals(3, observedSegmentGenerationMetrics.rowOutput()); + Assert.assertEquals(2, observedSegmentGenerationMetrics.handOffCount()); + Assert.assertEquals(2, observedSegmentGenerationMetrics.numPersists()); + verifySegmentGenerationTimeMetricsArePositive(observedSegmentGenerationMetrics); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java index 19ff6ec4d0b..4adbbdded64 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java @@ -83,8 +83,10 @@ import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.guava.Comparators; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.common.parsers.JSONPathSpec; +import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.metrics.MonitorScheduler; +import org.apache.druid.java.util.metrics.StubServiceEmitter; import org.apache.druid.metadata.DerbyMetadataStorageActionHandlerFactory; import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator; import org.apache.druid.metadata.TestDerbyConnector; @@ -120,6 +122,7 @@ import org.apache.druid.segment.loading.LocalDataSegmentPusherConfig; import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig; import org.apache.druid.segment.metadata.SegmentSchemaManager; import org.apache.druid.segment.realtime.NoopChatHandlerProvider; +import org.apache.druid.segment.realtime.SegmentGenerationMetrics; import org.apache.druid.segment.realtime.appenderator.StreamAppenderator; import org.apache.druid.server.DruidNode; import org.apache.druid.server.coordination.DataSegmentServerAnnouncer; @@ -134,7 +137,9 @@ import org.assertj.core.api.Assertions; import org.easymock.EasyMock; import org.easymock.EasyMockSupport; import org.joda.time.Interval; +import org.junit.After; import org.junit.Assert; +import org.junit.Before; import org.junit.Rule; import org.junit.rules.TemporaryFolder; @@ -198,6 +203,8 @@ public abstract class SeekableStreamIndexTaskTestBase extends EasyMockSupport protected final List<Task> runningTasks = new ArrayList<>(); protected final LockGranularity lockGranularity; + + protected StubServiceEmitter emitter; protected File directory; protected File reportsFile; protected TaskToolboxFactory toolboxFactory; @@ -251,6 +258,20 @@ public abstract class SeekableStreamIndexTaskTestBase extends EasyMockSupport this.lockGranularity = lockGranularity; } + @Before + public void setupBase() + { + emitter = new StubServiceEmitter(); + emitter.start(); + EmittingLogger.registerEmitter(emitter); + } + + @After + public void tearDownBase() throws IOException + { + emitter.close(); + } + protected static ByteEntity jb( String timestamp, String dim1, @@ -408,6 +429,16 @@ public abstract class SeekableStreamIndexTaskTestBase extends EasyMockSupport return new SegmentDescriptor(interval, "fakeVersion", partitionNum); } + protected void verifySegmentGenerationTimeMetricsArePositive(SegmentGenerationMetrics observedSegmentGenerationMetrics) + { + Assert.assertNotNull(observedSegmentGenerationMetrics); + Assert.assertTrue(observedSegmentGenerationMetrics.persistTimeMillis() > 0); + Assert.assertTrue(observedSegmentGenerationMetrics.persistCpuTime() > 0); + + Assert.assertTrue(observedSegmentGenerationMetrics.mergeTimeMillis() > 0); + Assert.assertTrue(observedSegmentGenerationMetrics.mergeCpuTime() > 0); + } + protected void assertEqualsExceptVersion( List<SegmentDescriptorAndExpectedDim1Values> expectedDescriptors, List<SegmentDescriptor> actualDescriptors --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
