This is an automated email from the ASF dual-hosted git repository.

abhishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 4ff6026d30 Adding SegmentMetadataEvent and publishing them via 
KafkaEmitter (#14281)
4ff6026d30 is described below

commit 4ff6026d30e4da53dc0e37bc2279d9e030773787
Author: Harini Rajendran <[email protected]>
AuthorDate: Fri Jun 2 10:58:26 2023 -0500

    Adding SegmentMetadataEvent and publishing them via KafkaEmitter (#14281)
    
    In this PR, we are enhancing KafkaEmitter, to emit metadata about published 
segments (SegmentMetadataEvent) into a Kafka topic. This segment metadata 
information that gets published into Kafka can be used by any other downstream 
services to query Druid intelligently based on the segments published. The 
segment metadata gets published into the Kafka topic in JSON string format, 
similar to other events.
---
 .../extensions-contrib/kafka-emitter.md            |  22 +++--
 .../ambari/metrics/AmbariMetricsEmitter.java       |   3 +
 .../emitter/dropwizard/DropwizardEmitter.java      |   3 +
 .../druid/emitter/graphite/GraphiteEmitter.java    |   3 +
 .../apache/druid/emitter/kafka/KafkaEmitter.java   |  48 ++++++++--
 .../druid/emitter/kafka/KafkaEmitterConfig.java    | 101 +++++++++++++++++---
 .../emitter/kafka/KafkaEmitterConfigTest.java      |  41 +++++++--
 .../druid/emitter/kafka/KafkaEmitterTest.java      |  41 +++++++--
 .../actions/SegmentTransactionalInsertAction.java  |  21 +++++
 .../util/emitter/service/SegmentMetadataEvent.java | 102 +++++++++++++++++++++
 .../emitter/service/SegmentMetadataEventTest.java  |  54 +++++++++++
 11 files changed, 394 insertions(+), 45 deletions(-)

diff --git a/docs/development/extensions-contrib/kafka-emitter.md 
b/docs/development/extensions-contrib/kafka-emitter.md
index 3457c249c7..40b63ca73a 100644
--- a/docs/development/extensions-contrib/kafka-emitter.md
+++ b/docs/development/extensions-contrib/kafka-emitter.md
@@ -36,20 +36,26 @@ to monitor the status of your Druid cluster with this 
extension.
 
 All the configuration parameters for the Kafka emitter are under 
`druid.emitter.kafka`.
 
-|property|description|required?|default|
-|--------|-----------|---------|-------|
-|`druid.emitter.kafka.bootstrap.servers`|Comma-separated Kafka broker. 
(`[hostname:port],[hostname:port]...`)|yes|none|
-|`druid.emitter.kafka.metric.topic`|Kafka topic name for emitter's target to 
emit service metric.|yes|none|
-|`druid.emitter.kafka.alert.topic`|Kafka topic name for emitter's target to 
emit alert.|yes|none|
-|`druid.emitter.kafka.request.topic`|Kafka topic name for emitter's target to 
emit request logs. If left empty then request logs will not be sent to the 
Kafka topic.|no|none|
-|`druid.emitter.kafka.producer.config`|JSON formatted configuration which user 
want to set additional properties to Kafka producer.|no|none|
-|`druid.emitter.kafka.clusterName`|Optional value to specify name of your 
druid cluster. It can help make groups in your monitoring environment. |no|none|
+| Property                                           | Description             
                                                                                
                                  | Required | Default               |
+|----------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------|-----------|-----------------------|
+| `druid.emitter.kafka.bootstrap.servers`            | Comma-separated Kafka 
broker. (`[hostname:port],[hostname:port]...`)                                  
                                    | yes       | none                  |
+| `druid.emitter.kafka.event.types`                  | Comma-separated event 
types. <br/>Supported types are `alerts`, `metrics`, `requests`, and 
`segment_metadata`.                            | no        | `["metrics", 
"alerts"]` |
+| `druid.emitter.kafka.metric.topic`                 | Kafka topic name for 
emitter's target to emit service metrics. If `event.types` contains `metrics`, 
this field cannot be empty.           | no        | none                  |
+| `druid.emitter.kafka.alert.topic`                  | Kafka topic name for 
emitter's target to emit alerts. If `event.types` contains `alerts`, this field 
cannot be empty.                        | no        | none                  |
+| `druid.emitter.kafka.request.topic`                | Kafka topic name for 
emitter's target to emit request logs. If `event.types` contains `requests`, 
this field cannot be empty.             | no        | none                  |
+| `druid.emitter.kafka.segmentMetadata.topic`        | Kafka topic name for 
emitter's target to emit segment metadata. If `event.types` contains 
`segment_metadata`, this field cannot be empty. | no        | none              
    |
+| `druid.emitter.kafka.producer.config`              | JSON configuration to 
set additional properties to Kafka producer.                                    
                                    | no        | none                  |
+| `druid.emitter.kafka.clusterName`                  | Optional value to 
specify the name of your Druid cluster. It can help make groups in your 
monitoring environment.                         | no        | none              
    |
 
 ### Example
 
 ```
 druid.emitter.kafka.bootstrap.servers=hostname1:9092,hostname2:9092
+druid.emitter.kafka.event.types=["metrics", "alerts", "requests", 
"segment_metadata"]
 druid.emitter.kafka.metric.topic=druid-metric
 druid.emitter.kafka.alert.topic=druid-alert
+druid.emitter.kafka.request.topic=druid-request-logs
+druid.emitter.kafka.segmentMetadata.topic=druid-segment-metadata 
 druid.emitter.kafka.producer.config={"max.block.ms":10000}
 ```
+
diff --git 
a/extensions-contrib/ambari-metrics-emitter/src/main/java/org/apache/druid/emitter/ambari/metrics/AmbariMetricsEmitter.java
 
b/extensions-contrib/ambari-metrics-emitter/src/main/java/org/apache/druid/emitter/ambari/metrics/AmbariMetricsEmitter.java
index 905b6cffc0..11dea07585 100644
--- 
a/extensions-contrib/ambari-metrics-emitter/src/main/java/org/apache/druid/emitter/ambari/metrics/AmbariMetricsEmitter.java
+++ 
b/extensions-contrib/ambari-metrics-emitter/src/main/java/org/apache/druid/emitter/ambari/metrics/AmbariMetricsEmitter.java
@@ -26,6 +26,7 @@ import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.java.util.emitter.core.Emitter;
 import org.apache.druid.java.util.emitter.core.Event;
 import org.apache.druid.java.util.emitter.service.AlertEvent;
+import org.apache.druid.java.util.emitter.service.SegmentMetadataEvent;
 import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
 import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
@@ -137,6 +138,8 @@ public class AmbariMetricsEmitter extends 
AbstractTimelineMetricsSink implements
       for (Emitter emitter : emitterList) {
         emitter.emit(event);
       }
+    } else if (event instanceof SegmentMetadataEvent) {
+      // do nothing. Ignore this event type
     } else {
       throw new ISE("unknown event type [%s]", event.getClass());
     }
diff --git 
a/extensions-contrib/dropwizard-emitter/src/main/java/org/apache/druid/emitter/dropwizard/DropwizardEmitter.java
 
b/extensions-contrib/dropwizard-emitter/src/main/java/org/apache/druid/emitter/dropwizard/DropwizardEmitter.java
index 5baa1b5da2..e22c373f89 100644
--- 
a/extensions-contrib/dropwizard-emitter/src/main/java/org/apache/druid/emitter/dropwizard/DropwizardEmitter.java
+++ 
b/extensions-contrib/dropwizard-emitter/src/main/java/org/apache/druid/emitter/dropwizard/DropwizardEmitter.java
@@ -33,6 +33,7 @@ import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.java.util.emitter.core.Emitter;
 import org.apache.druid.java.util.emitter.core.Event;
 import org.apache.druid.java.util.emitter.service.AlertEvent;
+import org.apache.druid.java.util.emitter.service.SegmentMetadataEvent;
 import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
 
 import java.util.LinkedHashMap;
@@ -127,6 +128,8 @@ public class DropwizardEmitter implements Emitter
       for (Emitter emitter : alertEmitters) {
         emitter.emit(event);
       }
+    } else if (event instanceof SegmentMetadataEvent) {
+      // do nothing. Ignore this event type
     } else {
       throw new ISE("unknown event type [%s]", event.getClass());
     }
