This is an automated email from the ASF dual-hosted git repository.
jonwei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 18d42cae3ff Kafka emitter wasn't given the correct number of threads.
It should be 1 thread per scheduled task. (#15719)
18d42cae3ff is described below
commit 18d42cae3ff9ebe5f255ed356864793b20c034c1
Author: Tom <[email protected]>
AuthorDate: Thu Jan 18 13:27:40 2024 -0800
Kafka emitter wasn't given the correct number of threads. It should be 1
thread per scheduled task. (#15719)
This change provisions the correct number of threads for the scheduler:
one thread per scheduled task, i.e. one for each event type plus one for
logging the lost events.
This is a minimal change to make the emitter work. In the future it would
be worthwhile to make each task non-greedy and have the tasks share
threads, so that a dedicated thread per task is no longer needed.
---
.../src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java | 4 +++-
.../main/java/org/apache/druid/emitter/kafka/KafkaEmitterConfig.java | 2 ++
2 files changed, 5 insertions(+), 1 deletion(-)
diff --git
a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java
b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java
index 7485cbaab6d..87183d62fe7 100644
---
a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java
+++
b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java
@@ -84,7 +84,9 @@ public class KafkaEmitter implements Emitter
this.alertQueue = new MemoryBoundLinkedBlockingQueue<>(queueMemoryBound);
this.requestQueue = new MemoryBoundLinkedBlockingQueue<>(queueMemoryBound);
this.segmentMetadataQueue = new MemoryBoundLinkedBlockingQueue<>(queueMemoryBound);
- this.scheduler = Executors.newScheduledThreadPool(4);
+ // need one thread per scheduled task. Scheduled tasks are per eventType and 1 for reporting the lost events
+ int numOfThreads = config.getEventTypes().size() + 1;
+ this.scheduler = Executors.newScheduledThreadPool(numOfThreads);
this.metricLost = new AtomicLong(0L);
this.alertLost = new AtomicLong(0L);
this.requestLost = new AtomicLong(0L);
diff --git
a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitterConfig.java
b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitterConfig.java
index d6d823c0a88..c7038079aa4 100644
---
a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitterConfig.java
+++
b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitterConfig.java
@@ -30,6 +30,7 @@ import org.apache.druid.metadata.DynamicConfigProvider;
import org.apache.druid.metadata.MapStringDynamicConfigProvider;
import org.apache.kafka.clients.producer.ProducerConfig;
+import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.HashSet;
import java.util.Map;
@@ -124,6 +125,7 @@ public class KafkaEmitterConfig
}
@JsonProperty
+ @Nonnull
public Set<EventType> getEventTypes()
{
return eventTypes;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]