This is an automated email from the ASF dual-hosted git repository.

abhishekrb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new f224035c7e2 Fix Flakiness in KafkaEmitterTest (#15907)
f224035c7e2 is described below

commit f224035c7e2121ae726bf9b04dd69545778d2eaa
Author: Tom <[email protected]>
AuthorDate: Wed Feb 14 20:31:55 2024 -0800

    Fix Flakiness in KafkaEmitterTest (#15907)
    
    * thrust of the fix to allow for the json values to be out of order
    
    The existing problem is that toMap doesn't turn some values into json 
primitive
    values, for example segmentMetadata just has DateTime objects for it's time 
in
    the EventMap, but Alert event converts those into strings when calling 
toMap.
    This creates an issue because when we check the emitted events the mapper
    deserializing the string value for dateTime leaves it as a string in the
    EventMap. So the question is do we alter the events toMap() to return 
string/map
    version of objects or to make the expected events do a round trip of
    eventMap -> string -> eventMap to turn everything into json primitives
    
    * fix issue by making toMap events convert Objects into strings, or maps
    
    * fix linting errors
    
    * use method of using mapper to round trip expected data to make it have 
same type
    as those of the events emitted
    
    * remove unnecessary comment
---
 .../druid/emitter/kafka/KafkaEmitterTest.java      | 80 +++++++++++-----------
 1 file changed, 41 insertions(+), 39 deletions(-)

diff --git 
a/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterTest.java
 
b/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterTest.java
index e7bd7ac47d4..a9f5e14fbea 100644
--- 
a/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterTest.java
+++ 
b/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterTest.java
@@ -54,6 +54,8 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -135,7 +137,7 @@ public class KafkaEmitterTest
    * events are emitted without any drops.
    */
   @Test(timeout = 10_000)
-  public void testServiceMetricEvents() throws JsonProcessingException, 
InterruptedException
+  public void testServiceMetricEvents() throws InterruptedException, 
JsonProcessingException
   {
     final KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig(
         "",
@@ -155,12 +157,12 @@ public class KafkaEmitterTest
     final List<Event> inputEvents = flattenEvents(SERVICE_METRIC_EVENTS);
     final CountDownLatch eventLatch = new CountDownLatch(inputEvents.size());
 
-    final Map<String, List<String>> feedToExpectedEvents = 
trackExpectedEventsPerFeed(
+    final Map<String, List<EventMap>> feedToExpectedEvents = 
trackExpectedEventsPerFeed(
         inputEvents,
         kafkaEmitterConfig.getClusterName(),
         kafkaEmitterConfig.getExtraDimensions()
     );
-    final Map<String, List<String>> feedToActualEvents = 
trackActualEventsPerFeed(eventLatch);
+    final Map<String, List<EventMap>> feedToActualEvents = 
trackActualEventsPerFeed(eventLatch);
 
     emitEvents(kafkaEmitter, inputEvents, eventLatch);
 
@@ -180,7 +182,7 @@ public class KafkaEmitterTest
    * events are emitted without any drops.
    */
   @Test(timeout = 10_000)
-  public void testAllEvents() throws JsonProcessingException, 
InterruptedException
+  public void testAllEvents() throws InterruptedException, 
JsonProcessingException
   {
     final KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig(
         "",
@@ -205,12 +207,12 @@ public class KafkaEmitterTest
     );
     final CountDownLatch eventLatch = new CountDownLatch(inputEvents.size());
 
-    final Map<String, List<String>> feedToExpectedEvents = 
trackExpectedEventsPerFeed(
+    final Map<String, List<EventMap>> feedToExpectedEvents = 
trackExpectedEventsPerFeed(
         inputEvents,
         kafkaEmitterConfig.getClusterName(),
         kafkaEmitterConfig.getExtraDimensions()
     );
-    final Map<String, List<String>> feedToActualEvents = 
trackActualEventsPerFeed(eventLatch);
+    final Map<String, List<EventMap>> feedToActualEvents = 
trackActualEventsPerFeed(eventLatch);
 
     emitEvents(kafkaEmitter, inputEvents, eventLatch);
 
@@ -230,7 +232,7 @@ public class KafkaEmitterTest
    * event types should be emitted without any drops.
    */
   @Test(timeout = 10_000)
-  public void testDefaultEvents() throws JsonProcessingException, 
InterruptedException
+  public void testDefaultEvents() throws InterruptedException, 
JsonProcessingException
   {
     final KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig(
         "",
@@ -253,12 +255,12 @@ public class KafkaEmitterTest
     );
     final CountDownLatch eventLatch = new CountDownLatch(inputEvents.size());
 
-    final Map<String, List<String>> feedToExpectedEvents = 
trackExpectedEventsPerFeed(
+    final Map<String, List<EventMap>> feedToExpectedEvents = 
trackExpectedEventsPerFeed(
         inputEvents,
         kafkaEmitterConfig.getClusterName(),
         kafkaEmitterConfig.getExtraDimensions()
     );
-    final Map<String, List<String>> feedToActualEvents = 
trackActualEventsPerFeed(eventLatch);
+    final Map<String, List<EventMap>> feedToActualEvents = 
trackActualEventsPerFeed(eventLatch);
 
     emitEvents(kafkaEmitter, inputEvents, eventLatch);
 
@@ -278,7 +280,7 @@ public class KafkaEmitterTest
    * should be emitted, and everything else should be dropped.
    */
   @Test(timeout = 10_000)
-  public void testAlertsPlusUnsubscribedEvents() throws 
JsonProcessingException, InterruptedException
+  public void testAlertsPlusUnsubscribedEvents() throws InterruptedException, 
JsonProcessingException
   {
     final KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig(
         "",
@@ -305,12 +307,12 @@ public class KafkaEmitterTest
 
     final CountDownLatch eventLatch = new CountDownLatch(ALERT_EVENTS.size());
 
-    final Map<String, List<String>> feedToExpectedEvents = 
trackExpectedEventsPerFeed(
+    final Map<String, List<EventMap>> feedToExpectedEvents = 
trackExpectedEventsPerFeed(
         ALERT_EVENTS,
         kafkaEmitterConfig.getClusterName(),
         kafkaEmitterConfig.getExtraDimensions()
     );
-    final Map<String, List<String>> feedToActualEvents = 
trackActualEventsPerFeed(eventLatch);
+    final Map<String, List<EventMap>> feedToActualEvents = 
trackActualEventsPerFeed(eventLatch);
 
     emitEvents(kafkaEmitter, inputEvents, eventLatch);
 
@@ -335,7 +337,7 @@ public class KafkaEmitterTest
    * </p>
    */
   @Test(timeout = 10_000)
-  public void testAllEventsWithCommonTopic() throws JsonProcessingException, 
InterruptedException
+  public void testAllEventsWithCommonTopic() throws InterruptedException, 
JsonProcessingException
   {
     final KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig(
         "",
@@ -361,12 +363,12 @@ public class KafkaEmitterTest
 
     final CountDownLatch eventLatch = new CountDownLatch(inputEvents.size());
 
-    final Map<String, List<String>> feedToExpectedEvents = 
trackExpectedEventsPerFeed(
+    final Map<String, List<EventMap>> feedToExpectedEvents = 
trackExpectedEventsPerFeed(
         inputEvents,
         kafkaEmitterConfig.getClusterName(),
         kafkaEmitterConfig.getExtraDimensions()
     );
-    final Map<String, List<String>> feedToActualEvents = 
trackActualEventsPerFeed(eventLatch);
+    final Map<String, List<EventMap>> feedToActualEvents = 
trackActualEventsPerFeed(eventLatch);
 
     emitEvents(kafkaEmitter, inputEvents, eventLatch);
 
@@ -386,7 +388,7 @@ public class KafkaEmitterTest
    * </p>
    */
   @Test(timeout = 10_000)
-  public void testUnknownEvents() throws JsonProcessingException, 
InterruptedException
+  public void testUnknownEvents() throws InterruptedException, 
JsonProcessingException
   {
     final KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig(
         "",
@@ -410,12 +412,12 @@ public class KafkaEmitterTest
 
     final CountDownLatch eventLatch = new 
CountDownLatch(SERVICE_METRIC_EVENTS.size());
 
-    final Map<String, List<String>> feedToExpectedEvents = 
trackExpectedEventsPerFeed(
+    final Map<String, List<EventMap>> feedToExpectedEvents = 
trackExpectedEventsPerFeed(
         SERVICE_METRIC_EVENTS,
         kafkaEmitterConfig.getClusterName(),
         kafkaEmitterConfig.getExtraDimensions()
     );
-    final Map<String, List<String>> feedToActualEvents = 
trackActualEventsPerFeed(eventLatch);
+    final Map<String, List<EventMap>> feedToActualEvents = 
trackActualEventsPerFeed(eventLatch);
 
     emitEvents(kafkaEmitter, inputEvents, eventLatch);
 
@@ -443,7 +445,7 @@ public class KafkaEmitterTest
     );
 
     final ImmutableMap<String, String> extraDimensions = 
ImmutableMap.of("clusterId", "cluster-101");
-    final Map<String, List<String>> feedToAllEventsBeforeDrop = 
trackExpectedEventsPerFeed(
+    final Map<String, List<EventMap>> feedToAllEventsBeforeDrop = 
trackExpectedEventsPerFeed(
         inputEvents,
         null,
         extraDimensions
@@ -469,15 +471,15 @@ public class KafkaEmitterTest
     // we should track the minimum buffer size per feed, compute the global 
maximum across all the feeds and prune the
     // expected set of events accordingly. For the sake of testing simplicity, 
we skip that for now.
     int totalBufferSize = 0;
-    for (final List<String> feedEvents : feedToAllEventsBeforeDrop.values()) {
+    for (final List<EventMap> feedEvents : feedToAllEventsBeforeDrop.values()) 
{
       for (int idx = 0; idx < feedEvents.size() - bufferEventsDrop; idx++) {
-        totalBufferSize += 
feedEvents.get(idx).getBytes(StandardCharsets.UTF_8).length;
+        totalBufferSize += 
MAPPER.writeValueAsString(feedEvents.get(idx)).getBytes(StandardCharsets.UTF_8).length;
       }
     }
 
-    final Map<String, List<String>> feedToExpectedEvents = new HashMap<>();
-    for (final Map.Entry<String, List<String>> expectedEvent : 
feedToAllEventsBeforeDrop.entrySet()) {
-      List<String> expectedEvents = expectedEvent.getValue();
+    final Map<String, List<EventMap>> feedToExpectedEvents = new HashMap<>();
+    for (final Map.Entry<String, List<EventMap>> expectedEvent : 
feedToAllEventsBeforeDrop.entrySet()) {
+      List<EventMap> expectedEvents = expectedEvent.getValue();
       feedToExpectedEvents.put(expectedEvent.getKey(), 
expectedEvents.subList(0, expectedEvents.size() - bufferEventsDrop));
     }
 
@@ -497,7 +499,7 @@ public class KafkaEmitterTest
     final KafkaEmitter kafkaEmitter = initKafkaEmitter(kafkaEmitterConfig);
 
     final CountDownLatch eventLatch = new CountDownLatch(inputEvents.size() - 
bufferEventsDrop);
-    final Map<String, List<String>> feedToActualEvents = 
trackActualEventsPerFeed(eventLatch);
+    final Map<String, List<EventMap>> feedToActualEvents = 
trackActualEventsPerFeed(eventLatch);
 
     emitEvents(kafkaEmitter, inputEvents, eventLatch);
 
@@ -554,19 +556,20 @@ public class KafkaEmitterTest
     return flattenedList;
   }
 
-  private Map<String, List<String>> trackActualEventsPerFeed(
+  private Map<String, List<EventMap>> trackActualEventsPerFeed(
       final CountDownLatch eventLatch
   )
   {
+
     // A concurrent hashmap because the producer callback can trigger 
concurrently and can override the map initialization
-    final ConcurrentHashMap<String, List<String>> feedToActualEvents = new 
ConcurrentHashMap<>();
+    final ConcurrentHashMap<String, List<EventMap>> feedToActualEvents = new 
ConcurrentHashMap<>();
     when(producer.send(any(), any())).then((invocation) -> {
       final ProducerRecord<?, ?> producerRecord = invocation.getArgument(0);
       final String value = String.valueOf(producerRecord.value());
       final EventMap eventMap = MAPPER.readValue(value, EventMap.class);
       feedToActualEvents.computeIfAbsent(
           (String) eventMap.get("feed"), k -> new ArrayList<>()
-      ).add(value);
+      ).add(eventMap);
 
       eventLatch.countDown();
       return null;
@@ -574,38 +577,37 @@ public class KafkaEmitterTest
     return feedToActualEvents;
   }
 
-  private Map<String, List<String>> trackExpectedEventsPerFeed(
+  private Map<String, List<EventMap>> trackExpectedEventsPerFeed(
       final List<Event> events,
       final String clusterName,
       final Map<String, String> extraDimensions
   ) throws JsonProcessingException
   {
-    final Map<String, List<String>> feedToExpectedEvents = new HashMap<>();
+    final Map<String, List<EventMap>> feedToExpectedEvents = new HashMap<>();
     for (final Event event : events) {
-      final EventMap eventMap = event.toMap();
+      final EventMap eventMap = 
MAPPER.readValue(MAPPER.writeValueAsString(event.toMap()), EventMap.class);
       eventMap.computeIfAbsent("clusterName", k -> clusterName);
       if (extraDimensions != null) {
         eventMap.putAll(extraDimensions);
       }
       feedToExpectedEvents.computeIfAbsent(
-          event.getFeed(), k -> new 
ArrayList<>()).add(MAPPER.writeValueAsString(eventMap)
-      );
+          event.getFeed(), k -> new ArrayList<>()).add(eventMap);
     }
     return feedToExpectedEvents;
   }
 
   private void validateEvents(
-      final Map<String, List<String>> feedToExpectedEvents,
-      final Map<String, List<String>> feedToActualEvents
+      final Map<String, List<EventMap>> feedToExpectedEvents,
+      final Map<String, List<EventMap>> feedToActualEvents
   )
   {
     Assert.assertEquals(feedToExpectedEvents.size(), 
feedToActualEvents.size());
 
-    for (final Map.Entry<String, List<String>> actualEntry : 
feedToActualEvents.entrySet()) {
+    for (final Map.Entry<String, List<EventMap>> actualEntry : 
feedToActualEvents.entrySet()) {
       final String feed = actualEntry.getKey();
-      final List<String> actualEvents = actualEntry.getValue();
-      final List<String> expectedEvents = feedToExpectedEvents.get(feed);
-      Assert.assertEquals(expectedEvents, actualEvents);
+      final List<EventMap> actualEvents = actualEntry.getValue();
+      final List<EventMap> expectedEvents = feedToExpectedEvents.get(feed);
+      assertThat(actualEvents, containsInAnyOrder(expectedEvents.toArray(new 
Map[0])));
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to