This is an automated email from the ASF dual-hosted git repository.

suneet pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new c1d6328249d StreamingTaskRunner: Close the rejection period updater executor service (#17490)
c1d6328249d is described below

commit c1d6328249dd8e5067bc1cabd30f00515cbabbdb
Author: Adithya Chakilam <[email protected]>
AuthorDate: Tue Nov 19 14:49:20 2024 -0600

    StreamingTaskRunner: Close the rejection period updater executor service (#17490)
---
 .../indexing/seekablestream/SeekableStreamIndexTaskRunner.java   | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)

diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
index 42dcef39bc8..a2db12d005e 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
@@ -128,6 +128,7 @@ import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -249,6 +250,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
 
   private volatile DateTime minMessageTime;
   private volatile DateTime maxMessageTime;
+  private final ScheduledExecutorService rejectionPeriodUpdaterExec;
 
   public SeekableStreamIndexTaskRunner(
       final SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType, 
RecordType> task,
@@ -273,15 +275,15 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
 
     minMessageTime = ioConfig.getMinimumMessageTime().or(DateTimes.MIN);
     maxMessageTime = ioConfig.getMaximumMessageTime().or(DateTimes.MAX);
+    rejectionPeriodUpdaterExec = 
Execs.scheduledSingleThreaded("RejectionPeriodUpdater-Exec--%d");
 
     if (ioConfig.getRefreshRejectionPeriodsInMinutes() != null) {
-      Execs.scheduledSingleThreaded("RejectionPeriodUpdater-Exec--%d")
+      rejectionPeriodUpdaterExec
            .scheduleWithFixedDelay(
                this::refreshMinMaxMessageTime,
                ioConfig.getRefreshRejectionPeriodsInMinutes(),
                ioConfig.getRefreshRejectionPeriodsInMinutes(),
-               TimeUnit.MINUTES
-        );
+               TimeUnit.MINUTES);
     }
     resetNextCheckpointTime();
   }
@@ -940,6 +942,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
           toolbox.getDruidNodeAnnouncer().unannounce(discoveryDruidNode);
           toolbox.getDataSegmentServerAnnouncer().unannounce();
         }
+        rejectionPeriodUpdaterExec.shutdown();
       }
       catch (Throwable e) {
         if (caughtExceptionOuter != null) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to