This is an automated email from the ASF dual-hosted git repository.

atul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 235938e615e Configure producer shutdown timeout for kafka emitter 
(#18427)
235938e615e is described below

commit 235938e615e29a4a315614613962416c44b99de7
Author: Atul Mohan <[email protected]>
AuthorDate: Tue Sep 2 09:31:47 2025 -0700

    Configure producer shutdown timeout for kafka emitter (#18427)
    
    * Configure producer shutdown timeout for kafka emitter
    
    * Remove nullable
---
 .../extensions-contrib/kafka-emitter.md            |  1 +
 .../apache/druid/emitter/kafka/KafkaEmitter.java   |  3 +-
 .../druid/emitter/kafka/KafkaEmitterConfig.java    | 18 +++++++++-
 .../emitter/kafka/KafkaEmitterConfigTest.java      | 40 ++++++++++++++++++++--
 .../druid/emitter/kafka/KafkaEmitterTest.java      |  7 ++++
 5 files changed, 64 insertions(+), 5 deletions(-)

diff --git a/docs/development/extensions-contrib/kafka-emitter.md 
b/docs/development/extensions-contrib/kafka-emitter.md
index 1d21bfc67f2..772c0ff405c 100644
--- a/docs/development/extensions-contrib/kafka-emitter.md
+++ b/docs/development/extensions-contrib/kafka-emitter.md
@@ -48,6 +48,7 @@ All the configuration parameters for the Kafka emitter are 
under `druid.emitter.
 | `druid.emitter.kafka.clusterName`                  | Optional value to 
specify the name of your Druid cluster. It can help make groups in your 
monitoring environment.                         | no        | none              
    |
 | `druid.emitter.kafka.extra.dimensions` | Optional JSON configuration to 
specify a map of extra string dimensions for the events emitted. These can help 
make groups in your monitoring environment. | no | none |
 | `druid.emitter.kafka.producer.hiddenProperties`    | JSON configuration to 
specify sensitive Kafka producer properties such as username and password.  
This property accepts a 
[DynamicConfigProvider](../../operations/dynamic-config-provider.md) 
implementation. | no | none |
+| `druid.emitter.kafka.producer.shutdownTimeout`    | Duration in milliseconds 
the Kafka producer waits for pending requests to finish before shutting down. | 
no | Long.MAX_VALUE |
 
 ### Example
 
diff --git 
a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java
 
b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java
index 4e8c5ba5a92..b647e425ebc 100644
--- 
a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java
+++ 
b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java
@@ -41,6 +41,7 @@ import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.StringSerializer;
 
+import java.time.Duration;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.Executors;
@@ -293,7 +294,7 @@ public class KafkaEmitter implements Emitter
   public void close()
   {
     scheduler.shutdownNow();
-    producer.close();
+    producer.close(Duration.ofMillis(config.getShutdownTimeout()));
   }
 
   public long getMetricLostCount()
diff --git 
a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitterConfig.java
 
b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitterConfig.java
index 8ae25329035..a690637c27d 100644
--- 
a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitterConfig.java
+++ 
b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitterConfig.java
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonValue;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+import org.apache.druid.common.config.Configs;
 import org.apache.druid.error.DruidException;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.metadata.DynamicConfigProvider;
@@ -80,6 +81,8 @@ public class KafkaEmitterConfig
   private final Map<String, String> kafkaProducerConfig;
   @JsonProperty("producer.hiddenProperties")
   private final DynamicConfigProvider<String> kafkaProducerSecrets;
+  @JsonProperty("producer.shutdownTimeout")
+  private final Long shutdownTimeout;
 
   @JsonCreator
   public KafkaEmitterConfig(
@@ -92,7 +95,8 @@ public class KafkaEmitterConfig
       @Nullable @JsonProperty("clusterName") String clusterName,
       @Nullable @JsonProperty("extra.dimensions") Map<String, String> 
extraDimensions,
       @JsonProperty("producer.config") @Nullable Map<String, String> 
kafkaProducerConfig,
-      @JsonProperty("producer.hiddenProperties") @Nullable 
DynamicConfigProvider<String> kafkaProducerSecrets
+      @JsonProperty("producer.hiddenProperties") @Nullable 
DynamicConfigProvider<String> kafkaProducerSecrets,
+      @JsonProperty("producer.shutdownTimeout") @Nullable Long shutdownTimeout
   )
   {
     this.eventTypes = maybeUpdateEventTypes(eventTypes, requestTopic);
@@ -138,6 +142,7 @@ public class KafkaEmitterConfig
     this.extraDimensions = extraDimensions;
     this.kafkaProducerConfig = kafkaProducerConfig == null ? ImmutableMap.of() 
: kafkaProducerConfig;
     this.kafkaProducerSecrets = kafkaProducerSecrets == null ? new 
MapStringDynamicConfigProvider(ImmutableMap.of()) : kafkaProducerSecrets;
+    this.shutdownTimeout = Configs.valueOrDefault(shutdownTimeout, 
Long.MAX_VALUE);
   }
 
   @Nonnull
@@ -217,6 +222,12 @@ public class KafkaEmitterConfig
     return kafkaProducerSecrets;
   }
 
+  @JsonProperty
+  public Long getShutdownTimeout()
+  {
+    return shutdownTimeout;
+  }
+
   @Override
   public boolean equals(Object o)
   {
@@ -259,6 +270,9 @@ public class KafkaEmitterConfig
     if (!getKafkaProducerConfig().equals(that.getKafkaProducerConfig())) {
       return false;
     }
+    if (!getShutdownTimeout().equals(that.getShutdownTimeout())) {
+      return false;
+    }
     return 
getKafkaProducerSecrets().getConfig().equals(that.getKafkaProducerSecrets().getConfig());
   }
 
@@ -274,6 +288,7 @@ public class KafkaEmitterConfig
     result = 31 * result + (getClusterName() != null ? 
getClusterName().hashCode() : 0);
     result = 31 * result + (getExtraDimensions() != null ? 
getExtraDimensions().hashCode() : 0);
     result = 31 * result + getKafkaProducerConfig().hashCode();
+    result = 31 * result + getShutdownTimeout().hashCode();
     result = 31 * result + getKafkaProducerSecrets().getConfig().hashCode();
     return result;
   }
@@ -292,6 +307,7 @@ public class KafkaEmitterConfig
            ", extra.dimensions='" + extraDimensions + '\'' +
            ", producer.config=" + kafkaProducerConfig + '\'' +
            ", producer.hiddenProperties=" + kafkaProducerSecrets +
+           ", producer.shutdownTimeout=" + shutdownTimeout +
            '}';
   }
 }
diff --git 
a/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterConfigTest.java
 
b/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterConfigTest.java
index 0d5c5e312e8..b22cbfd1f4b 100644
--- 
a/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterConfigTest.java
+++ 
b/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterConfigTest.java
@@ -58,7 +58,8 @@ public class KafkaEmitterConfigTest
         ImmutableMap.of("env", "preProd"),
         ImmutableMap.<String, String>builder()
                     .put("testKey", "testValue").build(),
-        DEFAULT_PRODUCER_SECRETS
+        DEFAULT_PRODUCER_SECRETS,
+        50L
     );
     String kafkaEmitterConfigString = 
MAPPER.writeValueAsString(kafkaEmitterConfig);
     KafkaEmitterConfig kafkaEmitterConfigExpected = 
MAPPER.readerFor(KafkaEmitterConfig.class)
@@ -80,7 +81,8 @@ public class KafkaEmitterConfigTest
         null,
         ImmutableMap.<String, String>builder()
                     .put("testKey", "testValue").build(),
-        DEFAULT_PRODUCER_SECRETS
+        DEFAULT_PRODUCER_SECRETS,
+        null
     );
     String kafkaEmitterConfigString = 
MAPPER.writeValueAsString(kafkaEmitterConfig);
     KafkaEmitterConfig kafkaEmitterConfigExpected = 
MAPPER.readerFor(KafkaEmitterConfig.class)
@@ -104,7 +106,8 @@ public class KafkaEmitterConfigTest
         null,
         ImmutableMap.<String, String>builder()
                     .put("testKey", "testValue").build(),
-        DEFAULT_PRODUCER_SECRETS
+        DEFAULT_PRODUCER_SECRETS,
+        null
     );
     String kafkaEmitterConfigString = 
MAPPER.writeValueAsString(kafkaEmitterConfig);
     KafkaEmitterConfig kafkaEmitterConfigExpected = 
MAPPER.readerFor(KafkaEmitterConfig.class)
@@ -125,6 +128,7 @@ public class KafkaEmitterConfigTest
         null,
         ImmutableMap.of("env", "preProd"),
         null,
+        null,
         null
     );
     String kafkaEmitterConfigString = 
MAPPER.writeValueAsString(kafkaEmitterConfig);
@@ -179,6 +183,7 @@ public class KafkaEmitterConfigTest
                 null,
                 null,
                 null,
+                null,
                 null
             )
         ),
@@ -203,6 +208,7 @@ public class KafkaEmitterConfigTest
                 null,
                 null,
                 null,
+                null,
                 null
             )
         ),
@@ -227,6 +233,7 @@ public class KafkaEmitterConfigTest
                 null,
                 null,
                 null,
+                null,
                 null
             )
         ),
@@ -253,6 +260,7 @@ public class KafkaEmitterConfigTest
                 null,
                 null,
                 null,
+                null,
                 null
             )
         ),
@@ -279,6 +287,7 @@ public class KafkaEmitterConfigTest
                 null,
                 null,
                 null,
+                null,
                 null
             )
         ),
@@ -289,6 +298,31 @@ public class KafkaEmitterConfigTest
     );
   }
 
+  @Test
+  public void testDefaultShutdownTimeout() throws IOException
+  {
+    Set<KafkaEmitterConfig.EventType> eventTypeSet = new HashSet<>();
+    eventTypeSet.add(KafkaEmitterConfig.EventType.SEGMENT_METADATA);
+    KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig(
+        "hostname",
+        eventTypeSet,
+        null,
+        null,
+        null,
+        "shutdownTest",
+        "clusterNameTest",
+        null,
+        ImmutableMap.<String, String>builder()
+                    .put("testKey", "testValue").build(),
+        DEFAULT_PRODUCER_SECRETS,
+        null
+    );
+    String kafkaEmitterConfigString = 
MAPPER.writeValueAsString(kafkaEmitterConfig);
+    KafkaEmitterConfig kafkaEmitterConfigExpected = 
MAPPER.readerFor(KafkaEmitterConfig.class)
+                                                          
.readValue(kafkaEmitterConfigString);
+    Assert.assertEquals(Long.MAX_VALUE, (long) 
kafkaEmitterConfigExpected.getShutdownTimeout());
+  }
+
   private DruidExceptionMatcher operatorExceptionMatcher()
   {
     return new DruidExceptionMatcher(
diff --git 
a/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterTest.java
 
b/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterTest.java
index d97aead9b00..6c9b188b5e0 100644
--- 
a/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterTest.java
+++ 
b/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterTest.java
@@ -149,6 +149,7 @@ public class KafkaEmitterTest
         "clusterName",
         ImmutableMap.of("clusterId", "cluster-101"),
         null,
+        null,
         null
     );
 
@@ -202,6 +203,7 @@ public class KafkaEmitterTest
         null,
         ImmutableMap.of("clusterId", "cluster-101", "env", "staging"),
         null,
+        null,
         null
     );
 
@@ -251,6 +253,7 @@ public class KafkaEmitterTest
         "clusterName",
         null,
         null,
+        null,
         null
     );
 
@@ -299,6 +302,7 @@ public class KafkaEmitterTest
         "clusterName",
         null,
         null,
+        null,
         null
     );
 
@@ -373,6 +377,7 @@ public class KafkaEmitterTest
         null,
         null,
         null,
+        null,
         null
     );
 
@@ -424,6 +429,7 @@ public class KafkaEmitterTest
         "cluster-102",
         ImmutableMap.of("clusterName", "cluster-101", "env", "staging"), // 
clusterName again, extraDimensions should take precedence
         null,
+        null,
         null
     );
 
@@ -517,6 +523,7 @@ public class KafkaEmitterTest
         null,
         extraDimensions,
         ImmutableMap.of(ProducerConfig.BUFFER_MEMORY_CONFIG, 
String.valueOf(totalBufferSize)),
+        null,
         null
     );
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to