This is an automated email from the ASF dual-hosted git repository.
atul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 235938e615e Configure producer shutdown timeout for kafka emitter
(#18427)
235938e615e is described below
commit 235938e615e29a4a315614613962416c44b99de7
Author: Atul Mohan <[email protected]>
AuthorDate: Tue Sep 2 09:31:47 2025 -0700
Configure producer shutdown timeout for kafka emitter (#18427)
* Configure producer shutdown timeout for kafka emitter
* Remove nullable
---
.../extensions-contrib/kafka-emitter.md | 1 +
.../apache/druid/emitter/kafka/KafkaEmitter.java | 3 +-
.../druid/emitter/kafka/KafkaEmitterConfig.java | 18 +++++++++-
.../emitter/kafka/KafkaEmitterConfigTest.java | 40 ++++++++++++++++++++--
.../druid/emitter/kafka/KafkaEmitterTest.java | 7 ++++
5 files changed, 64 insertions(+), 5 deletions(-)
diff --git a/docs/development/extensions-contrib/kafka-emitter.md
b/docs/development/extensions-contrib/kafka-emitter.md
index 1d21bfc67f2..772c0ff405c 100644
--- a/docs/development/extensions-contrib/kafka-emitter.md
+++ b/docs/development/extensions-contrib/kafka-emitter.md
@@ -48,6 +48,7 @@ All the configuration parameters for the Kafka emitter are
under `druid.emitter.
| `druid.emitter.kafka.clusterName` | Optional value to
specify the name of your Druid cluster. It can help make groups in your
monitoring environment. | no | none
|
| `druid.emitter.kafka.extra.dimensions` | Optional JSON configuration to
specify a map of extra string dimensions for the events emitted. These can help
make groups in your monitoring environment. | no | none |
| `druid.emitter.kafka.producer.hiddenProperties` | JSON configuration to
specify sensitive Kafka producer properties such as username and password.
This property accepts a
[DynamicConfigProvider](../../operations/dynamic-config-provider.md)
implementation. | no | none |
+| `druid.emitter.kafka.producer.shutdownTimeout` | Maximum time, in milliseconds,
that the Kafka producer waits for pending requests to complete when the emitter is closed. |
no | `Long.MAX_VALUE` (wait indefinitely) |
### Example
diff --git
a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java
b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java
index 4e8c5ba5a92..b647e425ebc 100644
---
a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java
+++
b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java
@@ -41,6 +41,7 @@ import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
+import java.time.Duration;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.Executors;
@@ -293,7 +294,7 @@ public class KafkaEmitter implements Emitter
public void close()
{
scheduler.shutdownNow();
- producer.close();
+ producer.close(Duration.ofMillis(config.getShutdownTimeout()));
}
public long getMetricLostCount()
diff --git
a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitterConfig.java
b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitterConfig.java
index 8ae25329035..a690637c27d 100644
---
a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitterConfig.java
+++
b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitterConfig.java
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonValue;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
+import org.apache.druid.common.config.Configs;
import org.apache.druid.error.DruidException;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.metadata.DynamicConfigProvider;
@@ -80,6 +81,8 @@ public class KafkaEmitterConfig
private final Map<String, String> kafkaProducerConfig;
@JsonProperty("producer.hiddenProperties")
private final DynamicConfigProvider<String> kafkaProducerSecrets;
+ @JsonProperty("producer.shutdownTimeout")
+ private final Long shutdownTimeout;
@JsonCreator
public KafkaEmitterConfig(
@@ -92,7 +95,8 @@ public class KafkaEmitterConfig
@Nullable @JsonProperty("clusterName") String clusterName,
@Nullable @JsonProperty("extra.dimensions") Map<String, String>
extraDimensions,
@JsonProperty("producer.config") @Nullable Map<String, String>
kafkaProducerConfig,
- @JsonProperty("producer.hiddenProperties") @Nullable
DynamicConfigProvider<String> kafkaProducerSecrets
+ @JsonProperty("producer.hiddenProperties") @Nullable
DynamicConfigProvider<String> kafkaProducerSecrets,
+ @JsonProperty("producer.shutdownTimeout") @Nullable Long shutdownTimeout
)
{
this.eventTypes = maybeUpdateEventTypes(eventTypes, requestTopic);
@@ -138,6 +142,7 @@ public class KafkaEmitterConfig
this.extraDimensions = extraDimensions;
this.kafkaProducerConfig = kafkaProducerConfig == null ? ImmutableMap.of()
: kafkaProducerConfig;
this.kafkaProducerSecrets = kafkaProducerSecrets == null ? new
MapStringDynamicConfigProvider(ImmutableMap.of()) : kafkaProducerSecrets;
+ this.shutdownTimeout = Configs.valueOrDefault(shutdownTimeout,
Long.MAX_VALUE);
}
@Nonnull
@@ -217,6 +222,12 @@ public class KafkaEmitterConfig
return kafkaProducerSecrets;
}
+ @JsonProperty
+ public Long getShutdownTimeout()
+ {
+ return shutdownTimeout;
+ }
+
@Override
public boolean equals(Object o)
{
@@ -259,6 +270,9 @@ public class KafkaEmitterConfig
if (!getKafkaProducerConfig().equals(that.getKafkaProducerConfig())) {
return false;
}
+ if (!getShutdownTimeout().equals(that.getShutdownTimeout())) {
+ return false;
+ }
return
getKafkaProducerSecrets().getConfig().equals(that.getKafkaProducerSecrets().getConfig());
}
@@ -274,6 +288,7 @@ public class KafkaEmitterConfig
result = 31 * result + (getClusterName() != null ?
getClusterName().hashCode() : 0);
result = 31 * result + (getExtraDimensions() != null ?
getExtraDimensions().hashCode() : 0);
result = 31 * result + getKafkaProducerConfig().hashCode();
+ result = 31 * result + getShutdownTimeout().hashCode();
result = 31 * result + getKafkaProducerSecrets().getConfig().hashCode();
return result;
}
@@ -292,6 +307,7 @@ public class KafkaEmitterConfig
", extra.dimensions='" + extraDimensions + '\'' +
", producer.config=" + kafkaProducerConfig + '\'' +
", producer.hiddenProperties=" + kafkaProducerSecrets +
+ ", producer.shutdownTimeout=" + shutdownTimeout +
'}';
}
}
diff --git
a/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterConfigTest.java
b/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterConfigTest.java
index 0d5c5e312e8..b22cbfd1f4b 100644
---
a/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterConfigTest.java
+++
b/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterConfigTest.java
@@ -58,7 +58,8 @@ public class KafkaEmitterConfigTest
ImmutableMap.of("env", "preProd"),
ImmutableMap.<String, String>builder()
.put("testKey", "testValue").build(),
- DEFAULT_PRODUCER_SECRETS
+ DEFAULT_PRODUCER_SECRETS,
+ 50L
);
String kafkaEmitterConfigString =
MAPPER.writeValueAsString(kafkaEmitterConfig);
KafkaEmitterConfig kafkaEmitterConfigExpected =
MAPPER.readerFor(KafkaEmitterConfig.class)
@@ -80,7 +81,8 @@ public class KafkaEmitterConfigTest
null,
ImmutableMap.<String, String>builder()
.put("testKey", "testValue").build(),
- DEFAULT_PRODUCER_SECRETS
+ DEFAULT_PRODUCER_SECRETS,
+ null
);
String kafkaEmitterConfigString =
MAPPER.writeValueAsString(kafkaEmitterConfig);
KafkaEmitterConfig kafkaEmitterConfigExpected =
MAPPER.readerFor(KafkaEmitterConfig.class)
@@ -104,7 +106,8 @@ public class KafkaEmitterConfigTest
null,
ImmutableMap.<String, String>builder()
.put("testKey", "testValue").build(),
- DEFAULT_PRODUCER_SECRETS
+ DEFAULT_PRODUCER_SECRETS,
+ null
);
String kafkaEmitterConfigString =
MAPPER.writeValueAsString(kafkaEmitterConfig);
KafkaEmitterConfig kafkaEmitterConfigExpected =
MAPPER.readerFor(KafkaEmitterConfig.class)
@@ -125,6 +128,7 @@ public class KafkaEmitterConfigTest
null,
ImmutableMap.of("env", "preProd"),
null,
+ null,
null
);
String kafkaEmitterConfigString =
MAPPER.writeValueAsString(kafkaEmitterConfig);
@@ -179,6 +183,7 @@ public class KafkaEmitterConfigTest
null,
null,
null,
+ null,
null
)
),
@@ -203,6 +208,7 @@ public class KafkaEmitterConfigTest
null,
null,
null,
+ null,
null
)
),
@@ -227,6 +233,7 @@ public class KafkaEmitterConfigTest
null,
null,
null,
+ null,
null
)
),
@@ -253,6 +260,7 @@ public class KafkaEmitterConfigTest
null,
null,
null,
+ null,
null
)
),
@@ -279,6 +287,7 @@ public class KafkaEmitterConfigTest
null,
null,
null,
+ null,
null
)
),
@@ -289,6 +298,31 @@ public class KafkaEmitterConfigTest
);
}
+ @Test
+ public void testDefaultShutdownTimeout() throws IOException
+ {
+ Set<KafkaEmitterConfig.EventType> eventTypeSet = new HashSet<>();
+ eventTypeSet.add(KafkaEmitterConfig.EventType.SEGMENT_METADATA);
+ KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig(
+ "hostname",
+ eventTypeSet,
+ null,
+ null,
+ null,
+ "shutdownTest",
+ "clusterNameTest",
+ null,
+ ImmutableMap.<String, String>builder()
+ .put("testKey", "testValue").build(),
+ DEFAULT_PRODUCER_SECRETS,
+ null
+ );
+ String kafkaEmitterConfigString =
MAPPER.writeValueAsString(kafkaEmitterConfig);
+ KafkaEmitterConfig kafkaEmitterConfigExpected =
MAPPER.readerFor(KafkaEmitterConfig.class)
+
.readValue(kafkaEmitterConfigString);
+ Assert.assertEquals(Long.MAX_VALUE, (long)
kafkaEmitterConfigExpected.getShutdownTimeout());
+ }
+
private DruidExceptionMatcher operatorExceptionMatcher()
{
return new DruidExceptionMatcher(
diff --git
a/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterTest.java
b/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterTest.java
index d97aead9b00..6c9b188b5e0 100644
---
a/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterTest.java
+++
b/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterTest.java
@@ -149,6 +149,7 @@ public class KafkaEmitterTest
"clusterName",
ImmutableMap.of("clusterId", "cluster-101"),
null,
+ null,
null
);
@@ -202,6 +203,7 @@ public class KafkaEmitterTest
null,
ImmutableMap.of("clusterId", "cluster-101", "env", "staging"),
null,
+ null,
null
);
@@ -251,6 +253,7 @@ public class KafkaEmitterTest
"clusterName",
null,
null,
+ null,
null
);
@@ -299,6 +302,7 @@ public class KafkaEmitterTest
"clusterName",
null,
null,
+ null,
null
);
@@ -373,6 +377,7 @@ public class KafkaEmitterTest
null,
null,
null,
+ null,
null
);
@@ -424,6 +429,7 @@ public class KafkaEmitterTest
"cluster-102",
ImmutableMap.of("clusterName", "cluster-101", "env", "staging"), //
clusterName again, extraDimensions should take precedence
null,
+ null,
null
);
@@ -517,6 +523,7 @@ public class KafkaEmitterTest
null,
extraDimensions,
ImmutableMap.of(ProducerConfig.BUFFER_MEMORY_CONFIG,
String.valueOf(totalBufferSize)),
+ null,
null
);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]