harinirajendran commented on code in PR #14281:
URL: https://github.com/apache/druid/pull/14281#discussion_r1201042926


##########
extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitterConfig.java:
##########
@@ -21,53 +21,115 @@
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonValue;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import org.apache.druid.java.util.common.StringUtils;
 import org.apache.kafka.clients.producer.ProducerConfig;
 
 import javax.annotation.Nullable;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 
 public class KafkaEmitterConfig
 {
+  public enum EventType
+  {
+    METRICS,
+    ALERTS,
+    REQUESTS,
+    SEGMENTMETADATA {
+      @Override
+      public String toString()
+      {
+        return "segmentMetadata";
+      }
+    };
+
+    @JsonValue
+    @Override
+    public String toString()
+    {
+      return StringUtils.toLowerCase(this.name());
+    }
 
+    @JsonCreator
+    public static EventType fromString(String name)
+    {
+      return valueOf(StringUtils.toUpperCase(name));
+    }
+  }
+
+  public static final Set<EventType> DEFAULT_EVENT_TYPES = 
ImmutableSet.of(EventType.ALERTS, EventType.METRICS);
   @JsonProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)
   private final String bootstrapServers;
-  @JsonProperty("metric.topic")
+  @Nullable @JsonProperty("event.types")
+  private final Set<EventType> eventTypes;
+  @Nullable @JsonProperty("metric.topic")
   private final String metricTopic;
-  @JsonProperty("alert.topic")
+  @Nullable @JsonProperty("alert.topic")
   private final String alertTopic;
   @Nullable @JsonProperty("request.topic")
   private final String requestTopic;
+  @Nullable @JsonProperty("segmentMetadata.topic")
+  private final String segmentMetadataTopic;
   @JsonProperty
   private final String clusterName;
   @JsonProperty("producer.config")
-  private Map<String, String> kafkaProducerConfig;
+  private final Map<String, String> kafkaProducerConfig;
 
   @JsonCreator
   public KafkaEmitterConfig(
       @JsonProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG) String 
bootstrapServers,
-      @JsonProperty("metric.topic") String metricTopic,
-      @JsonProperty("alert.topic") String alertTopic,
+      @Nullable @JsonProperty("event.types") Set<EventType> eventTypes,
+      @Nullable @JsonProperty("metric.topic") String metricTopic,
+      @Nullable @JsonProperty("alert.topic") String alertTopic,
       @Nullable @JsonProperty("request.topic") String requestTopic,
+      @Nullable @JsonProperty("segmentMetadata.topic") String 
segmentMetadataTopic,
       @JsonProperty("clusterName") String clusterName,
       @JsonProperty("producer.config") @Nullable Map<String, String> 
kafkaProducerConfig
   )
   {
     this.bootstrapServers = Preconditions.checkNotNull(bootstrapServers, 
"bootstrap.servers can not be null");
-    this.metricTopic = Preconditions.checkNotNull(metricTopic, "metric.topic 
can not be null");
-    this.alertTopic = Preconditions.checkNotNull(alertTopic, "alert.topic can 
not be null");
-    this.requestTopic = requestTopic;
+    this.eventTypes = validateEventTypes(eventTypes, requestTopic);
+
+    this.metricTopic = this.eventTypes.contains(EventType.METRICS) ? 
Preconditions.checkNotNull(metricTopic, "metric.topic can not be null") : null;
+    this.alertTopic = this.eventTypes.contains(EventType.ALERTS) ? 
Preconditions.checkNotNull(alertTopic, "alert.topic can not be null") : null;
+    this.requestTopic = this.eventTypes.contains(EventType.REQUESTS) ? 
Preconditions.checkNotNull(requestTopic, "request.topic can not be null") : 
null;
+    this.segmentMetadataTopic = 
this.eventTypes.contains(EventType.SEGMENTMETADATA) ? 
Preconditions.checkNotNull(segmentMetadataTopic, "segmentMetadata.topic can not 
be null") : null;
     this.clusterName = clusterName;
     this.kafkaProducerConfig = kafkaProducerConfig == null ? ImmutableMap.of() 
: kafkaProducerConfig;
   }
 
+  private Set<EventType> validateEventTypes(Set<EventType> eventTypes, String 
requestTopic)

Review Comment:
   Renamed it to `maybeUpdateEventTypes` 



##########
extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java:
##########
@@ -105,7 +111,7 @@ protected Producer<String, String> setKafkaProducer()
       Properties props = new Properties();
       props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
config.getBootstrapServers());
       props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());
-      props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());
+      props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class.getName());

Review Comment:
   I changed it byte[] to handle the protobuf format. Nw that we are not adding 
protobuf support, reverted it back to string.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to