This is an automated email from the ASF dual-hosted git repository.
suneet pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new c1d6328249d StreamingTaskRunner: Close the rejection period updater
executor service (#17490)
c1d6328249d is described below
commit c1d6328249dd8e5067bc1cabd30f00515cbabbdb
Author: Adithya Chakilam <[email protected]>
AuthorDate: Tue Nov 19 14:49:20 2024 -0600
StreamingTaskRunner: Close the rejection period updater executor service
(#17490)
---
.../indexing/seekablestream/SeekableStreamIndexTaskRunner.java | 9 ++++++---
1 file changed, 6 insertions(+), 3 deletions(-)
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
index 42dcef39bc8..a2db12d005e 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
@@ -128,6 +128,7 @@ import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -249,6 +250,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
private volatile DateTime minMessageTime;
private volatile DateTime maxMessageTime;
+ private final ScheduledExecutorService rejectionPeriodUpdaterExec;
public SeekableStreamIndexTaskRunner(
final SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType,
RecordType> task,
@@ -273,15 +275,15 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
minMessageTime = ioConfig.getMinimumMessageTime().or(DateTimes.MIN);
maxMessageTime = ioConfig.getMaximumMessageTime().or(DateTimes.MAX);
+ rejectionPeriodUpdaterExec =
Execs.scheduledSingleThreaded("RejectionPeriodUpdater-Exec--%d");
if (ioConfig.getRefreshRejectionPeriodsInMinutes() != null) {
- Execs.scheduledSingleThreaded("RejectionPeriodUpdater-Exec--%d")
+ rejectionPeriodUpdaterExec
.scheduleWithFixedDelay(
this::refreshMinMaxMessageTime,
ioConfig.getRefreshRejectionPeriodsInMinutes(),
ioConfig.getRefreshRejectionPeriodsInMinutes(),
- TimeUnit.MINUTES
- );
+ TimeUnit.MINUTES);
}
resetNextCheckpointTime();
}
@@ -940,6 +942,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
toolbox.getDruidNodeAnnouncer().unannounce(discoveryDruidNode);
toolbox.getDataSegmentServerAnnouncer().unannounce();
}
+ rejectionPeriodUpdaterExec.shutdown();
}
catch (Throwable e) {
if (caughtExceptionOuter != null) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]