This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 32ce341a6cd Fix RejectExecutionHandler of Blocking Single Threaded
executor (#17146)
32ce341a6cd is described below
commit 32ce341a6cd8731afda93b852c30f79b9c29e89a
Author: Hardik Bajaj <[email protected]>
AuthorDate: Tue Oct 15 22:02:34 2024 +0530
Fix RejectExecutionHandler of Blocking Single Threaded executor (#17146)
Throw RejectedExecutionException when submitting tasks to executor that has
been shut down.
---
.../druid/java/util/common/concurrent/Execs.java | 3 ++
.../org/apache/druid/concurrent/ExecsTest.java | 35 ++++++++++++++++++++++
2 files changed, 38 insertions(+)
diff --git
a/processing/src/main/java/org/apache/druid/java/util/common/concurrent/Execs.java
b/processing/src/main/java/org/apache/druid/java/util/common/concurrent/Execs.java
index c0ccb967dbd..6b49ab0a0ee 100644
---
a/processing/src/main/java/org/apache/druid/java/util/common/concurrent/Execs.java
+++
b/processing/src/main/java/org/apache/druid/java/util/common/concurrent/Execs.java
@@ -159,6 +159,9 @@ public class Execs
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor
executor)
{
+ if (executor.isShutdown()) {
+ throw new RejectedExecutionException("Executor is shutdown,
rejecting task");
+ }
try {
executor.getQueue().put(r);
}
diff --git
a/processing/src/test/java/org/apache/druid/concurrent/ExecsTest.java
b/processing/src/test/java/org/apache/druid/concurrent/ExecsTest.java
index de76af7cb7b..6c7925e185d 100644
--- a/processing/src/test/java/org/apache/druid/concurrent/ExecsTest.java
+++ b/processing/src/test/java/org/apache/druid/concurrent/ExecsTest.java
@@ -19,15 +19,20 @@
package org.apache.druid.concurrent;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.logger.Logger;
import org.junit.Assert;
import org.junit.Test;
+import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class ExecsTest
@@ -118,6 +123,36 @@ public class ExecsTest
producer.shutdown();
}
+ @Test
+ public void testTaskAddedToShutdownExecutorThrowsException() throws Exception
+ {
+ // The rejectedExecutionHandler used by Execs.newBlockingSingleThreaded() should
+ // not add tasks to the queue once the executor is in the shutdown state.
+ // When the executor uses a SynchronousQueue and a task is put into it after
+ // shutdown, the submitting thread stays stuck in the WAITING state forever,
+ // because the executor will never take() the task to schedule it.
+ final ListeningExecutorService intermediateTempExecutor =
MoreExecutors.listeningDecorator(
+ Execs.newBlockingSingleThreaded("[TASK_ID]-appenderator-abandon", 0)
+ );
+ Callable<Void> task = () -> {
+ try {
+ Thread.sleep(500); // Simulate long-running task
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt(); // Restore interrupted status
+ }
+ return null;
+ };
+
+ // Submit multiple tasks together
+ Assert.assertNotNull(intermediateTempExecutor.submit(task));
+ Assert.assertNotNull(intermediateTempExecutor.submit(task));
+
+ intermediateTempExecutor.shutdownNow();
+ // A task submitted after shutdown() / shutdownNow() must be rejected, not added to the queue
+ Assert.assertThrows(RejectedExecutionException.class, () ->
intermediateTempExecutor.submit(task));
+ Assert.assertTrue(intermediateTempExecutor.awaitTermination(10,
TimeUnit.SECONDS));
+ Assert.assertTrue(intermediateTempExecutor.isShutdown());
+ }
+
@Test
public void testDirectExecutorFactory()
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]