This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new d6507c9  PrioritizedExecutorService: Properly wrap on direct calls to 
"execute". (#11956)
d6507c9 is described below

commit d6507c9428b5db6cba929d72b40197279aa3ea65
Author: Gian Merlino <[email protected]>
AuthorDate: Mon Nov 22 10:30:12 2021 -0800

    PrioritizedExecutorService: Properly wrap on direct calls to "execute". 
(#11956)
    
    Usually, "execute" is called by methods defined in the superclass
    AbstractExecutorService, and the passed-in Runnable has been wrapped
    by newTaskFor inside a PrioritizedListenableFutureTask. But this method
    can also be called directly, and if so, the same wrapping is necessary
    for the delegate to get a Runnable that can be entered into a priority
    queue with the others.
---
 .../druid/query/PrioritizedExecutorService.java    |  6 +++-
 .../query/PrioritizedExecutorServiceTest.java      | 36 ++++++++++++++++++++++
 2 files changed, 41 insertions(+), 1 deletion(-)

diff --git 
a/processing/src/main/java/org/apache/druid/query/PrioritizedExecutorService.java
 
b/processing/src/main/java/org/apache/druid/query/PrioritizedExecutorService.java
index 1861bd6..6781df6 100644
--- 
a/processing/src/main/java/org/apache/druid/query/PrioritizedExecutorService.java
+++ 
b/processing/src/main/java/org/apache/druid/query/PrioritizedExecutorService.java
@@ -192,7 +192,11 @@ public class PrioritizedExecutorService extends 
AbstractExecutorService implemen
   @Override
   public void execute(final Runnable runnable)
   {
-    delegate.execute(runnable);
+    if (runnable instanceof PrioritizedListenableFutureTask) {
+      delegate.execute(runnable);
+    } else {
+      delegate.execute(newTaskFor(runnable, null));
+    }
   }
 
   public int getQueueSize()
diff --git 
a/processing/src/test/java/org/apache/druid/query/PrioritizedExecutorServiceTest.java
 
b/processing/src/test/java/org/apache/druid/query/PrioritizedExecutorServiceTest.java
index c4afe6c..06954b8 100644
--- 
a/processing/src/test/java/org/apache/druid/query/PrioritizedExecutorServiceTest.java
+++ 
b/processing/src/test/java/org/apache/druid/query/PrioritizedExecutorServiceTest.java
@@ -41,6 +41,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicInteger;
 
 /**
+ *
  */
 @RunWith(Parameterized.class)
 public class PrioritizedExecutorServiceTest
@@ -182,6 +183,41 @@ public class PrioritizedExecutorServiceTest
     Assert.assertEquals(expected, ImmutableList.copyOf(order));
   }
 
+  @Test
+  public void testExecuteRegularRunnable()
+  {
+    final CountDownLatch latch = new CountDownLatch(1);
+
+    Assert.assertThrows(
+        "Class does not implemented PrioritizedRunnable",
+        IllegalArgumentException.class,
+        () -> exec.execute(latch::countDown)
+    );
+  }
+
+  @Test
+  public void testExecutePrioritizedRunnable() throws InterruptedException
+  {
+    final CountDownLatch latch = new CountDownLatch(1);
+    exec.execute(
+        new PrioritizedRunnable()
+        {
+          @Override
+          public int getPriority()
+          {
+            return 1;
+          }
+
+          @Override
+          public void run()
+          {
+            latch.countDown();
+          }
+        }
+    );
+    latch.await();
+  }
+
   // Make sure entries are processed FIFO
   @Test
   public void testOrderedExecutionEqualPriorityRunnable() throws 
ExecutionException, InterruptedException

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to