diff --git 
a/extensions-contrib/graphite-emitter/src/main/java/org/apache/druid/emitter/graphite/GraphiteEmitter.java
 
b/extensions-contrib/graphite-emitter/src/main/java/org/apache/druid/emitter/graphite/GraphiteEmitter.java
index b3739ab9d1..10bfe1e869 100644
--- 
a/extensions-contrib/graphite-emitter/src/main/java/org/apache/druid/emitter/graphite/GraphiteEmitter.java
+++ 
b/extensions-contrib/graphite-emitter/src/main/java/org/apache/druid/emitter/graphite/GraphiteEmitter.java
@@ -28,6 +28,7 @@ import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.java.util.emitter.core.Emitter;
 import org.apache.druid.java.util.emitter.core.Event;
 import org.apache.druid.java.util.emitter.service.AlertEvent;
+import org.apache.druid.java.util.emitter.service.SegmentMetadataEvent;
 import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
 import org.apache.druid.server.log.RequestLogEvent;
 
@@ -139,6 +140,8 @@ public class GraphiteEmitter implements Emitter
           "The following alert is dropped, description is [%s], severity is 
[%s]",
           alertEvent.getDescription(), alertEvent.getSeverity()
       );
+    } else if (event instanceof SegmentMetadataEvent) {
+      // do nothing. Ignore this event type
     } else {
       log.error("unknown event type [%s]", event.getClass());
     }
diff --git 
a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java
 
b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java
index 129a374b58..dd8f3665f5 100644
--- 
a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java
+++ 
b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java
@@ -22,6 +22,7 @@ package org.apache.druid.emitter.kafka;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.druid.emitter.kafka.KafkaEmitterConfig.EventType;
 import 
