This is an automated email from the ASF dual-hosted git repository.
gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/master by this push:
new 993bc5e Fix Kafka Indexing Service notice handle thread may never terminate (#6337)
993bc5e is described below
commit 993bc5e9d3e46f07139b00270875824e8980f963
Author: QiuMM <[email protected]>
AuthorDate: Thu Sep 27 11:09:53 2018 +0800
Fix Kafka Indexing Service notice handle thread may never terminate (#6337)
* Fix Kafka Indexing Service notice handle thread may never terminate
* address comment
* handle null value
---
.../apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java | 8 ++++++--
1 file changed, 6 insertions(+), 2 deletions(-)
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
index 67900f6..977e605 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
@@ -367,8 +367,12 @@ public class KafkaSupervisor implements Supervisor
exec.submit(
() -> {
try {
- while (!Thread.currentThread().isInterrupted()) {
- final Notice notice = notices.take();
+ long pollTimeout = Math.max(ioConfig.getPeriod().getMillis(), MAX_RUN_FREQUENCY_MILLIS);
+ while (!Thread.currentThread().isInterrupted() && !stopped) {
+ final Notice notice = notices.poll(pollTimeout, TimeUnit.MILLISECONDS);
+ if (notice == null) {
+ continue;
+ }
try {
notice.handle();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]