This is an automated email from the ASF dual-hosted git repository.

abhishekrb pushed a commit to branch missing_appendator_metrics
in repository https://gitbox.apache.org/repos/asf/druid.git

commit f48f51867d24378c444279fff47838de8c084d47
Author: Abhishek Balaji Radhakrishnan <[email protected]>
AuthorDate: Tue Dec 23 09:15:56 2025 -0500

    Test extensions for kafka and kinesis
    
    Initialize the ServiceEmitter as a non-static field so that each test
    gets its own instance
    
    Add a helper in the base class for asserting segment generation metrics
---
 .../druid/indexing/kafka/KafkaIndexTaskTest.java   | 60 ++++++++++++++++------
 .../indexing/kinesis/KinesisIndexTaskTest.java     | 33 +++++++-----
 .../SeekableStreamIndexTaskTestBase.java           | 31 +++++++++++
 3 files changed, 96 insertions(+), 28 deletions(-)

diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
index 96dd04d42cf..14eec079574 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -88,9 +88,6 @@ import 
org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.guava.Sequences;
 import org.apache.druid.java.util.common.parsers.CloseableIterator;
 import org.apache.druid.java.util.common.parsers.ParseException;
-import org.apache.druid.java.util.emitter.EmittingLogger;
-import org.apache.druid.java.util.emitter.core.NoopEmitter;
-import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.math.expr.ExprMacroTable;
 import org.apache.druid.query.DefaultGenericQueryMetricsFactory;
 import org.apache.druid.query.DefaultQueryRunnerFactoryConglomerate;
@@ -117,6 +114,7 @@ import 
org.apache.druid.segment.incremental.RowIngestionMeters;
 import org.apache.druid.segment.incremental.RowIngestionMetersTotals;
 import org.apache.druid.segment.incremental.RowMeters;
 import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.realtime.SegmentGenerationMetrics;
 import org.apache.druid.segment.transform.ExpressionTransform;
 import org.apache.druid.segment.transform.TransformSpec;
 import org.apache.druid.server.coordination.BroadcastDatasourceLoadingSpec;
@@ -195,9 +193,7 @@ public class KafkaIndexTaskTest extends 
SeekableStreamIndexTaskTestBase
 
   private static TestingCluster zkServer;
   private static TestBroker kafkaServer;
-  private static ServiceEmitter emitter;
   private static int topicPostfix;
-
   static final Module TEST_MODULE = new 
SimpleModule("kafkaTestModule").registerSubtypes(
       new NamedType(TestKafkaInputFormat.class, "testKafkaInputFormat"),
       new NamedType(TestKafkaFormatWithMalformedDataDetection.class, 
"testKafkaFormatWithMalformedDataDetection")
@@ -287,14 +283,6 @@ public class KafkaIndexTaskTest extends 
SeekableStreamIndexTaskTestBase
   @BeforeClass
   public static void setupClass() throws Exception
   {
-    emitter = new ServiceEmitter(
-        "service",
-        "host",
-        new NoopEmitter()
-    );
-    emitter.start();
-    EmittingLogger.registerEmitter(emitter);
-
     zkServer = new TestingCluster(1);
     zkServer.start();
 
@@ -353,8 +341,6 @@ public class KafkaIndexTaskTest extends 
SeekableStreamIndexTaskTestBase
 
     zkServer.stop();
     zkServer = null;
-
-    emitter.close();
   }
 
   @Test(timeout = 60_000L)
@@ -387,7 +373,6 @@ public class KafkaIndexTaskTest extends 
SeekableStreamIndexTaskTestBase
     // Wait for task to exit
     Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
     verifyTaskMetrics(task, RowMeters.with().bytes(getTotalSizeOfRecords(2, 
5)).totalProcessed(3));
-    Assert.assertTrue(task.getRunner().getSegmentGenerationMetrics().isProcessingDone());
 
     // Check published metadata and segments in deep storage
     assertEqualsExceptVersion(
@@ -401,6 +386,12 @@ public class KafkaIndexTaskTest extends 
SeekableStreamIndexTaskTestBase
         new KafkaDataSourceMetadata(new 
SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(new 
KafkaTopicPartition(false, topic, 0), 5L))),
         newDataSchemaMetadata()
     );