org.apache.druid.emitter.kafka.MemoryBoundLinkedBlockingQueue.ObjectContainer;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
@@ -30,6 +31,7 @@ import org.apache.druid.java.util.emitter.core.Emitter;
 import org.apache.druid.java.util.emitter.core.Event;
 import org.apache.druid.java.util.emitter.core.EventMap;
 import org.apache.druid.java.util.emitter.service.AlertEvent;
+import org.apache.druid.java.util.emitter.service.SegmentMetadataEvent;
 import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
 import org.apache.druid.server.log.RequestLogEvent;
 import org.apache.kafka.clients.producer.Callback;
@@ -40,6 +42,7 @@ import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.StringSerializer;
 
 import java.util.Properties;
+import java.util.Set;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -55,6 +58,7 @@ public class KafkaEmitter implements Emitter
   private final AtomicLong metricLost;
   private final AtomicLong alertLost;
   private final AtomicLong requestLost;
+  private final AtomicLong segmentMetadataLost;
   private final AtomicLong invalidLost;
 
   private final KafkaEmitterConfig config;
@@ -63,6 +67,7 @@ public class KafkaEmitter implements Emitter
   private final MemoryBoundLinkedBlockingQueue<String> metricQueue;
   private final MemoryBoundLinkedBlockingQueue<String> alertQueue;
   private final MemoryBoundLinkedBlockingQueue<String> requestQueue;
+  private final MemoryBoundLinkedBlockingQueue<String> segmentMetadataQueue;
   private final ScheduledExecutorService scheduler;
 
   protected int sendInterval = DEFAULT_SEND_INTERVAL_SECONDS;
@@ -78,10 +83,12 @@ public class KafkaEmitter implements Emitter
     this.metricQueue = new MemoryBoundLinkedBlockingQueue<>(queueMemoryBound);
     this.alertQueue = new MemoryBoundLinkedBlockingQueue<>(queueMemoryBound);
     this.requestQueue = new MemoryBoundLinkedBlockingQueue<>(queueMemoryBound);
+    this.segmentMetadataQueue = new 
MemoryBoundLinkedBlockingQueue<>(queueMemoryBound);
     this.scheduler = Executors.newScheduledThreadPool(4);
     this.metricLost = new AtomicLong(0L);
     this.alertLost = new AtomicLong(0L);
     this.requestLost = new AtomicLong(0L);
+    this.segmentMetadataLost = new AtomicLong(0L);
     this.invalidLost = new AtomicLong(0L);
   }
 
