This is an automated email from the ASF dual-hosted git repository.

jonwei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 18d42cae3ff Kafka emitter wasn't given the correct number of threads. 
It should be 1 thread per scheduled task. (#15719)
18d42cae3ff is described below

commit 18d42cae3ff9ebe5f255ed356864793b20c034c1
Author: Tom <[email protected]>
AuthorDate: Thu Jan 18 13:27:40 2024 -0800

    Kafka emitter wasn't given the correct number of threads. It should be 1 
thread per scheduled task. (#15719)
    
    This change sizes the scheduler's thread pool to one thread per 
scheduled task: 1 for each event type, and 1 for logging the lost events.
    This is a minimal change to make this work. In the future it would be 
worthwhile to make each task not be greedy and share threads, so there isn't a 
need for a thread per task.
---
 .../src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java    | 4 +++-
 .../main/java/org/apache/druid/emitter/kafka/KafkaEmitterConfig.java  | 2 ++
 2 files changed, 5 insertions(+), 1 deletion(-)

diff --git 
a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java
 
b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java
index 7485cbaab6d..87183d62fe7 100644
--- 
a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java
+++ 
b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java
@@ -84,7 +84,9 @@ public class KafkaEmitter implements Emitter
     this.alertQueue = new MemoryBoundLinkedBlockingQueue<>(queueMemoryBound);
     this.requestQueue = new MemoryBoundLinkedBlockingQueue<>(queueMemoryBound);
     this.segmentMetadataQueue = new 
MemoryBoundLinkedBlockingQueue<>(queueMemoryBound);
-    this.scheduler = Executors.newScheduledThreadPool(4);
+    // need one thread per scheduled task. Scheduled tasks are per eventType 
and 1 for reporting the lost events
+    int numOfThreads = config.getEventTypes().size() + 1;
+    this.scheduler = Executors.newScheduledThreadPool(numOfThreads);
     this.metricLost = new AtomicLong(0L);
     this.alertLost = new AtomicLong(0L);
     this.requestLost = new AtomicLong(0L);
diff --git 
a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitterConfig.java
 
b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitterConfig.java
index d6d823c0a88..c7038079aa4 100644
--- 
a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitterConfig.java
+++ 
b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitterConfig.java
@@ -30,6 +30,7 @@ import org.apache.druid.metadata.DynamicConfigProvider;
 import org.apache.druid.metadata.MapStringDynamicConfigProvider;
 import org.apache.kafka.clients.producer.ProducerConfig;
 
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import java.util.HashSet;
 import java.util.Map;
@@ -124,6 +125,7 @@ public class KafkaEmitterConfig
   }
 
   @JsonProperty
+  @Nonnull
   public Set<EventType> getEventTypes()
   {
     return eventTypes;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to