+
+    final SegmentGenerationMetrics observedSegmentGenerationMetrics = 
task.getRunner().getSegmentGenerationMetrics();
+    Assert.assertTrue(observedSegmentGenerationMetrics.isProcessingDone());
+    Assert.assertEquals(3, observedSegmentGenerationMetrics.rowOutput());
+    Assert.assertEquals(2, observedSegmentGenerationMetrics.handOffCount());
+    
verifySegmentGenerationTimeMetricsArePositive(observedSegmentGenerationMetrics);
   }
 
   @Test(timeout = 60_000L)
@@ -449,6 +440,12 @@ public class KafkaIndexTaskTest extends 
SeekableStreamIndexTaskTestBase
         Assert.assertEquals(dimensionsSpec.getDimensionNames().get(i), 
segment.getDimensions().get(i));
       }
     }
+
+    final SegmentGenerationMetrics observedSegmentGenerationMetrics = 
task.getRunner().getSegmentGenerationMetrics();
+    Assert.assertTrue(observedSegmentGenerationMetrics.isProcessingDone());
+    Assert.assertEquals(3, observedSegmentGenerationMetrics.rowOutput());
+    Assert.assertEquals(2, observedSegmentGenerationMetrics.handOffCount());
+    
verifySegmentGenerationTimeMetricsArePositive(observedSegmentGenerationMetrics);
   }
 
   @Test(timeout = 60_000L)
@@ -738,6 +735,13 @@ public class KafkaIndexTaskTest extends 
SeekableStreamIndexTaskTestBase
         ),
         newDataSchemaMetadata()
     );
+
+    final SegmentGenerationMetrics observedSegmentGenerationMetrics = 
task.getRunner().getSegmentGenerationMetrics();
+    Assert.assertTrue(observedSegmentGenerationMetrics.isProcessingDone());
+    Assert.assertEquals(8, observedSegmentGenerationMetrics.rowOutput());
+    Assert.assertEquals(7, observedSegmentGenerationMetrics.handOffCount());
+    Assert.assertEquals(4, observedSegmentGenerationMetrics.numPersists());
+    
verifySegmentGenerationTimeMetricsArePositive(observedSegmentGenerationMetrics);
   }
 
   @Test(timeout = 60_000L)
@@ -1694,6 +1698,14 @@ public class KafkaIndexTaskTest extends 
SeekableStreamIndexTaskTestBase
         "{timestamp=246140482-04-24T15:36:27.903Z, dim1=x, dim2=z, dimLong=10, 
dimFloat=20.0, met1=1.0}"
     );
     Assert.assertEquals(expectedInputs, parseExceptionReport.getInputs());
+
+    emitter.verifyValue("ingest/segments/count", 4);
+
+    final SegmentGenerationMetrics observedSegmentGenerationMetrics = 
task.getRunner().getSegmentGenerationMetrics();
+    Assert.assertTrue(observedSegmentGenerationMetrics.isProcessingDone());
+    Assert.assertEquals(7, observedSegmentGenerationMetrics.rowOutput());
+    Assert.assertEquals(4, observedSegmentGenerationMetrics.handOffCount());
+    
verifySegmentGenerationTimeMetricsArePositive(observedSegmentGenerationMetrics);
   }
 
   @Test(timeout = 60_000L)
@@ -1768,6 +1780,14 @@ public class KafkaIndexTaskTest extends 
SeekableStreamIndexTaskTestBase
 
     List<String> expectedInputs = Arrays.asList("", "unparseable");
     Assert.assertEquals(expectedInputs, parseExceptionReport.getInputs());
+
+    emitter.verifyNotEmitted("ingest/segments/count");
+
+    final SegmentGenerationMetrics observedSegmentGenerationMetrics = 
task.getRunner().getSegmentGenerationMetrics();
+    Assert.assertTrue(observedSegmentGenerationMetrics.isProcessingDone());
+    Assert.assertEquals(3, observedSegmentGenerationMetrics.rowOutput());
+    Assert.assertEquals(1, observedSegmentGenerationMetrics.numPersists());
+    Assert.assertEquals(0, observedSegmentGenerationMetrics.handOffCount());
   }
 
   @Test(timeout = 60_000L)
