abhishekrb19 commented on code in PR #14281:
URL: https://github.com/apache/druid/pull/14281#discussion_r1201229476
##########
extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java:
##########
@@ -183,24 +202,31 @@ public void emit(final Event event)
resultJson,
StringUtils.toUtf8(resultJson).length
);
+
+ Set<EventType> eventTypes = config.getEventTypes();
if (event instanceof ServiceMetricEvent) {
- if (!metricQueue.offer(objectContainer)) {
+ if (!eventTypes.contains(EventType.METRICS) ||
!metricQueue.offer(objectContainer)) {
metricLost.incrementAndGet();
}
} else if (event instanceof AlertEvent) {
- if (!alertQueue.offer(objectContainer)) {
+ if (!eventTypes.contains(EventType.ALERTS) ||
!alertQueue.offer(objectContainer)) {
alertLost.incrementAndGet();
}
} else if (event instanceof RequestLogEvent) {
- if (config.getRequestTopic() == null ||
!requestQueue.offer(objectContainer)) {
+ if (!eventTypes.contains(EventType.REQUESTS) ||
!requestQueue.offer(objectContainer)) {
requestLost.incrementAndGet();
}
+ } else if (event instanceof SegmentMetadataEvent) {
+ if (!eventTypes.contains(EventType.SEGMENTMETADATA) ||
!segmentMetadataQueue.offer(objectContainer)) {
+ segmentMetadataLost.incrementAndGet();
+ }
} else {
invalidLost.incrementAndGet();
}
}
- catch (JsonProcessingException e) {
Review Comment:
Do we need this wider catch block, or would narrowing it to
`JsonProcessingException` suffice?
##########
extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterTest.java:
##########
@@ -47,20 +51,23 @@
@RunWith(Parameterized.class)
public class KafkaEmitterTest
{
- @Parameterized.Parameter
+ @Parameterized.Parameter(0)
+ public Set<KafkaEmitterConfig.EventType> eventsType;
+
+ @Parameterized.Parameter(1)
public String requestTopic;
- @Parameterized.Parameters(name = "{index}: requestTopic - {0}")
+ @Parameterized.Parameters(name = "{index}: eventTypes - {0}, requestTopic -
{1}")
public static Object[] data()
{
- return new Object[] {
- "requests",
- null
+ return new Object[][] {
+ {new HashSet<>(Arrays.asList(KafkaEmitterConfig.EventType.METRICS,
KafkaEmitterConfig.EventType.REQUESTS, KafkaEmitterConfig.EventType.ALERTS,
KafkaEmitterConfig.EventType.SEGMENTMETADATA)), "requests"},
+ {new HashSet<>(Arrays.asList(KafkaEmitterConfig.EventType.METRICS,
KafkaEmitterConfig.EventType.ALERTS,
KafkaEmitterConfig.EventType.SEGMENTMETADATA)), null}
};
}
// there is 1 seconds wait in kafka emitter before it starts sending events
to broker, set a timeout for 5 seconds
Review Comment:
Does this test sometimes fail without this increase in the timeout?
```suggestion
// there is 1 seconds wait in kafka emitter before it starts sending
events to broker, set a timeout for 10 seconds
```
##########
extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitterConfig.java:
##########
@@ -21,53 +21,114 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonValue;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import org.apache.druid.java.util.common.StringUtils;
import org.apache.kafka.clients.producer.ProducerConfig;
import javax.annotation.Nullable;
+import java.util.HashSet;
import java.util.Map;
+import java.util.Set;
public class KafkaEmitterConfig
{
+ public enum EventType
+ {
+ METRICS,
+ ALERTS,
+ REQUESTS,
+ SEGMENTMETADATA {
Review Comment:
nit: for readability, I'd suggest `SEGMENT_METADATA`. Also, I think with
that change we won't need a custom enum `toString()`.
##########
extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java:
##########
@@ -183,24 +202,31 @@ public void emit(final Event event)
resultJson,
StringUtils.toUtf8(resultJson).length
);
+
+ Set<EventType> eventTypes = config.getEventTypes();
if (event instanceof ServiceMetricEvent) {
- if (!metricQueue.offer(objectContainer)) {
+ if (!eventTypes.contains(EventType.METRICS) ||
!metricQueue.offer(objectContainer)) {
metricLost.incrementAndGet();
}
} else if (event instanceof AlertEvent) {
- if (!alertQueue.offer(objectContainer)) {
+ if (!eventTypes.contains(EventType.ALERTS) ||
!alertQueue.offer(objectContainer)) {
alertLost.incrementAndGet();
}
} else if (event instanceof RequestLogEvent) {
- if (config.getRequestTopic() == null ||
!requestQueue.offer(objectContainer)) {
+ if (!eventTypes.contains(EventType.REQUESTS) ||
!requestQueue.offer(objectContainer)) {
requestLost.incrementAndGet();
}
+ } else if (event instanceof SegmentMetadataEvent) {
+ if (!eventTypes.contains(EventType.SEGMENTMETADATA) ||
!segmentMetadataQueue.offer(objectContainer)) {
+ segmentMetadataLost.incrementAndGet();
+ }
} else {
invalidLost.incrementAndGet();
}
}
- catch (JsonProcessingException e) {
+ catch (Exception e) {
invalidLost.incrementAndGet();
+ log.warn(e, "Exception while serializing event");
Review Comment:
Related to the above comment, this log line only applies to
`JsonProcessingException`s. I wonder if we should log it at error level and
make the message general enough, instead of specifically "serializing event".
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]