This is an automated email from the ASF dual-hosted git repository.

abhishekrb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 11a8624ef17 allow for kafka-emitter to have extra dimensions be set 
for each event it emits (#15845)
11a8624ef17 is described below

commit 11a8624ef17dc6b7c264ef3ec162485fa21dce69
Author: Tom <[email protected]>
AuthorDate: Thu Feb 8 22:55:24 2024 -0800

    allow for kafka-emitter to have extra dimensions be set for each event it 
emits (#15845)
    
    * allow for kafka-emitter to have extra dimensions be set for each event it 
emits
    
    * fix checkstyle issue in kafkaemitterconfig
    
    * make changes to fix docs, and cleanup copy paste error in #toString()
    
    * undo formatting to markdown table
    
    * add more branches so test passes
    
    * fix checkstyle issue
---
 .../development/extensions-contrib/kafka-emitter.md |  2 ++
 .../apache/druid/emitter/kafka/KafkaEmitter.java    | 21 ++++++++++++++++-----
 .../druid/emitter/kafka/KafkaEmitterConfig.java     | 21 +++++++++++++++++----
 .../druid/emitter/kafka/KafkaEmitterConfigTest.java |  5 ++++-
 .../druid/emitter/kafka/KafkaEmitterTest.java       |  2 +-
 5 files changed, 40 insertions(+), 11 deletions(-)

diff --git a/docs/development/extensions-contrib/kafka-emitter.md 
b/docs/development/extensions-contrib/kafka-emitter.md
index 40b63ca73af..a3641e89cec 100644
--- a/docs/development/extensions-contrib/kafka-emitter.md
+++ b/docs/development/extensions-contrib/kafka-emitter.md
@@ -46,6 +46,7 @@ All the configuration parameters for the Kafka emitter are 
under `druid.emitter.
 | `druid.emitter.kafka.segmentMetadata.topic`        | Kafka topic name for 
emitter's target to emit segment metadata. If `event.types` contains 
`segment_metadata`, this field cannot be empty. | no        | none              
    |
 | `druid.emitter.kafka.producer.config`              | JSON configuration to 
set additional properties to Kafka producer.                                    
                                    | no        | none                  |
 | `druid.emitter.kafka.clusterName`                  | Optional value to 
specify the name of your Druid cluster. It can help make groups in your 
monitoring environment.                         | no        | none              
    |
+| `druid.emitter.kafka.extra.dimensions` | Optional JSON configuration to 
specify a map of extra string dimensions for the events emitted. These can help 
make groups in your monitoring environment. | no | none |
 
 ### Example
 
@@ -57,5 +58,6 @@ druid.emitter.kafka.alert.topic=druid-alert
 druid.emitter.kafka.request.topic=druid-request-logs
 druid.emitter.kafka.segmentMetadata.topic=druid-segment-metadata 
 druid.emitter.kafka.producer.config={"max.block.ms":10000}
+druid.emitter.kafka.extra.dimensions={"region":"us-east-1","environment":"preProd"}
 ```
 
diff --git 
a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java
 
b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java
index 661543f707c..a0cb1a9afe1 100644
--- 
a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java
+++ 
b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java
@@ -198,11 +198,7 @@ public class KafkaEmitter implements Emitter
     if (event != null) {
       try {
         EventMap map = event.toMap();
-        if (config.getClusterName() != null) {
-          map = map.asBuilder()
-                   .put("clusterName", config.getClusterName())
-                   .build();
-        }
+        map = addExtraDimensionsToEvent(map);
 
         String resultJson = jsonMapper.writeValueAsString(map);
 
@@ -239,6 +235,21 @@ public class KafkaEmitter implements Emitter
     }
   }
 
+  private EventMap addExtraDimensionsToEvent(EventMap map)
+  {
+    if (config.getClusterName() != null || config.getExtraDimensions() != 
null) {
+      EventMap.Builder eventMapBuilder = map.asBuilder();
+      if (config.getClusterName() != null) {
+        eventMapBuilder.put("clusterName", config.getClusterName());
+      }
+      if (config.getExtraDimensions() != null) {
+        eventMapBuilder.putAll(config.getExtraDimensions());
+      }
+      map = eventMapBuilder.build();
+    }
+    return map;
+  }
+
   @Override
   public void flush()
   {
diff --git 
a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitterConfig.java
 
b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitterConfig.java
index c7038079aa4..86f636e853d 100644
--- 
a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitterConfig.java
+++ 
b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitterConfig.java
@@ -62,7 +62,7 @@ public class KafkaEmitterConfig
   public static final Set<EventType> DEFAULT_EVENT_TYPES = 
ImmutableSet.of(EventType.ALERTS, EventType.METRICS);
   @JsonProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)
   private final String bootstrapServers;
-  @Nullable @JsonProperty("event.types")
+  @Nonnull @JsonProperty("event.types")
   private final Set<EventType> eventTypes;
   @Nullable @JsonProperty("metric.topic")
   private final String metricTopic;
@@ -72,8 +72,10 @@ public class KafkaEmitterConfig
   private final String requestTopic;
   @Nullable @JsonProperty("segmentMetadata.topic")
   private final String segmentMetadataTopic;
-  @JsonProperty
+  @Nullable @JsonProperty
   private final String clusterName;
+  @Nullable @JsonProperty("extra.dimensions")
+  private final Map<String, String> extraDimensions;
   @JsonProperty("producer.config")
   private final Map<String, String> kafkaProducerConfig;
   @JsonProperty("producer.hiddenProperties")
@@ -87,7 +89,8 @@ public class KafkaEmitterConfig
       @Nullable @JsonProperty("alert.topic") String alertTopic,
       @Nullable @JsonProperty("request.topic") String requestTopic,
       @Nullable @JsonProperty("segmentMetadata.topic") String 
segmentMetadataTopic,
-      @JsonProperty("clusterName") String clusterName,
+      @Nullable @JsonProperty("clusterName") String clusterName,
+      @Nullable @JsonProperty("extra.dimensions") Map<String, String> 
extraDimensions,
       @JsonProperty("producer.config") @Nullable Map<String, String> 
kafkaProducerConfig,
       @JsonProperty("producer.hiddenProperties") @Nullable 
DynamicConfigProvider<String> kafkaProducerSecrets
   )
@@ -99,10 +102,12 @@ public class KafkaEmitterConfig
     this.requestTopic = this.eventTypes.contains(EventType.REQUESTS) ? 
Preconditions.checkNotNull(requestTopic, "druid.emitter.kafka.request.topic can 
not be null") : null;
     this.segmentMetadataTopic = 
this.eventTypes.contains(EventType.SEGMENT_METADATA) ? 
Preconditions.checkNotNull(segmentMetadataTopic, 
"druid.emitter.kafka.segmentMetadata.topic can not be null") : null;
     this.clusterName = clusterName;
+    this.extraDimensions = extraDimensions;
     this.kafkaProducerConfig = kafkaProducerConfig == null ? ImmutableMap.of() 
: kafkaProducerConfig;
     this.kafkaProducerSecrets = kafkaProducerSecrets == null ? new 
MapStringDynamicConfigProvider(ImmutableMap.of()) : kafkaProducerSecrets;
   }
 
+  @Nonnull
   private Set<EventType> maybeUpdateEventTypes(Set<EventType> eventTypes, 
String requestTopic)
   {
     // Unless explicitly overridden, kafka emitter will always emit metrics 
and alerts
@@ -143,12 +148,18 @@ public class KafkaEmitterConfig
     return alertTopic;
   }
 
-  @JsonProperty
+  @Nullable @JsonProperty
   public String getClusterName()
   {
     return clusterName;
   }
 
+  @Nullable
+  public Map<String, String> getExtraDimensions()
+  {
+    return extraDimensions;
+  }
+
   @Nullable
   public String getRequestTopic()
   {
@@ -228,6 +239,7 @@ public class KafkaEmitterConfig
     result = 31 * result + (getRequestTopic() != null ? 
getRequestTopic().hashCode() : 0);
     result = 31 * result + (getSegmentMetadataTopic() != null ? 
getSegmentMetadataTopic().hashCode() : 0);
     result = 31 * result + (getClusterName() != null ? 
getClusterName().hashCode() : 0);
+    result = 31 * result + (getExtraDimensions() != null ? 
getExtraDimensions().hashCode() : 0);
     result = 31 * result + getKafkaProducerConfig().hashCode();
     result = 31 * result + getKafkaProducerSecrets().getConfig().hashCode();
     return result;
@@ -244,6 +256,7 @@ public class KafkaEmitterConfig
            ", request.topic='" + requestTopic + '\'' +
            ", segmentMetadata.topic='" + segmentMetadataTopic + '\'' +
            ", clusterName='" + clusterName + '\'' +
+           ", extra.dimensions='" + extraDimensions + '\'' +
            ", producer.config=" + kafkaProducerConfig + '\'' +
            ", producer.hiddenProperties=" + kafkaProducerSecrets +
            '}';
diff --git 
a/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterConfigTest.java
 
b/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterConfigTest.java
index 603c8e6701b..fb10868d362 100644
--- 
a/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterConfigTest.java
+++ 
b/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterConfigTest.java
@@ -58,6 +58,7 @@ public class KafkaEmitterConfigTest
         "requestTest",
         "metadataTest",
         "clusterNameTest",
+        ImmutableMap.of("env", "preProd"),
         ImmutableMap.<String, String>builder()
                     .put("testKey", "testValue").build(),
         DEFAULT_PRODUCER_SECRETS
@@ -79,6 +80,7 @@ public class KafkaEmitterConfigTest
         null,
         "metadataTest",
         "clusterNameTest",
+        null,
         ImmutableMap.<String, String>builder()
                     .put("testKey", "testValue").build(),
         DEFAULT_PRODUCER_SECRETS
@@ -102,6 +104,7 @@ public class KafkaEmitterConfigTest
         null,
         "metadataTest",
         "clusterNameTest",
+        null,
         ImmutableMap.<String, String>builder()
                     .put("testKey", "testValue").build(),
         DEFAULT_PRODUCER_SECRETS
@@ -117,7 +120,7 @@ public class KafkaEmitterConfigTest
   {
     KafkaEmitterConfig kafkaEmitterConfig = new 
KafkaEmitterConfig("localhost:9092", null, "metricTest",
                                                                    
"alertTest", null, "metadataTest",
-                                                                   
"clusterNameTest", null, null
+                                                                   
"clusterNameTest", null, null, null
     );
     try {
       @SuppressWarnings("unused")
diff --git 
a/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterTest.java
 
b/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterTest.java
index 1c30bee12fa..228982b6cd7 100644
--- 
a/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterTest.java
+++ 
b/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterTest.java
@@ -106,7 +106,7 @@ public class KafkaEmitterTest
     ObjectMapper mapper = new ObjectMapper();
     mapper.registerModule(new JodaModule());
     final KafkaEmitter kafkaEmitter = new KafkaEmitter(
-        new KafkaEmitterConfig("", eventsType, "metrics", "alerts", 
requestTopic, "metadata", "test-cluster", null, null),
+        new KafkaEmitterConfig("", eventsType, "metrics", "alerts", 
requestTopic, "metadata", "test-cluster", ImmutableMap.of("clusterId", 
"cluster-101"), null, null),
         mapper
     )
     {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to