This is an automated email from the ASF dual-hosted git repository.
gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new d6507c9 PrioritizedExecutorService: Properly wrap on direct calls to
"execute". (#11956)
d6507c9 is described below
commit d6507c9428b5db6cba929d72b40197279aa3ea65
Author: Gian Merlino <[email protected]>
AuthorDate: Mon Nov 22 10:30:12 2021 -0800
PrioritizedExecutorService: Properly wrap on direct calls to "execute".
(#11956)
Usually, "execute" is called by methods defined in the superclass
AbstractExecutorService, and the passed-in Runnable has been wrapped
by newTaskFor inside a PrioritizedListenableFutureTask. But this method
can also be called directly, and if so, the same wrapping is necessary
for the delegate to get a Runnable that can be entered into a priority
queue with the others.
---
.../druid/query/PrioritizedExecutorService.java | 6 +++-
.../query/PrioritizedExecutorServiceTest.java | 36 ++++++++++++++++++++++
2 files changed, 41 insertions(+), 1 deletion(-)
diff --git
a/processing/src/main/java/org/apache/druid/query/PrioritizedExecutorService.java
b/processing/src/main/java/org/apache/druid/query/PrioritizedExecutorService.java
index 1861bd6..6781df6 100644
---
a/processing/src/main/java/org/apache/druid/query/PrioritizedExecutorService.java
+++
b/processing/src/main/java/org/apache/druid/query/PrioritizedExecutorService.java
@@ -192,7 +192,11 @@ public class PrioritizedExecutorService extends
AbstractExecutorService implemen
@Override
public void execute(final Runnable runnable)
{
- delegate.execute(runnable);
+ if (runnable instanceof PrioritizedListenableFutureTask) {
+ delegate.execute(runnable);
+ } else {
+ delegate.execute(newTaskFor(runnable, null));
+ }
}
public int getQueueSize()
diff --git
a/processing/src/test/java/org/apache/druid/query/PrioritizedExecutorServiceTest.java
b/processing/src/test/java/org/apache/druid/query/PrioritizedExecutorServiceTest.java
index c4afe6c..06954b8 100644
---
a/processing/src/test/java/org/apache/druid/query/PrioritizedExecutorServiceTest.java
+++
b/processing/src/test/java/org/apache/druid/query/PrioritizedExecutorServiceTest.java
@@ -41,6 +41,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
/**
+ *
*/
@RunWith(Parameterized.class)
public class PrioritizedExecutorServiceTest
@@ -182,6 +183,41 @@ public class PrioritizedExecutorServiceTest
Assert.assertEquals(expected, ImmutableList.copyOf(order));
}
+ @Test
+ public void testExecuteRegularRunnable()
+ {
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ Assert.assertThrows(
+ "Class does not implemented PrioritizedRunnable",
+ IllegalArgumentException.class,
+ () -> exec.execute(latch::countDown)
+ );
+ }
+
+ @Test
+ public void testExecutePrioritizedRunnable() throws InterruptedException
+ {
+ final CountDownLatch latch = new CountDownLatch(1);
+ exec.execute(
+ new PrioritizedRunnable()
+ {
+ @Override
+ public int getPriority()
+ {
+ return 1;
+ }
+
+ @Override
+ public void run()
+ {
+ latch.countDown();
+ }
+ }
+ );
+ latch.await();
+ }
+
// Make sure entries are processed FIFO
@Test
public void testOrderedExecutionEqualPriorityRunnable() throws
ExecutionException, InterruptedException
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]