@@ -119,17 +126,25 @@ public class KafkaEmitter implements Emitter
   @Override
   public void start()
   {
-    scheduler.schedule(this::sendMetricToKafka, sendInterval, 
TimeUnit.SECONDS);
-    scheduler.schedule(this::sendAlertToKafka, sendInterval, TimeUnit.SECONDS);
-    if (config.getRequestTopic() != null) {
+    Set<EventType> eventTypes = config.getEventTypes();
+    if (eventTypes.contains(EventType.METRICS)) {
+      scheduler.schedule(this::sendMetricToKafka, sendInterval, 
TimeUnit.SECONDS);
+    }
+    if (eventTypes.contains(EventType.ALERTS)) {
+      scheduler.schedule(this::sendAlertToKafka, sendInterval, 
TimeUnit.SECONDS);
+    }
+    if (eventTypes.contains(EventType.REQUESTS)) {
       scheduler.schedule(this::sendRequestToKafka, sendInterval, 
TimeUnit.SECONDS);
     }
+    if (eventTypes.contains(EventType.SEGMENT_METADATA)) {
+      scheduler.schedule(this::sendSegmentMetadataToKafka, sendInterval, 
TimeUnit.SECONDS);
+    }
     scheduler.scheduleWithFixedDelay(() -> {
-      log.info(
-          "Message lost counter: metricLost=[%d], alertLost=[%d], 
requestLost=[%d], invalidLost=[%d]",
+      log.info("Message lost counter: metricLost=[%d], alertLost=[%d], 
requestLost=[%d], segmentMetadataLost=[%d], invalidLost=[%d]",
           metricLost.get(),
           alertLost.get(),
           requestLost.get(),
+          segmentMetadataLost.get(),
           invalidLost.get()
       );
     }, DEFAULT_SEND_LOST_INTERVAL_MINUTES, DEFAULT_SEND_LOST_INTERVAL_MINUTES, 
TimeUnit.MINUTES);
@@ -151,6 +166,11 @@ public class KafkaEmitter implements Emitter
     sendToKafka(config.getRequestTopic(), requestQueue, 
setProducerCallback(requestLost));
   }
 
+  private void sendSegmentMetadataToKafka()
+  {
+    sendToKafka(config.getSegmentMetadataTopic(), segmentMetadataQueue, 
setProducerCallback(segmentMetadataLost));
+  }
+
   private void sendToKafka(final String topic, 
MemoryBoundLinkedBlockingQueue<String> recordQueue, Callback callback)
   {
     ObjectContainer<String> objectToSend;
@@ -183,24 +203,31 @@ public class KafkaEmitter implements Emitter
             resultJson,
             StringUtils.toUtf8(resultJson).length
         );
+
+        Set<EventType> eventTypes = config.getEventTypes();
         if (event instanceof ServiceMetricEvent) {
-          if (!metricQueue.offer(objectContainer)) {
+          if (!eventTypes.contains(EventType.METRICS) || 
!metricQueue.offer(objectContainer)) {
             metricLost.incrementAndGet();
           }
         } else if (event instanceof AlertEvent) {
-          if (!alertQueue.offer(objectContainer)) {
+          if (!eventTypes.contains(EventType.ALERTS) || 
!alertQueue.offer(objectContainer)) {
             alertLost.incrementAndGet();
           }
         } else if (event instanceof RequestLogEvent) {
-          if (config.getRequestTopic() == null || 
!requestQueue.offer(objectContainer)) {
+          if (!eventTypes.contains(EventType.REQUESTS) || 
!requestQueue.offer(objectContainer)) {
             requestLost.incrementAndGet();
           }
+        } else if (event instanceof SegmentMetadataEvent) {
+          if (!eventTypes.contains(EventType.SEGMENT_METADATA) || 
!segmentMetadataQueue.offer(objectContainer)) {
+            segmentMetadataLost.incrementAndGet();
+          }
         } else {
           invalidLost.incrementAndGet();
         }
       }
       catch (JsonProcessingException e) {
         invalidLost.incrementAndGet();
+        log.warn(e, "Exception while serializing event");
       }
     }
   }
@@ -238,4 +265,9 @@ public class KafkaEmitter implements Emitter
   {
     return invalidLost.get();
   }
+
+  public long getSegmentMetadataLostCount()
+  {
+    return segmentMetadataLost.get();
+  }
 }
diff --git 
a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitterConfig.java
 
b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitterConfig.java
index ed7b9ea0e9..019edd095e 100644
--- 
a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitterConfig.java
+++ 
b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitterConfig.java
@@ -21,53 +21,108 @@ package org.apache.druid.emitter.kafka;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonValue;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import org.apache.druid.java.util.common.StringUtils;
 import org.apache.kafka.clients.producer.ProducerConfig;
 
 import javax.annotation.Nullable;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 
 public class KafkaEmitterConfig
 {
+  public enum EventType
+  {
+    METRICS,
+    ALERTS,
+    REQUESTS,
+    SEGMENT_METADATA;
+
+    @JsonValue
+    @Override
+    public String toString()
+    {
+      return StringUtils.toLowerCase(this.name());
+    }
 
+    @JsonCreator
+    public static EventType fromString(String name)
+    {
+      return valueOf(StringUtils.toUpperCase(name));
+    }
+  }
+
+  public static final Set<EventType> DEFAULT_EVENT_TYPES = 
ImmutableSet.of(EventType.ALERTS, EventType.METRICS);
   @JsonProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)
   private final String bootstrapServers;
-  @JsonProperty("metric.topic")
+  @Nullable @JsonProperty("event.types")
+  private final Set<EventType> eventTypes;
+  @Nullable @JsonProperty("metric.topic")
   private final String metricTopic;
-  @JsonProperty("alert.topic")
+  @Nullable @JsonProperty("alert.topic")
   private final String alertTopic;
   @Nullable @JsonProperty("request.topic")
   private final String requestTopic;
+  @Nullable @JsonProperty("segmentMetadata.topic")
+  private final String segmentMetadataTopic;
   @JsonProperty
   private final String clusterName;
   @JsonProperty("producer.config")
-  private Map<String, String> kafkaProducerConfig;
+  private final Map<String, String> kafkaProducerConfig;
 
   @JsonCreator
   public KafkaEmitterConfig(
       @JsonProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG) String 
bootstrapServers,
-      @JsonProperty("metric.topic") String metricTopic,
-      @JsonProperty("alert.topic") String alertTopic,
+      @Nullable @JsonProperty("event.types") Set<EventType> eventTypes,
+      @Nullable @JsonProperty("metric.topic") String metricTopic,
+      @Nullable @JsonProperty("alert.topic") String alertTopic,
       @Nullable @JsonProperty("request.topic") String requestTopic,
+      @Nullable @JsonProperty("segmentMetadata.topic") String 
segmentMetadataTopic,
       @JsonProperty("clusterName") String clusterName,
       @JsonProperty("producer.config") @Nullable Map<String, String> 
kafkaProducerConfig
   )
   {
-    this.bootstrapServers = Preconditions.checkNotNull(bootstrapServers, 
"bootstrap.servers can not be null");
-    this.metricTopic = Preconditions.checkNotNull(metricTopic, "metric.topic 
can not be null");
-    this.alertTopic = Preconditions.checkNotNull(alertTopic, "alert.topic can 
not be null");
-    this.requestTopic = requestTopic;
+    this.bootstrapServers = Preconditions.checkNotNull(bootstrapServers, 
"druid.emitter.kafka.bootstrap.servers can not be null");
+    this.eventTypes = maybeUpdateEventTypes(eventTypes, requestTopic);
+    this.metricTopic = this.eventTypes.contains(EventType.METRICS) ? 
Preconditions.checkNotNull(metricTopic, "druid.emitter.kafka.metric.topic can 
not be null") : null;
+    this.alertTopic = this.eventTypes.contains(EventType.ALERTS) ? 
Preconditions.checkNotNull(alertTopic, "druid.emitter.kafka.alert.topic can not 
be null") : null;
+    this.requestTopic = this.eventTypes.contains(EventType.REQUESTS) ? 
Preconditions.checkNotNull(requestTopic, "druid.emitter.kafka.request.topic can 
not be null") : null;
+    this.segmentMetadataTopic = 
this.eventTypes.contains(EventType.SEGMENT_METADATA) ? 
Preconditions.checkNotNull(segmentMetadataTopic, 
"druid.emitter.kafka.segmentMetadata.topic can not be null") : null;
     this.clusterName = clusterName;
     this.kafkaProducerConfig = kafkaProducerConfig == null ? ImmutableMap.of() 
: kafkaProducerConfig;
   }
 
+  private Set<EventType> maybeUpdateEventTypes(Set<EventType> eventTypes, 
String requestTopic)
+  {
+    // Unless explicitly overridden, kafka emitter will always emit metrics 
and alerts
+    if (eventTypes == null) {
+      Set<EventType> defaultEventTypes = new HashSet<>(DEFAULT_EVENT_TYPES);
+      // To maintain backwards compatibility, if eventTypes is not set, then 
requests are sent out or not
+      // based on the `request.topic` config
+      if (requestTopic != null) {
+        defaultEventTypes.add(EventType.REQUESTS);
+      }
+      return defaultEventTypes;
+    }
+    return eventTypes;
+  }
+
   @JsonProperty
   public String getBootstrapServers()
   {
     return bootstrapServers;
   }
 
+  @JsonProperty
+  public Set<EventType> getEventTypes()
+  {
+    return eventTypes;
+  }
+
   @JsonProperty
   public String getMetricTopic()
   {
@@ -92,6 +147,12 @@ public class KafkaEmitterConfig
     return requestTopic;
   }
 
+  @Nullable
+  public String getSegmentMetadataTopic()
+  {
+    return segmentMetadataTopic;
+  }
+
   @JsonProperty
   public Map<String, String> getKafkaProducerConfig()
   {
@@ -113,10 +174,16 @@ public class KafkaEmitterConfig
     if (!getBootstrapServers().equals(that.getBootstrapServers())) {
       return false;
     }
-    if (!getMetricTopic().equals(that.getMetricTopic())) {
+
+    if (getEventTypes() != null ? 
!getEventTypes().equals(that.getEventTypes()) : that.getEventTypes() != null) {
+      return false;
+    }
+
+    if (getMetricTopic() != null ? 
!getMetricTopic().equals(that.getMetricTopic()) : that.getMetricTopic() != 
null) {
       return false;
     }
-    if (!getAlertTopic().equals(that.getAlertTopic())) {
+
+    if (getAlertTopic() != null ? 
!getAlertTopic().equals(that.getAlertTopic()) : that.getAlertTopic() != null) {
       return false;
     }
 
@@ -124,6 +191,10 @@ public class KafkaEmitterConfig
       return false;
     }
 
+    if (getSegmentMetadataTopic() != null ? 
!getSegmentMetadataTopic().equals(that.getSegmentMetadataTopic()) : 
that.getSegmentMetadataTopic() != null) {
+      return false;
+    }
+
     if (getClusterName() != null ? 
!getClusterName().equals(that.getClusterName()) : that.getClusterName() != 
null) {
       return false;
     }
@@ -134,9 +205,11 @@ public class KafkaEmitterConfig
   public int hashCode()
   {
     int result = getBootstrapServers().hashCode();
-    result = 31 * result + getMetricTopic().hashCode();
-    result = 31 * result + getAlertTopic().hashCode();
+    result = 31 * result + (getEventTypes() != null ? 
getEventTypes().hashCode() : 0);
+    result = 31 * result + (getMetricTopic() != null ? 
getMetricTopic().hashCode() : 0);
+    result = 31 * result + (getAlertTopic() != null ? 
getAlertTopic().hashCode() : 0);
     result = 31 * result + (getRequestTopic() != null ? 
getRequestTopic().hashCode() : 0);
+    result = 31 * result + (getSegmentMetadataTopic() != null ? 
getSegmentMetadataTopic().hashCode() : 0);
     result = 31 * result + (getClusterName() != null ? 
getClusterName().hashCode() : 0);
     result = 31 * result + getKafkaProducerConfig().hashCode();
     return result;
@@ -147,9 +220,11 @@ public class KafkaEmitterConfig
   {
     return "KafkaEmitterConfig{" +
            "bootstrap.servers='" + bootstrapServers + '\'' +
+           ", event.types='" + eventTypes + '\'' +
            ", metric.topic='" + metricTopic + '\'' +
            ", alert.topic='" + alertTopic + '\'' +
            ", request.topic='" + requestTopic + '\'' +
+           ", segmentMetadata.topic='" + segmentMetadataTopic + '\'' +
            ", clusterName='" + clusterName + '\'' +
            ", Producer.config=" + kafkaProducerConfig +
            '}';
diff --git 
a/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterConfigTest.java
 
b/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterConfigTest.java
index 55ecdbaeb8..c4d5811bcb 100644
--- 
a/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterConfigTest.java
+++ 
b/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterConfigTest.java
@@ -19,15 +19,18 @@
 
 package org.apache.druid.emitter.kafka;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.InjectableValues;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.exc.ValueInstantiationException;
 import com.google.common.collect.ImmutableMap;
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
-
 import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
 
 public class KafkaEmitterConfigTest
 {
@@ -42,8 +45,8 @@ public class KafkaEmitterConfigTest
   @Test
   public void testSerDeserKafkaEmitterConfig() throws IOException
   {
-    KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig("hostname", 
"metricTest",
-        "alertTest", "requestTest",
+    KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig("hostname", 
null, "metricTest",
+        "alertTest", "requestTest", "metadataTest",
         "clusterNameTest", ImmutableMap.<String, String>builder()
         .put("testKey", "testValue").build()
     );
@@ -56,8 +59,24 @@ public class KafkaEmitterConfigTest
   @Test
   public void testSerDeserKafkaEmitterConfigNullRequestTopic() throws 
IOException
   {
-    KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig("hostname", 
"metricTest",
-        "alertTest", null,
+    KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig("hostname", 
null, "metricTest",
+        "alertTest", null, "metadataTest",
+        "clusterNameTest", ImmutableMap.<String, String>builder()
+        .put("testKey", "testValue").build()
+    );
+    String kafkaEmitterConfigString = 
mapper.writeValueAsString(kafkaEmitterConfig);
+    KafkaEmitterConfig kafkaEmitterConfigExpected = 
mapper.readerFor(KafkaEmitterConfig.class)
+        .readValue(kafkaEmitterConfigString);
+    Assert.assertEquals(kafkaEmitterConfigExpected, kafkaEmitterConfig);
+  }
+
+  @Test
+  public void testSerDeserKafkaEmitterConfigNullMetricsTopic() throws 
IOException
+  {
+    Set<KafkaEmitterConfig.EventType> eventTypeSet = new 
HashSet<KafkaEmitterConfig.EventType>();
+    eventTypeSet.add(KafkaEmitterConfig.EventType.SEGMENT_METADATA);
+    KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig("hostname", 
eventTypeSet, null,
+        null, null, "metadataTest",
         "clusterNameTest", ImmutableMap.<String, String>builder()
         .put("testKey", "testValue").build()
     );
@@ -70,8 +89,8 @@ public class KafkaEmitterConfigTest
   @Test
   public void testSerDeNotRequiredKafkaProducerConfig()
   {
-    KafkaEmitterConfig kafkaEmitterConfig = new 
KafkaEmitterConfig("localhost:9092", "metricTest",
-        "alertTest", null,
+    KafkaEmitterConfig kafkaEmitterConfig = new 
KafkaEmitterConfig("localhost:9092", null, "metricTest",
+        "alertTest", null, "metadataTest",
         "clusterNameTest", null
     );
     try {
@@ -83,6 +102,14 @@ public class KafkaEmitterConfigTest
     }
   }
 
+  @Test
+  public void testDeserializeEventTypesWithDifferentCase() throws 
JsonProcessingException
+  {
+    Assert.assertEquals(KafkaEmitterConfig.EventType.SEGMENT_METADATA, 
mapper.readValue("\"segment_metadata\"", KafkaEmitterConfig.EventType.class));
+    Assert.assertEquals(KafkaEmitterConfig.EventType.ALERTS, 
mapper.readValue("\"alerts\"", KafkaEmitterConfig.EventType.class));
+    Assert.assertThrows(ValueInstantiationException.class, () -> 
mapper.readValue("\"segmentMetadata\"", KafkaEmitterConfig.EventType.class));
+  }
+
   @Test
   public void testJacksonModules()
   {
diff --git 
a/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterTest.java
 
b/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterTest.java
index 422d18a7f1..b40da9bd9e 100644
--- 
a/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterTest.java
+++ 
b/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterTest.java
@@ -25,6 +25,7 @@ import com.google.common.collect.ImmutableMap;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.emitter.core.Event;
 import org.apache.druid.java.util.emitter.service.AlertEvent;
+import org.apache.druid.java.util.emitter.service.SegmentMetadataEvent;
 import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
 import org.apache.druid.server.QueryStats;
 import org.apache.druid.server.RequestLogLine;
@@ -37,7 +38,10 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
+import java.util.Arrays;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 
 import static org.mockito.ArgumentMatchers.any;
@@ -47,20 +51,23 @@ import static org.mockito.Mockito.when;
 @RunWith(Parameterized.class)
 public class KafkaEmitterTest
 {
-  @Parameterized.Parameter
+  @Parameterized.Parameter(0)
+  public Set<KafkaEmitterConfig.EventType> eventsType;
+
+  @Parameterized.Parameter(1)
   public String requestTopic;
 
-  @Parameterized.Parameters(name = "{index}: requestTopic - {0}")
+  @Parameterized.Parameters(name = "{index}: eventTypes - {0}, requestTopic - 
{1}")
   public static Object[] data()
   {
-    return new Object[] {
-        "requests",
-        null
+    return new Object[][] {
+        {new HashSet<>(Arrays.asList(KafkaEmitterConfig.EventType.METRICS, 
KafkaEmitterConfig.EventType.REQUESTS, KafkaEmitterConfig.EventType.ALERTS, 
KafkaEmitterConfig.EventType.SEGMENT_METADATA)), "requests"},
+        {new HashSet<>(Arrays.asList(KafkaEmitterConfig.EventType.METRICS, 
KafkaEmitterConfig.EventType.ALERTS, 
KafkaEmitterConfig.EventType.SEGMENT_METADATA)), null}
     };
   }
 
-  // there is 1 seconds wait in kafka emitter before it starts sending events 
to broker, set a timeout for 5 seconds
-  @Test(timeout = 5_000)
+  // there is 1 seconds wait in kafka emitter before it starts sending events 
to broker, set a timeout for 10 seconds
+  @Test(timeout = 10_000)
   public void testKafkaEmitter() throws InterruptedException
   {
     final List<ServiceMetricEvent> serviceMetricEvents = ImmutableList.of(
@@ -77,14 +84,26 @@ public class KafkaEmitterTest
         ).build("service", "host")
     );
 
-    int totalEvents = serviceMetricEvents.size() + alertEvents.size() + 
requestLogEvents.size();
+    final List<SegmentMetadataEvent> segmentMetadataEvents = ImmutableList.of(
+        new SegmentMetadataEvent(
+            "dummy_datasource",
+            DateTimes.of("2001-01-01T00:00:00.000Z"),
+            DateTimes.of("2001-01-02T00:00:00.000Z"),
+            DateTimes.of("2001-01-03T00:00:00.000Z"),
+            "dummy_version",
+            true
+        )
+    );
+
+    int totalEvents = serviceMetricEvents.size() + alertEvents.size() + 
requestLogEvents.size() + segmentMetadataEvents.size();
     int totalEventsExcludingRequestLogEvents = totalEvents - 
requestLogEvents.size();
 
     final CountDownLatch countDownSentEvents = new CountDownLatch(
         requestTopic == null ? totalEventsExcludingRequestLogEvents : 
totalEvents);
+
     final KafkaProducer<String, String> producer = mock(KafkaProducer.class);
     final KafkaEmitter kafkaEmitter = new KafkaEmitter(
-        new KafkaEmitterConfig("", "metrics", "alerts", requestTopic, 
"test-cluster", null),
+        new KafkaEmitterConfig("", eventsType, "metrics", "alerts", 
requestTopic, "metadata", "test-cluster", null),
         new ObjectMapper()
     )
     {
@@ -113,10 +132,14 @@ public class KafkaEmitterTest
     for (Event event : requestLogEvents) {
       kafkaEmitter.emit(event);
     }
+    for (Event event : segmentMetadataEvents) {
+      kafkaEmitter.emit(event);
+    }
     countDownSentEvents.await();
 
     Assert.assertEquals(0, kafkaEmitter.getMetricLostCount());
     Assert.assertEquals(0, kafkaEmitter.getAlertLostCount());
+    Assert.assertEquals(0, kafkaEmitter.getSegmentMetadataLostCount());
     Assert.assertEquals(requestTopic == null ? requestLogEvents.size() : 0, 
kafkaEmitter.getRequestLostCount());
     Assert.assertEquals(0, kafkaEmitter.getInvalidLostCount());
   }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java
index 233739eb7b..a0567dce04 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java
@@ -33,10 +33,13 @@ import org.apache.druid.indexing.overlord.CriticalAction;
 import org.apache.druid.indexing.overlord.DataSourceMetadata;
 import org.apache.druid.indexing.overlord.SegmentPublishResult;
 import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.emitter.service.SegmentMetadataEvent;
 import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
 import org.apache.druid.query.DruidMetrics;
 import org.apache.druid.segment.SegmentUtils;
 import org.apache.druid.timeline.DataSegment;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
 import org.joda.time.Interval;
 
 import javax.annotation.Nullable;
@@ -257,11 +260,29 @@ public class SegmentTransactionalInsertAction implements 
TaskAction<SegmentPubli
           segment.getShardSpec() == null ? null : 
segment.getShardSpec().getType()
       );
       toolbox.getEmitter().emit(metricBuilder.build("segment/added/bytes", 
segment.getSize()));
+      // Emit the segment-related metadata using the configured emitters.
+      // There is a possibility that some segments' metadata events might get 
missed if the
+      // server crashes after committing the segment but before emitting the event.
     }
 
     return retVal;
   }
 
+  private void emitSegmentMetadata(DataSegment segment, TaskActionToolbox 
toolbox)
+  {
+    SegmentMetadataEvent event = new SegmentMetadataEvent(
+        segment.getDataSource(),
+        DateTime.now(DateTimeZone.UTC),
+        segment.getInterval().getStart(),
+        segment.getInterval().getEnd(),
+        segment.getVersion(),
+        segment.getLastCompactionState() != null
+    );
+
+    toolbox.getEmitter().emit(event);
+  }
+
   private void checkWithSegmentLock()
   {
     final Map<Interval, List<DataSegment>> oldSegmentsMap = 
groupSegmentsByIntervalAndSort(segmentsToBeOverwritten);
diff --git 
a/processing/src/main/java/org/apache/druid/java/util/emitter/service/SegmentMetadataEvent.java
 
b/processing/src/main/java/org/apache/druid/java/util/emitter/service/SegmentMetadataEvent.java
new file mode 100644
index 0000000000..bc3769b623
--- /dev/null
+++ 
b/processing/src/main/java/org/apache/druid/java/util/emitter/service/SegmentMetadataEvent.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.java.util.emitter.service;
+
+import com.fasterxml.jackson.annotation.JsonValue;
+import org.apache.druid.java.util.emitter.core.Event;
+import org.apache.druid.java.util.emitter.core.EventMap;
+import org.joda.time.DateTime;
+
+/**
+ * The event that gets generated whenever a segment is committed
+ */
+public class SegmentMetadataEvent implements Event
+{
+  public static final String FEED = "feed";
+  public static final String DATASOURCE = "dataSource";
+  public static final String CREATED_TIME = "createdTime";
+  public static final String START_TIME = "startTime";
+  public static final String END_TIME = "endTime";
+  public static final String VERSION = "version";
+  public static final String IS_COMPACTED = "isCompacted";
+
+  /**
+   * Time at which the segment metadata event is created
+   */
+  private final DateTime createdTime;
+  /**
+   * Datasource for which the segment is committed
+   */
+  private final String dataSource;
+  /**
+   * Start time of the committed segment's interval
+   */
+  private final DateTime startTime;
+  /**
+   * End time of the committed segment's interval
+   */
+  private final DateTime endTime;
+  /**
+   * Version of the committed segment
+   */
+  private final String version;
+  /**
+   * Whether the committed segment is a compacted segment
+   */
+  private final boolean isCompacted;
+
+  public SegmentMetadataEvent(
+      String dataSource,
+      DateTime createdTime,
+      DateTime startTime,
+      DateTime endTime,
+      String version,
+      boolean isCompacted
+  )
+  {
+    this.dataSource = dataSource;
+    this.createdTime = createdTime;
+    this.startTime = startTime;
+    this.endTime = endTime;
+    this.version = version;
+    this.isCompacted = isCompacted;
+  }
+
+  @Override
+  public String getFeed()
+  {
+    return "segment_metadata";
+  }
+  @Override
+  @JsonValue
+  public EventMap toMap()
+  {
+
+    return EventMap.builder()
+        .put(FEED, getFeed())
+        .put(DATASOURCE, dataSource)
+        .put(CREATED_TIME, createdTime)
+        .put(START_TIME, startTime)
+        .put(END_TIME, endTime)
+        .put(VERSION, version)
+        .put(IS_COMPACTED, isCompacted)
+        .build();
+  }
+}
diff --git 
a/processing/src/test/java/org/apache/druid/java/util/emitter/service/SegmentMetadataEventTest.java
 
b/processing/src/test/java/org/apache/druid/java/util/emitter/service/SegmentMetadataEventTest.java
new file mode 100644
index 0000000000..83a4fcba7d
--- /dev/null
+++ 
b/processing/src/test/java/org/apache/druid/java/util/emitter/service/SegmentMetadataEventTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.java.util.emitter.service;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.java.util.common.DateTimes;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class SegmentMetadataEventTest
+{
+  @Test
+  public void testBasicEvent()
+  {
+    SegmentMetadataEvent event = new SegmentMetadataEvent(
+        "dummy_datasource",
+        DateTimes.of("2001-01-01T00:00:00.000Z"),
+        DateTimes.of("2001-01-02T00:00:00.000Z"),
+        DateTimes.of("2001-01-03T00:00:00.000Z"),
+        "dummy_version",
+        true
+    );
+
+    Assert.assertEquals(
+        ImmutableMap.<String, Object>builder()
+            .put(SegmentMetadataEvent.FEED, "segment_metadata")
+            .put(SegmentMetadataEvent.DATASOURCE, "dummy_datasource")
+            .put(SegmentMetadataEvent.CREATED_TIME, 
DateTimes.of("2001-01-01T00:00:00.000Z"))
+            .put(SegmentMetadataEvent.START_TIME, 
DateTimes.of("2001-01-02T00:00:00.000Z"))
+            .put(SegmentMetadataEvent.END_TIME, 
DateTimes.of("2001-01-03T00:00:00.000Z"))
+            .put(SegmentMetadataEvent.VERSION, "dummy_version")
+            .put(SegmentMetadataEvent.IS_COMPACTED, true)
+            .build(),
+        event.toMap()
+    );
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to