This is an automated email from the ASF dual-hosted git repository.

abhishekrb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new c324e377512 Add javadocs to `KafkaEmitterTest` & fix flaky test 
(#15898)
c324e377512 is described below

commit c324e377512bfa815e0e71fc3dbdb69934d9951e
Author: Abhishek Radhakrishnan <[email protected]>
AuthorDate: Wed Feb 14 11:52:06 2024 -0800

    Add javadocs to `KafkaEmitterTest` & fix flaky test (#15898)
    
    * Address review comment: add test javadocs
    
    * Fix flaky assertion failure.
    
    Use ConcurrentHashMap instead of HashMap because the producer callback
    can trigger concurrently and override the map initialization.
    
    * fixup intellij inspection
---
 .../druid/emitter/kafka/KafkaEmitterTest.java      | 48 +++++++++++++++++++++-
 1 file changed, 47 insertions(+), 1 deletion(-)

diff --git 
a/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterTest.java
 
b/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterTest.java
index 74a412a203e..e7bd7ac47d4 100644
--- 
a/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterTest.java
+++ 
b/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterTest.java
@@ -51,6 +51,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 
 import static org.mockito.ArgumentMatchers.any;
@@ -128,6 +129,11 @@ public class KafkaEmitterTest
       new TestEvent()
   );
 
+  /**
+   * Unit test to validate the handling of {@link ServiceMetricEvent}s.
+   * Only {@link KafkaEmitterConfig.EventType}s is subscribed in the config, 
so the expectation is that the
+   * events are emitted without any drops.
+   */
   @Test(timeout = 10_000)
   public void testServiceMetricEvents() throws JsonProcessingException, 
InterruptedException
   {
@@ -167,6 +173,12 @@ public class KafkaEmitterTest
     Assert.assertEquals(0, kafkaEmitter.getInvalidLostCount());
   }
 
+  /**
+   * Unit test to validate the handling of all event types, including {@link 
ServiceMetricEvent},
+   * {@link AlertEvent}, {@link org.apache.druid.server.log.RequestLogEvent}, 
and {@link SegmentMetadataEvent}.
+   * All {@link KafkaEmitterConfig.EventType}s are subscribed in the config, 
so the expectation is that all the
+   * events are emitted without any drops.
+   */
   @Test(timeout = 10_000)
   public void testAllEvents() throws JsonProcessingException, 
InterruptedException
   {
@@ -212,6 +224,11 @@ public class KafkaEmitterTest
 
   }
 
+  /**
+   * Unit test to validate the handling of the default event types - {@link 
ServiceMetricEvent} and {@link AlertEvent}.
+   * The default event types (alerts and metrics) are subscribed in the 
config, so the expectation is that both input
+   * event types should be emitted without any drops.
+   */
   @Test(timeout = 10_000)
   public void testDefaultEvents() throws JsonProcessingException, 
InterruptedException
   {
@@ -254,6 +271,12 @@ public class KafkaEmitterTest
     Assert.assertEquals(0, kafkaEmitter.getInvalidLostCount());
   }
 
+  /**
+   * Unit test to validate the handling of all valid event types, including 
{@link ServiceMetricEvent},
+   * {@link AlertEvent}, {@link org.apache.druid.server.log.RequestLogEvent}, 
and {@link SegmentMetadataEvent}.
+   * Only alerts are subscribed in the config, so the expectation is that only 
alert events
+   * should be emitted, and everything else should be dropped.
+   */
   @Test(timeout = 10_000)
   public void testAlertsPlusUnsubscribedEvents() throws 
JsonProcessingException, InterruptedException
   {
@@ -302,6 +325,15 @@ public class KafkaEmitterTest
     Assert.assertEquals(REQUEST_LOG_EVENTS.size(), 
kafkaEmitter.getRequestLostCount());
   }
 
+  /**
+   * Similar to {@link #testAllEvents()}, this test configures all event feeds 
to emit to the same topic.
+   * <p>
+   * Unit test to validate the handling of all valid event types, including 
{@link ServiceMetricEvent},
+   * {@link AlertEvent}, {@link org.apache.druid.server.log.RequestLogEvent}, 
and {@link SegmentMetadataEvent}.
+   * All {@link KafkaEmitterConfig.EventType}s are subscribed to the same 
topic in the config, so the expectation
+   * is that all input events are emitted without any drops.
+   * </p>
+   */
   @Test(timeout = 10_000)
   public void testAllEventsWithCommonTopic() throws JsonProcessingException, 
InterruptedException
   {
@@ -347,6 +379,12 @@ public class KafkaEmitterTest
     Assert.assertEquals(0, kafkaEmitter.getInvalidLostCount());
   }
 
+  /**
+   * Unit test to validate the handling of {@link ServiceMetricEvent}s and 
{@link TestEvent}s.
+   * The default event types (alerts and metrics) are subscribed in the 
config, so the expectation is that only
+   * {@link ServiceMetricEvent} is expected to be emitted, while dropping all 
unknown {@link TestEvent}s.
+   * </p>
+   */
   @Test(timeout = 10_000)
   public void testUnknownEvents() throws JsonProcessingException, 
InterruptedException
   {
@@ -390,6 +428,13 @@ public class KafkaEmitterTest
     Assert.assertEquals(UNKNOWN_EVENTS.size(), 
kafkaEmitter.getInvalidLostCount());
   }
 
+  /**
+   * Unit test to validate the handling of {@link ServiceMetricEvent}s when 
the Kafka emitter queue, which buffers up events
+   * becomes full. The queue size in the config is set via {@code 
buffer.memory} and is computed from
+   * the input events using {@code bufferEventsDrop}. The default event types 
(alerts and metrics) are subscribed in
+   * the config, so the expectation is that all {@link ServiceMetricEvent}s up 
to {@code n - bufferEventsDrop} will be
+   * emitted, {@code n} being the total number of input events, while dropping 
the last {@code bufferEventsDrop} events.
+   */
   @Test(timeout = 10_000)
   public void testDropEventsWhenQueueFull() throws JsonProcessingException, 
InterruptedException
   {
@@ -513,7 +558,8 @@ public class KafkaEmitterTest
       final CountDownLatch eventLatch
   )
   {
-    final Map<String, List<String>> feedToActualEvents = new HashMap<>();
+    // A concurrent hashmap because the producer callback can trigger 
concurrently and can override the map initialization
+    final ConcurrentHashMap<String, List<String>> feedToActualEvents = new 
ConcurrentHashMap<>();
     when(producer.send(any(), any())).then((invocation) -> {
       final ProducerRecord<?, ?> producerRecord = invocation.getArgument(0);
       final String value = String.valueOf(producerRecord.value());


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to