@@ -3450,6 +3470,14 @@ public class KafkaIndexTaskTest extends 
SeekableStreamIndexTaskTestBase
     // Check segments in deep storage
     Assert.assertEquals(ImmutableList.of("b"), readSegmentColumn("dim1", 
publishedDescriptors.get(0)));
     Assert.assertEquals(ImmutableList.of("bb"), readSegmentColumn("dim1t", 
publishedDescriptors.get(0)));
+
+    emitter.verifyValue("ingest/segments/count", 1);
+
+    final SegmentGenerationMetrics observedSegmentGenerationMetrics = 
task.getRunner().getSegmentGenerationMetrics();
+    Assert.assertTrue(observedSegmentGenerationMetrics.isProcessingDone());
+    Assert.assertEquals(1, observedSegmentGenerationMetrics.rowOutput());
+    Assert.assertEquals(1, observedSegmentGenerationMetrics.handOffCount());
+    
verifySegmentGenerationTimeMetricsArePositive(observedSegmentGenerationMetrics);
   }
 
   public static class TestKafkaInputFormat implements InputFormat
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
index 98dfea4333c..f37f856282f 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
@@ -68,9 +68,6 @@ import 
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervi
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.concurrent.Execs;
-import org.apache.druid.java.util.emitter.EmittingLogger;
-import org.apache.druid.java.util.emitter.core.NoopEmitter;
-import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.math.expr.ExprMacroTable;
 import org.apache.druid.query.DefaultQueryRunnerFactoryConglomerate;
 import org.apache.druid.query.DruidProcessingConfigTest;
@@ -85,6 +82,7 @@ import org.apache.druid.segment.TestHelper;
 import org.apache.druid.segment.incremental.RowIngestionMeters;
 import org.apache.druid.segment.incremental.RowMeters;
 import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.realtime.SegmentGenerationMetrics;
 import org.apache.druid.segment.transform.ExpressionTransform;
 import org.apache.druid.segment.transform.TransformSpec;
 import org.apache.druid.timeline.DataSegment;
@@ -167,7 +165,6 @@ public class KinesisIndexTaskTest extends 
SeekableStreamIndexTaskTestBase
   );
 
   private static KinesisRecordSupplier recordSupplier;
-  private static ServiceEmitter emitter;
 
   @Parameterized.Parameters(name = "{0}")
   public static Iterable<Object[]> constructorFeeder()
@@ -193,13 +190,6 @@ public class KinesisIndexTaskTest extends 
SeekableStreamIndexTaskTestBase
   @BeforeClass
   public static void setupClass()
   {
-    emitter = new ServiceEmitter(
-        "service",
-        "host",
-        new NoopEmitter()
-    );
-    emitter.start();
-    EmittingLogger.registerEmitter(emitter);
     taskExec = MoreExecutors.listeningDecorator(
         Executors.newCachedThreadPool(
             Execs.makeThreadFactory("kinesis-task-test-%d")
@@ -251,7 +241,6 @@ public class KinesisIndexTaskTest extends 
SeekableStreamIndexTaskTestBase
   {
     taskExec.shutdown();
     taskExec.awaitTermination(20, TimeUnit.MINUTES);
-    emitter.close();
   }
 
   private void waitUntil(KinesisIndexTask task, Predicate<KinesisIndexTask> 
predicate)
@@ -355,6 +344,13 @@ public class KinesisIndexTaskTest extends 
SeekableStreamIndexTaskTestBase
         ),
         newDataSchemaMetadata()
     );
+
+    final SegmentGenerationMetrics observedSegmentGenerationMetrics = 
task.getRunner().getSegmentGenerationMetrics();
+    Assert.assertTrue(observedSegmentGenerationMetrics.isProcessingDone());
+    Assert.assertEquals(3, observedSegmentGenerationMetrics.rowOutput());
+    Assert.assertEquals(2, observedSegmentGenerationMetrics.handOffCount());
+    Assert.assertEquals(2, observedSegmentGenerationMetrics.numPersists());
+    
verifySegmentGenerationTimeMetricsArePositive(observedSegmentGenerationMetrics);
   }
 
   @Test(timeout = 120_000L)
@@ -752,6 +748,13 @@ public class KinesisIndexTaskTest extends 
SeekableStreamIndexTaskTestBase
         new KinesisDataSourceMetadata(new 
SeekableStreamEndSequenceNumbers<>(STREAM, endOffsets)),
         newDataSchemaMetadata()
     );
+
+    final SegmentGenerationMetrics observedSegmentGenerationMetrics = 
task.getRunner().getSegmentGenerationMetrics();
+    Assert.assertTrue(observedSegmentGenerationMetrics.isProcessingDone());
+    Assert.assertEquals(7, observedSegmentGenerationMetrics.rowOutput());
+    Assert.assertEquals(6, observedSegmentGenerationMetrics.handOffCount());
+    Assert.assertEquals(5, observedSegmentGenerationMetrics.numPersists());
+    
verifySegmentGenerationTimeMetricsArePositive(observedSegmentGenerationMetrics);
   }
 
 
