abhishekrb19 commented on code in PR #14281:
URL: https://github.com/apache/druid/pull/14281#discussion_r1201229476


##########
extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java:
##########
@@ -183,24 +202,31 @@ public void emit(final Event event)
             resultJson,
             StringUtils.toUtf8(resultJson).length
         );
+
+        Set<EventType> eventTypes = config.getEventTypes();
         if (event instanceof ServiceMetricEvent) {
-          if (!metricQueue.offer(objectContainer)) {
+          if (!eventTypes.contains(EventType.METRICS) || !metricQueue.offer(objectContainer)) {
             metricLost.incrementAndGet();
           }
         } else if (event instanceof AlertEvent) {
-          if (!alertQueue.offer(objectContainer)) {
+          if (!eventTypes.contains(EventType.ALERTS) || !alertQueue.offer(objectContainer)) {
             alertLost.incrementAndGet();
           }
         } else if (event instanceof RequestLogEvent) {
-          if (config.getRequestTopic() == null || !requestQueue.offer(objectContainer)) {
+          if (!eventTypes.contains(EventType.REQUESTS) || !requestQueue.offer(objectContainer)) {
             requestLost.incrementAndGet();
           }
+        } else if (event instanceof SegmentMetadataEvent) {
+          if (!eventTypes.contains(EventType.SEGMENTMETADATA) || !segmentMetadataQueue.offer(objectContainer)) {
+            segmentMetadataLost.incrementAndGet();
+          }
         } else {
           invalidLost.incrementAndGet();
         }
       }
-      catch (JsonProcessingException e) {

Review Comment:
   Do we need this wider catch block, or would `JsonProcessingException` suffice?



##########
extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterTest.java:
##########
@@ -47,20 +51,23 @@
 @RunWith(Parameterized.class)
 public class KafkaEmitterTest
 {
-  @Parameterized.Parameter
+  @Parameterized.Parameter(0)
+  public Set<KafkaEmitterConfig.EventType> eventsType;
+
+  @Parameterized.Parameter(1)
   public String requestTopic;
 
-  @Parameterized.Parameters(name = "{index}: requestTopic - {0}")
+  @Parameterized.Parameters(name = "{index}: eventTypes - {0}, requestTopic - {1}")
   public static Object[] data()
   {
-    return new Object[] {
-        "requests",
-        null
+    return new Object[][] {
+        {new HashSet<>(Arrays.asList(KafkaEmitterConfig.EventType.METRICS, KafkaEmitterConfig.EventType.REQUESTS, KafkaEmitterConfig.EventType.ALERTS, KafkaEmitterConfig.EventType.SEGMENTMETADATA)), "requests"},
+        {new HashSet<>(Arrays.asList(KafkaEmitterConfig.EventType.METRICS, KafkaEmitterConfig.EventType.ALERTS, KafkaEmitterConfig.EventType.SEGMENTMETADATA)), null}
     };
   }
 
   // there is 1 seconds wait in kafka emitter before it starts sending events to broker, set a timeout for 5 seconds

Review Comment:
   Do we see this test fail sometimes without this bump in timeout?
   
   ```suggestion
     // there is 1 seconds wait in kafka emitter before it starts sending events to broker, set a timeout for 10 seconds
   ```



##########
extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitterConfig.java:
##########
@@ -21,53 +21,114 @@
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonValue;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import org.apache.druid.java.util.common.StringUtils;
 import org.apache.kafka.clients.producer.ProducerConfig;
 
 import javax.annotation.Nullable;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 
 public class KafkaEmitterConfig
 {
+  public enum EventType
+  {
+    METRICS,
+    ALERTS,
+    REQUESTS,
+    SEGMENTMETADATA {

Review Comment:
   nit: for readability, I'd suggest `SEGMENT_METADATA`. Also, I think with that rename we won't need a custom enum `toString()`.
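
A minimal sketch of the rename, assuming the constant-specific body opened by `SEGMENTMETADATA {` exists only to override `toString()`. `EventTypeSketch` and `fromString` are hypothetical names, the lower-cased string form is an assumption about the config format, and plain JDK calls stand in for any Druid or Jackson helpers:

```java
import java.util.Locale;

// Hypothetical stand-in for KafkaEmitterConfig.EventType; not the PR's actual code.
enum EventTypeSketch
{
  METRICS,
  ALERTS,
  REQUESTS,
  SEGMENT_METADATA;

  // One shared lower-casing rule covers every constant,
  // so no constant needs its own override.
  @Override
  public String toString()
  {
    return name().toLowerCase(Locale.ROOT);
  }

  // Hypothetical parser for config values such as "segment_metadata".
  static EventTypeSketch fromString(String value)
  {
    return valueOf(value.toUpperCase(Locale.ROOT));
  }
}
```

With a single naming rule, `EventTypeSketch.SEGMENT_METADATA.toString()` yields `segment_metadata`, and `fromString("segment_metadata")` maps back to the same constant.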



##########
extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java:
##########
@@ -183,24 +202,31 @@ public void emit(final Event event)
             resultJson,
             StringUtils.toUtf8(resultJson).length
         );
+
+        Set<EventType> eventTypes = config.getEventTypes();
         if (event instanceof ServiceMetricEvent) {
-          if (!metricQueue.offer(objectContainer)) {
+          if (!eventTypes.contains(EventType.METRICS) || !metricQueue.offer(objectContainer)) {
             metricLost.incrementAndGet();
           }
         } else if (event instanceof AlertEvent) {
-          if (!alertQueue.offer(objectContainer)) {
+          if (!eventTypes.contains(EventType.ALERTS) || !alertQueue.offer(objectContainer)) {
             alertLost.incrementAndGet();
           }
         } else if (event instanceof RequestLogEvent) {
-          if (config.getRequestTopic() == null || !requestQueue.offer(objectContainer)) {
+          if (!eventTypes.contains(EventType.REQUESTS) || !requestQueue.offer(objectContainer)) {
             requestLost.incrementAndGet();
           }
+        } else if (event instanceof SegmentMetadataEvent) {
+          if (!eventTypes.contains(EventType.SEGMENTMETADATA) || !segmentMetadataQueue.offer(objectContainer)) {
+            segmentMetadataLost.incrementAndGet();
+          }
         } else {
           invalidLost.incrementAndGet();
         }
       }
-      catch (JsonProcessingException e) {
+      catch (Exception e) {
         invalidLost.incrementAndGet();
+        log.warn(e, "Exception while serializing event");

Review Comment:
   Related to the comment above: this log message only describes `JsonProcessingException`s, but the catch block now handles any `Exception`. Should we log it as an error and make the message general rather than "serializing event"?
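
One possible shape for this, sketched with hypothetical stand-ins (`EmitSketch`, `SerializationFailure`, `Serializer`, and a list in place of the logger; none of these come from the PR): keep a narrow clause for serialization failures and give the wider clause an error-level, general message. The message wording and log levels are assumptions:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch; names here do not come from the PR.
final class EmitSketch
{
  // Stand-in for a checked serialization failure such as JsonProcessingException.
  static final class SerializationFailure extends Exception
  {
    SerializationFailure(String message)
    {
      super(message);
    }
  }

  interface Serializer
  {
    String serialize(Object event) throws SerializationFailure;
  }

  final AtomicLong invalidLost = new AtomicLong();
  // Stand-in for a logger so the sketch stays self-contained.
  final List<String> logLines = new ArrayList<>();

  void emit(Object event, Serializer serializer)
  {
    try {
      String json = serializer.serialize(event);
      // ... the serialized event would be offered to a queue here ...
    }
    catch (SerializationFailure e) {
      // Narrow clause: the message can stay specific to serialization.
      invalidLost.incrementAndGet();
      logLines.add("WARN: Exception while serializing event");
    }
    catch (Exception e) {
      // Wide clause: anything else gets a general, error-level message.
      invalidLost.incrementAndGet();
      logLines.add("ERROR: Failed to emit event");
    }
  }
}
```

If a single `catch (Exception e)` is kept instead, only the second message would apply, which matches the suggestion to make the log line general.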



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

