harinirajendran commented on code in PR #14281:
URL: https://github.com/apache/druid/pull/14281#discussion_r1202558908
##########
extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterTest.java:
##########
@@ -47,20 +51,23 @@
@RunWith(Parameterized.class)
public class KafkaEmitterTest
{
- @Parameterized.Parameter
+ @Parameterized.Parameter(0)
+ public Set<KafkaEmitterConfig.EventType> eventsType;
+
+ @Parameterized.Parameter(1)
public String requestTopic;
- @Parameterized.Parameters(name = "{index}: requestTopic - {0}")
+ @Parameterized.Parameters(name = "{index}: eventTypes - {0}, requestTopic -
{1}")
public static Object[] data()
{
- return new Object[] {
- "requests",
- null
+ return new Object[][] {
+ {new HashSet<>(Arrays.asList(KafkaEmitterConfig.EventType.METRICS,
KafkaEmitterConfig.EventType.REQUESTS, KafkaEmitterConfig.EventType.ALERTS,
KafkaEmitterConfig.EventType.SEGMENTMETADATA)), "requests"},
+ {new HashSet<>(Arrays.asList(KafkaEmitterConfig.EventType.METRICS,
KafkaEmitterConfig.EventType.ALERTS,
KafkaEmitterConfig.EventType.SEGMENTMETADATA)), null}
};
}
// there is 1 seconds wait in kafka emitter before it starts sending events
to broker, set a timeout for 5 seconds
Review Comment:
Yes. At least on my laptop, it did fail a few times.
##########
extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java:
##########
@@ -183,24 +202,31 @@ public void emit(final Event event)
resultJson,
StringUtils.toUtf8(resultJson).length
);
+
+ Set<EventType> eventTypes = config.getEventTypes();
if (event instanceof ServiceMetricEvent) {
- if (!metricQueue.offer(objectContainer)) {
+ if (!eventTypes.contains(EventType.METRICS) ||
!metricQueue.offer(objectContainer)) {
metricLost.incrementAndGet();
}
} else if (event instanceof AlertEvent) {
- if (!alertQueue.offer(objectContainer)) {
+ if (!eventTypes.contains(EventType.ALERTS) ||
!alertQueue.offer(objectContainer)) {
alertLost.incrementAndGet();
}
} else if (event instanceof RequestLogEvent) {
- if (config.getRequestTopic() == null ||
!requestQueue.offer(objectContainer)) {
+ if (!eventTypes.contains(EventType.REQUESTS) ||
!requestQueue.offer(objectContainer)) {
requestLost.incrementAndGet();
}
+ } else if (event instanceof SegmentMetadataEvent) {
+ if (!eventTypes.contains(EventType.SEGMENTMETADATA) ||
!segmentMetadataQueue.offer(objectContainer)) {
+ segmentMetadataLost.incrementAndGet();
+ }
} else {
invalidLost.incrementAndGet();
}
}
- catch (JsonProcessingException e) {
+ catch (Exception e) {
invalidLost.incrementAndGet();
+ log.warn(e, "Exception while serializing event");
Review Comment:
Now that we are not doing protobuf, I changed it back to
JsonProcessingException, so I am leaving this line as is.
##########
extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java:
##########
@@ -183,24 +202,31 @@ public void emit(final Event event)
resultJson,
StringUtils.toUtf8(resultJson).length
);
+
+ Set<EventType> eventTypes = config.getEventTypes();
if (event instanceof ServiceMetricEvent) {
- if (!metricQueue.offer(objectContainer)) {
+ if (!eventTypes.contains(EventType.METRICS) ||
!metricQueue.offer(objectContainer)) {
metricLost.incrementAndGet();
}
} else if (event instanceof AlertEvent) {
- if (!alertQueue.offer(objectContainer)) {
+ if (!eventTypes.contains(EventType.ALERTS) ||
!alertQueue.offer(objectContainer)) {
alertLost.incrementAndGet();
}
} else if (event instanceof RequestLogEvent) {
- if (config.getRequestTopic() == null ||
!requestQueue.offer(objectContainer)) {
+ if (!eventTypes.contains(EventType.REQUESTS) ||
!requestQueue.offer(objectContainer)) {
requestLost.incrementAndGet();
}
+ } else if (event instanceof SegmentMetadataEvent) {
+ if (!eventTypes.contains(EventType.SEGMENTMETADATA) ||
!segmentMetadataQueue.offer(objectContainer)) {
+ segmentMetadataLost.incrementAndGet();
+ }
} else {
invalidLost.incrementAndGet();
}
}
- catch (JsonProcessingException e) {
Review Comment:
Changed it back to JsonProcessingException.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]