@@ -1031,6 +1034,12 @@ public class KinesisIndexTaskTest extends 
SeekableStreamIndexTaskTestBase
         ),
         newDataSchemaMetadata()
     );
+    final SegmentGenerationMetrics observedSegmentGenerationMetrics = 
task.getRunner().getSegmentGenerationMetrics();
+    Assert.assertTrue(observedSegmentGenerationMetrics.isProcessingDone());
+    Assert.assertEquals(3, observedSegmentGenerationMetrics.rowOutput());
+    Assert.assertEquals(2, observedSegmentGenerationMetrics.handOffCount());
+    Assert.assertEquals(2, observedSegmentGenerationMetrics.numPersists());
+    
verifySegmentGenerationTimeMetricsArePositive(observedSegmentGenerationMetrics);
   }
 
 
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java
index 19ff6ec4d0b..4adbbdded64 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java
@@ -83,8 +83,10 @@ import 
org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.guava.Comparators;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.java.util.common.parsers.JSONPathSpec;
+import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.java.util.metrics.MonitorScheduler;
+import org.apache.druid.java.util.metrics.StubServiceEmitter;
 import org.apache.druid.metadata.DerbyMetadataStorageActionHandlerFactory;
 import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator;
 import org.apache.druid.metadata.TestDerbyConnector;
@@ -120,6 +122,7 @@ import 
org.apache.druid.segment.loading.LocalDataSegmentPusherConfig;
 import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
 import org.apache.druid.segment.metadata.SegmentSchemaManager;
 import org.apache.druid.segment.realtime.NoopChatHandlerProvider;
+import org.apache.druid.segment.realtime.SegmentGenerationMetrics;
 import org.apache.druid.segment.realtime.appenderator.StreamAppenderator;
 import org.apache.druid.server.DruidNode;
 import org.apache.druid.server.coordination.DataSegmentServerAnnouncer;
@@ -134,7 +137,9 @@ import org.assertj.core.api.Assertions;
 import org.easymock.EasyMock;
 import org.easymock.EasyMockSupport;
 import org.joda.time.Interval;
+import org.junit.After;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Rule;
 import org.junit.rules.TemporaryFolder;
 
@@ -198,6 +203,8 @@ public abstract class SeekableStreamIndexTaskTestBase 
extends EasyMockSupport
 
   protected final List<Task> runningTasks = new ArrayList<>();
   protected final LockGranularity lockGranularity;
+
+  protected StubServiceEmitter emitter;
   protected File directory;
   protected File reportsFile;
   protected TaskToolboxFactory toolboxFactory;
@@ -251,6 +258,20 @@ public abstract class SeekableStreamIndexTaskTestBase 
extends EasyMockSupport
     this.lockGranularity = lockGranularity;
   }
 
+  @Before
+  public void setupBase()
+  {
+    emitter = new StubServiceEmitter();
+    emitter.start();
+    EmittingLogger.registerEmitter(emitter);
+  }
+
+  @After
+  public void tearDownBase() throws IOException
+  {
+    emitter.close();
+  }
+
   protected static ByteEntity jb(
       String timestamp,
       String dim1,
@@ -408,6 +429,16 @@ public abstract class SeekableStreamIndexTaskTestBase 
extends EasyMockSupport
     return new SegmentDescriptor(interval, "fakeVersion", partitionNum);
   }
 
+  protected void verifySegmentGenerationTimeMetricsArePositive(SegmentGenerationMetrics observedSegmentGenerationMetrics)
+  {
+    Assert.assertNotNull(observedSegmentGenerationMetrics);
+    Assert.assertTrue(observedSegmentGenerationMetrics.persistTimeMillis() > 0);
+    Assert.assertTrue(observedSegmentGenerationMetrics.persistCpuTime() > 0);
+
+    Assert.assertTrue(observedSegmentGenerationMetrics.mergeTimeMillis() > 0);
+    Assert.assertTrue(observedSegmentGenerationMetrics.mergeCpuTime() > 0);
+  }
+
   protected void assertEqualsExceptVersion(
       List<SegmentDescriptorAndExpectedDim1Values> expectedDescriptors,
       List<SegmentDescriptor> actualDescriptors


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to