This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 32ce341a6cd Fix RejectExecutionHandler of Blocking Single Threaded 
executor (#17146)
32ce341a6cd is described below

commit 32ce341a6cd8731afda93b852c30f79b9c29e89a
Author: Hardik Bajaj <[email protected]>
AuthorDate: Tue Oct 15 22:02:34 2024 +0530

    Fix RejectExecutionHandler of Blocking Single Threaded executor (#17146)
    
    Throw RejectedExecutionException when submitting tasks to executor that has 
been shut down.
---
 .../druid/java/util/common/concurrent/Execs.java   |  3 ++
 .../org/apache/druid/concurrent/ExecsTest.java     | 35 ++++++++++++++++++++++
 2 files changed, 38 insertions(+)

diff --git 
a/processing/src/main/java/org/apache/druid/java/util/common/concurrent/Execs.java
 
b/processing/src/main/java/org/apache/druid/java/util/common/concurrent/Execs.java
index c0ccb967dbd..6b49ab0a0ee 100644
--- 
a/processing/src/main/java/org/apache/druid/java/util/common/concurrent/Execs.java
+++ 
b/processing/src/main/java/org/apache/druid/java/util/common/concurrent/Execs.java
@@ -159,6 +159,9 @@ public class Execs
           @Override
           public void rejectedExecution(Runnable r, ThreadPoolExecutor 
executor)
           {
+            if (executor.isShutdown()) {
+              throw new RejectedExecutionException("Executor is shutdown, 
rejecting task");
+            }
             try {
               executor.getQueue().put(r);
             }
diff --git 
a/processing/src/test/java/org/apache/druid/concurrent/ExecsTest.java 
b/processing/src/test/java/org/apache/druid/concurrent/ExecsTest.java
index de76af7cb7b..6c7925e185d 100644
--- a/processing/src/test/java/org/apache/druid/concurrent/ExecsTest.java
+++ b/processing/src/test/java/org/apache/druid/concurrent/ExecsTest.java
@@ -19,15 +19,20 @@
 
 package org.apache.druid.concurrent;
 
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 public class ExecsTest
@@ -118,6 +123,36 @@ public class ExecsTest
     producer.shutdown();
   }
 
+  @Test
+  public void testTaskAddedToShutdownExecutorThrowsException() throws Exception
+  {
+    // The implementation of Execs.newBlockingSingleThreaded() 
rejectedExecutionHandler should not add tasks when it's in shutDown state
+    // When a SynchronousQueue is used in executor and a task is put in it in 
ShutDown state, it will forever stuck in WAITING state
+    // as executor will not take() the task to schedule it.
+    final ListeningExecutorService intermediateTempExecutor = 
MoreExecutors.listeningDecorator(
+        Execs.newBlockingSingleThreaded("[TASK_ID]-appenderator-abandon", 0)
+    );
+    Callable<Void> task = () -> {
+      try {
+        Thread.sleep(500); // Simulate long-running task
+      }
+      catch (InterruptedException e) {
+        Thread.currentThread().interrupt(); // Restore interrupted status
+      }
+      return null;
+    };
+
+    // Submit multiple tasks together
+    Assert.assertNotNull(intermediateTempExecutor.submit(task));
+    Assert.assertNotNull(intermediateTempExecutor.submit(task));
+
+    intermediateTempExecutor.shutdownNow();
+    // Submit task after shutDown / shutDownNow should not be added in queue
+    Assert.assertThrows(RejectedExecutionException.class, () -> 
intermediateTempExecutor.submit(task));
+    Assert.assertTrue(intermediateTempExecutor.awaitTermination(10, 
TimeUnit.SECONDS));
+    Assert.assertTrue(intermediateTempExecutor.isShutdown());
+  }
+
   @Test
   public void testDirectExecutorFactory()
   {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to