This is an automated email from the ASF dual-hosted git repository.
gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new bde65b29b47 MSQ: Add "maxThreads" context parameter. (#18520)
bde65b29b47 is described below
commit bde65b29b476c759063942ec9723ff179740ebca
Author: Gian Merlino <[email protected]>
AuthorDate: Tue Sep 16 11:25:17 2025 -0700
MSQ: Add "maxThreads" context parameter. (#18520)
* MSQ: Add "maxThreads" context parameter.
Allows users to reduce the number of threads per data server or task.
Useful for benchmarking single-threaded performance, or for limiting
the impact of a heavyweight query.
* Fix imports.
* Handle edge cases.
---
docs/multi-stage-query/reference.md | 1 +
.../druid/msq/dart/worker/DartWorkerContext.java | 14 +++---
.../druid/msq/exec/ExecutionContextImpl.java | 9 +++-
.../org/apache/druid/msq/exec/WorkerContext.java | 9 +---
.../msq/indexing/IndexerControllerContext.java | 5 ++
.../druid/msq/indexing/IndexerWorkerContext.java | 15 +++---
.../druid/msq/util/MultiStageQueryContext.java | 10 ++++
.../druid/msq/test/MSQTestWorkerContext.java | 6 ---
.../druid/msq/util/MultiStageQueryContextTest.java | 14 ++++++
.../org/apache/druid/frame/processor/Bouncer.java | 56 +++++++++++++++++++--
.../frame/processor/RunAllFullyWidgetTest.java | 57 +++++++++++++++++-----
11 files changed, 149 insertions(+), 47 deletions(-)
diff --git a/docs/multi-stage-query/reference.md
b/docs/multi-stage-query/reference.md
index 672f8050a1e..a3ecd01abe6 100644
--- a/docs/multi-stage-query/reference.md
+++ b/docs/multi-stage-query/reference.md
@@ -418,6 +418,7 @@ The following table lists the context parameters for the
MSQ task engine:
| `storeCompactionState` | REPLACE<br /><br /> When set to true, a REPLACE
query stores as part of each segment's metadata a `lastCompactionState` field
that captures the various specs used to create the segment. Future compaction
jobs skip segments whose `lastCompactionState` matches the desired compaction
state. Works the same as
[`storeCompactionState`](../ingestion/tasks.md#context-parameters) task context
flag. | `false` |
| `removeNullBytes` | SELECT, INSERT or REPLACE<br /><br /> The MSQ engine
cannot process null bytes in strings and throws `InvalidNullByteFault` if it
encounters them in the source data. If the parameter is set to true, The MSQ
engine will remove the null bytes in string fields when reading the data. |
`false` |
| `maxFrameSize` | SELECT, INSERT or REPLACE<br /><br />Size of frames used
for data transfer within the MSQ engine. You generally do not need to change
this unless you have very large rows. | `1000000` (1 MB) |
+| `maxThreads` | SELECT, INSERT or REPLACE<br /><br />Maximum number of
threads to use for processing. This only has an effect if it is greater than
zero and less than the default thread count based on system configuration.
Otherwise, it is ignored, and workers use the default thread count. | Not set
(use default thread count) |
## Joins
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerContext.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerContext.java
index f4e9f3b008f..0bfd79e999f 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerContext.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerContext.java
@@ -80,6 +80,7 @@ public class DartWorkerContext implements WorkerContext
private final File tempDir;
private final QueryContext queryContext;
private final ServiceEmitter emitter;
+ private final int threadCount;
/**
* Lazy initialized upon call to {@link #frameContext(WorkOrder)}.
@@ -128,6 +129,11 @@ public class DartWorkerContext implements WorkerContext
this.tempDir = tempDir;
this.queryContext = Preconditions.checkNotNull(queryContext,
"queryContext");
this.emitter = emitter;
+
+ // Compute thread count once in constructor
+ final int baseThreadCount = processingConfig.getNumThreads();
+ final Integer maxThreads =
MultiStageQueryContext.getMaxThreads(queryContext);
+ this.threadCount = (maxThreads != null && maxThreads > 0) ?
Math.min(baseThreadCount, maxThreads) : baseThreadCount;
}
@Override
@@ -242,13 +248,7 @@ public class DartWorkerContext implements WorkerContext
@Override
public int threadCount()
{
- return processingConfig.getNumThreads();
- }
-
- @Override
- public DataServerQueryHandlerFactory dataServerQueryHandlerFactory()
- {
- return dataServerQueryHandlerFactory;
+ return threadCount;
}
@Override
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ExecutionContextImpl.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ExecutionContextImpl.java
index a1d4ec9f282..9b3c4276087 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ExecutionContextImpl.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ExecutionContextImpl.java
@@ -148,9 +148,14 @@ public class ExecutionContextImpl implements
ExecutionContext
public Bouncer processingBouncer()
{
if (workOrder.getStageDefinition().getProcessor().usesProcessingBuffers())
{
- return frameContext.processingBuffers().getBouncer();
+ final Bouncer baseBouncer =
frameContext.processingBuffers().getBouncer();
+ if (maxOutstandingProcessors < baseBouncer.getMaxCount()) {
+ return new Bouncer(maxOutstandingProcessors, baseBouncer);
+ } else {
+ return baseBouncer;
+ }
} else {
- return Bouncer.unlimited();
+ return new Bouncer(maxOutstandingProcessors);
}
}
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerContext.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerContext.java
index 66bd2893410..ceac7728c07 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerContext.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerContext.java
@@ -97,12 +97,12 @@ public interface WorkerContext extends Closeable
File tempDir();
/**
- * Create a context with useful objects required by {@link
StageProcessor#makeProcessors}.
+ * Create a context with useful objects required by {@link
StageProcessor#execute(ExecutionContext)}.
*/
FrameContext frameContext(WorkOrder workOrder);
/**
- * Number of available processing threads.
+ * Number of available processing threads. Workers must not use more than
this number of threads.
*/
int threadCount();
@@ -111,11 +111,6 @@ public interface WorkerContext extends Closeable
*/
DruidNode selfNode();
- /**
- * Returns the factory for {@link DataServerQueryHandler} from the context.
Used to query realtime tasks.
- */
- DataServerQueryHandlerFactory dataServerQueryHandlerFactory();
-
/**
* Whether to include all counters in reports. See {@link
MultiStageQueryContext#CTX_INCLUDE_ALL_COUNTERS} for detail.
*/
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java
index 738b3c94758..86966a92236 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java
@@ -309,6 +309,7 @@ public class IndexerControllerContext implements
ControllerContext
final boolean includeAllCounters =
MultiStageQueryContext.getIncludeAllCounters(queryContext);
final boolean isReindex = MultiStageQueryContext.isReindex(queryContext);
final int frameSize = MultiStageQueryContext.getFrameSize(queryContext);
+ final Integer maxThreads =
MultiStageQueryContext.getMaxThreads(queryContext);
final ImmutableMap.Builder<String, Object> builder =
ImmutableMap.builder();
builder
@@ -321,6 +322,10 @@ public class IndexerControllerContext implements
ControllerContext
.put(MultiStageQueryContext.CTX_INCLUDE_ALL_COUNTERS,
includeAllCounters)
.put(MultiStageQueryContext.CTX_MAX_FRAME_SIZE, frameSize);
+ if (maxThreads != null) {
+ builder.put(MultiStageQueryContext.CTX_MAX_THREADS, maxThreads);
+ }
+
if (querySpec.getId() != null) {
builder.put(BaseQuery.QUERY_ID, querySpec.getId());
}
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java
index c48df4bdea5..873a0619c04 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java
@@ -30,7 +30,6 @@ import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.msq.exec.ControllerClient;
-import org.apache.druid.msq.exec.DataServerQueryHandlerFactory;
import org.apache.druid.msq.exec.FrameContext;
import org.apache.druid.msq.exec.FrameWriterSpec;
import org.apache.druid.msq.exec.MSQMetriceEventBuilder;
@@ -83,6 +82,7 @@ public class IndexerWorkerContext implements WorkerContext
private final ProcessingBuffersProvider processingBuffersProvider;
private final int maxConcurrentStages;
private final boolean includeAllCounters;
+ private final int threadCount;
// Written under synchronized(this) using double-checked locking.
private volatile ResourceHolder<ProcessingBuffersSet> processingBuffersSet;
@@ -118,6 +118,11 @@ public class IndexerWorkerContext implements WorkerContext
IndexerControllerContext.DEFAULT_MAX_CONCURRENT_STAGES
);
this.includeAllCounters =
MultiStageQueryContext.getIncludeAllCounters(queryContext);
+
+ // Compute thread count once in constructor
+ final int baseThreadCount = memoryIntrospector.numProcessingThreads();
+ final Integer maxThreads =
MultiStageQueryContext.getMaxThreads(queryContext);
+ this.threadCount = (maxThreads != null && maxThreads > 0) ?
Math.min(baseThreadCount, maxThreads) : baseThreadCount;
final StorageConnectorProvider storageConnectorProvider =
injector.getInstance(Key.get(
StorageConnectorProvider.class,
MultiStageQuery.class
@@ -286,7 +291,7 @@ public class IndexerWorkerContext implements WorkerContext
@Override
public int threadCount()
{
- return memoryIntrospector.numProcessingThreads();
+ return threadCount;
}
@Override
@@ -295,12 +300,6 @@ public class IndexerWorkerContext implements WorkerContext
return toolbox.getDruidNode();
}
- @Override
- public DataServerQueryHandlerFactory dataServerQueryHandlerFactory()
- {
- return dataServerQueryHandlerFactory;
- }
-
@Override
public boolean includeAllCounters()
{
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
index 1cd222c4f6c..3e9154d8102 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
@@ -225,6 +225,11 @@ public class MultiStageQueryContext
*/
public static final String CTX_MAX_FRAME_SIZE = "maxFrameSize";
+ /**
+ * Maximum number of threads to use for processing. Acts as a cap on the
value of {@link WorkerContext#threadCount()}.
+ */
+ public static final String CTX_MAX_THREADS = "maxThreads";
+
private static final Pattern LOOKS_LIKE_JSON_ARRAY =
Pattern.compile("^\\s*\\[.*", Pattern.DOTALL);
public static String getMSQMode(final QueryContext queryContext)
@@ -508,6 +513,11 @@ public class MultiStageQueryContext
return queryContext.getInt(CTX_MAX_FRAME_SIZE,
WorkerMemoryParameters.DEFAULT_FRAME_SIZE);
}
+ public static Integer getMaxThreads(final QueryContext queryContext)
+ {
+ return queryContext.getInt(CTX_MAX_THREADS);
+ }
+
/**
* Decodes a list from either a JSON or CSV string.
*/
diff --git
a/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestWorkerContext.java
b/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestWorkerContext.java
index a01e950a821..23dd5617cde 100644
---
a/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestWorkerContext.java
+++
b/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestWorkerContext.java
@@ -189,12 +189,6 @@ public class MSQTestWorkerContext implements WorkerContext
return 2;
}
- @Override
- public DataServerQueryHandlerFactory dataServerQueryHandlerFactory()
- {
- return injector.getInstance(DataServerQueryHandlerFactory.class);
- }
-
@Override
public boolean includeAllCounters()
{
diff --git
a/multi-stage-query/src/test/java/org/apache/druid/msq/util/MultiStageQueryContextTest.java
b/multi-stage-query/src/test/java/org/apache/druid/msq/util/MultiStageQueryContextTest.java
index 3dc8591641f..95bfe2c7fff 100644
---
a/multi-stage-query/src/test/java/org/apache/druid/msq/util/MultiStageQueryContextTest.java
+++
b/multi-stage-query/src/test/java/org/apache/druid/msq/util/MultiStageQueryContextTest.java
@@ -49,6 +49,7 @@ import static
org.apache.druid.msq.util.MultiStageQueryContext.CTX_FAULT_TOLERAN
import static
org.apache.druid.msq.util.MultiStageQueryContext.CTX_FINALIZE_AGGREGATIONS;
import static
org.apache.druid.msq.util.MultiStageQueryContext.CTX_MAX_FRAME_SIZE;
import static
org.apache.druid.msq.util.MultiStageQueryContext.CTX_MAX_NUM_TASKS;
+import static org.apache.druid.msq.util.MultiStageQueryContext.CTX_MAX_THREADS;
import static org.apache.druid.msq.util.MultiStageQueryContext.CTX_MSQ_MODE;
import static
org.apache.druid.msq.util.MultiStageQueryContext.CTX_REMOVE_NULL_BYTES;
import static
org.apache.druid.msq.util.MultiStageQueryContext.CTX_ROWS_IN_MEMORY;
@@ -361,6 +362,19 @@ public class MultiStageQueryContextTest
Assert.assertEquals(500000,
MultiStageQueryContext.getFrameSize(QueryContext.of(propertyMap)));
}
+ @Test
+ public void getMaxThreads_unset_returnsNull()
+ {
+
Assert.assertNull(MultiStageQueryContext.getMaxThreads(QueryContext.empty()));
+ }
+
+ @Test
+ public void getMaxThreads_set_returnsCorrectValue()
+ {
+ Map<String, Object> propertyMap = ImmutableMap.of(CTX_MAX_THREADS, 4);
+ Assert.assertEquals(Integer.valueOf(4),
MultiStageQueryContext.getMaxThreads(QueryContext.of(propertyMap)));
+ }
+
private static List<String> decodeSortOrder(@Nullable final String input)
{
return
MultiStageQueryContext.decodeList(MultiStageQueryContext.CTX_SORT_ORDER, input);
diff --git
a/processing/src/main/java/org/apache/druid/frame/processor/Bouncer.java
b/processing/src/main/java/org/apache/druid/frame/processor/Bouncer.java
index 40b545958d9..16d2a4ac6dd 100644
--- a/processing/src/main/java/org/apache/druid/frame/processor/Bouncer.java
+++ b/processing/src/main/java/org/apache/druid/frame/processor/Bouncer.java
@@ -24,8 +24,10 @@ import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.errorprone.annotations.concurrent.GuardedBy;
+import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.java.util.common.ISE;
+import javax.annotation.Nullable;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -37,6 +39,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
*/
public class Bouncer
{
+ @Nullable
+ private final Bouncer parentBouncer;
private final int maxCount;
private final Object lock = new Object();
@@ -47,8 +51,25 @@ public class Bouncer
@GuardedBy("lock")
private final Queue<SettableFuture<Ticket>> waiters = new ArrayDeque<>();
+ /**
+ * Create a Bouncer that issues a limited number of tickets.
+ */
public Bouncer(final int maxCount)
{
+ this.parentBouncer = null;
+ this.maxCount = maxCount;
+
+ if (maxCount <= 0) {
+ throw new ISE("maxConcurrentWorkers must be greater than zero");
+ }
+ }
+
+ /**
+ * Create a Bouncer that issues a limited number of tickets from a parent.
+ */
+ public Bouncer(final int maxCount, @Nullable final Bouncer parentBouncer)
+ {
+ this.parentBouncer = parentBouncer;
this.maxCount = maxCount;
if (maxCount <= 0) {
@@ -63,16 +84,29 @@ public class Bouncer
public int getMaxCount()
{
- return maxCount;
+ return parentBouncer == null ? maxCount :
Math.min(parentBouncer.getMaxCount(), maxCount);
}
public ListenableFuture<Ticket> ticket()
+ {
+ // Acquire parent ticket first, if there's a parent.
+ if (parentBouncer != null) {
+ return FutureUtils.transformAsync(parentBouncer.ticket(),
this::ticketInternal);
+ } else {
+ return ticketInternal(null);
+ }
+ }
+
+ /**
+ * Acquire a ticket from this Bouncer. Precondition: if there is a
parentBouncer, only call this method when
+ * holding a parent ticket.
+ */
+ private ListenableFuture<Ticket> ticketInternal(@Nullable final Ticket
parentTicket)
{
synchronized (lock) {
if (currentCount < maxCount) {
currentCount++;
- //noinspection UnstableApiUsage
- return Futures.immediateFuture(new Ticket());
+ return Futures.immediateFuture(new Ticket(parentTicket));
} else {
final SettableFuture<Ticket> future = SettableFuture.create();
waiters.add(future);
@@ -91,8 +125,15 @@ public class Bouncer
public class Ticket
{
+ @Nullable
+ private final Ticket parentTicket;
private final AtomicBoolean givenBack = new AtomicBoolean();
+ public Ticket(@Nullable Ticket parentTicket)
+ {
+ this.parentTicket = parentTicket;
+ }
+
public void giveBack()
{
if (!givenBack.compareAndSet(false, true)) {
@@ -110,14 +151,19 @@ public class Bouncer
if (nextFuture == null) {
// Nobody was waiting.
currentCount--;
- return;
+ break;
}
}
- if (nextFuture.set(new Ticket())) {
+ if (nextFuture.set(new Ticket(parentTicket))) {
return;
}
}
+
+ // Dispose of the parent ticket, if we have one.
+ if (parentTicket != null) {
+ parentTicket.giveBack();
+ }
}
}
}
diff --git
a/processing/src/test/java/org/apache/druid/frame/processor/RunAllFullyWidgetTest.java
b/processing/src/test/java/org/apache/druid/frame/processor/RunAllFullyWidgetTest.java
index 6f6c15f74c2..deba9001fd3 100644
---
a/processing/src/test/java/org/apache/druid/frame/processor/RunAllFullyWidgetTest.java
+++
b/processing/src/test/java/org/apache/druid/frame/processor/RunAllFullyWidgetTest.java
@@ -69,7 +69,8 @@ import java.util.stream.IntStream;
@RunWith(Parameterized.class)
public class RunAllFullyWidgetTest extends
FrameProcessorExecutorTest.BaseFrameProcessorExecutorTestSuite
{
- private final int bouncerPoolSize;
+ private final int lowerBouncerPoolSize;
+ private final int higherBouncerPoolSize;
private final int maxOutstandingProcessors;
private final boolean delayed;
private final AtomicLong closed = new AtomicLong();
@@ -82,25 +83,46 @@ public class RunAllFullyWidgetTest extends
FrameProcessorExecutorTest.BaseFrameP
@GuardedBy("this")
private int concurrentNow = 0;
- public RunAllFullyWidgetTest(int numThreads, int bouncerPoolSize, int
maxOutstandingProcessors, boolean delayed)
+ public RunAllFullyWidgetTest(
+ int numThreads,
+ int lowerBouncerPoolSize,
+ int higherBouncerPoolSize,
+ int maxOutstandingProcessors,
+ boolean delayed
+ )
{
super(numThreads);
- this.bouncerPoolSize = bouncerPoolSize;
+ this.lowerBouncerPoolSize = lowerBouncerPoolSize;
+ this.higherBouncerPoolSize = higherBouncerPoolSize;
this.maxOutstandingProcessors = maxOutstandingProcessors;
this.delayed = delayed;
}
@Parameterized.Parameters(name =
- "numThreads = {0}, bouncerPoolSize = {1}, maxOutstandingProcessors =
{2}, delayed = {3}")
+ "numThreads = {0}, "
+ + "lowerBouncerPoolSize = {1}, "
+ + "higherBouncerPoolSize = {2}, "
+ + "maxOutstandingProcessors = {3}, "
+ + "delayed = {4}")
public static Collection<Object[]> constructorFeeder()
{
final List<Object[]> constructors = new ArrayList<>();
for (int numThreads : new int[]{1, 3, 12}) {
- for (int bouncerPoolSize : new int[]{1, 3, 12, Integer.MAX_VALUE}) {
- for (int maxOutstandingProcessors : new int[]{1, 3, 12}) {
- for (boolean delayed : new boolean[]{false, true}) {
- constructors.add(new Object[]{numThreads, bouncerPoolSize,
maxOutstandingProcessors, delayed});
+ for (int lowerBouncerPoolSize : new int[]{1, 3, 12, Integer.MAX_VALUE}) {
+ for (int higherBouncerPoolSize : new int[]{-1, 1, 3, 12,
Integer.MAX_VALUE}) {
+ for (int maxOutstandingProcessors : new int[]{1, 3, 12}) {
+ for (boolean delayed : new boolean[]{false, true}) {
+ constructors.add(
+ new Object[]{
+ numThreads,
+ lowerBouncerPoolSize,
+ higherBouncerPoolSize,
+ maxOutstandingProcessors,
+ delayed
+ }
+ );
+ }
}
}
}
@@ -114,7 +136,10 @@ public class RunAllFullyWidgetTest extends
FrameProcessorExecutorTest.BaseFrameP
public void setUp() throws Exception
{
super.setUp();
- bouncer = bouncerPoolSize == Integer.MAX_VALUE ? Bouncer.unlimited() : new
Bouncer(bouncerPoolSize);
+ bouncer = new Bouncer(lowerBouncerPoolSize);
+ if (higherBouncerPoolSize != -1) {
+ bouncer = new Bouncer(higherBouncerPoolSize, bouncer);
+ }
synchronized (this) {
concurrentNow = 0;
@@ -130,12 +155,19 @@ public class RunAllFullyWidgetTest extends
FrameProcessorExecutorTest.BaseFrameP
synchronized (this) {
Assert.assertEquals(0, concurrentNow);
- MatcherAssert.assertThat(concurrentHighWatermark,
Matchers.lessThanOrEqualTo(bouncerPoolSize));
+ MatcherAssert.assertThat(concurrentHighWatermark,
Matchers.lessThanOrEqualTo(lowerBouncerPoolSize));
+ if (higherBouncerPoolSize != -1) {
+ MatcherAssert.assertThat(concurrentHighWatermark,
Matchers.lessThanOrEqualTo(higherBouncerPoolSize));
+ }
MatcherAssert.assertThat(concurrentHighWatermark,
Matchers.lessThanOrEqualTo(maxOutstandingProcessors));
}
Assert.assertEquals("Bouncer current running count", 0,
bouncer.getCurrentCount());
- Assert.assertEquals("Bouncer max pool size", bouncerPoolSize,
bouncer.getMaxCount());
+ Assert.assertEquals(
+ "Bouncer max pool size",
+ higherBouncerPoolSize == -1 ? lowerBouncerPoolSize :
Math.min(lowerBouncerPoolSize, higherBouncerPoolSize),
+ bouncer.getMaxCount()
+ );
Assert.assertEquals("Encountered single close (from ensureClose)", 1,
closed.get());
}
@@ -402,7 +434,8 @@ public class RunAllFullyWidgetTest extends
FrameProcessorExecutorTest.BaseFrameP
@SuppressWarnings("BusyWait")
public void test_runAllFully_futureCancel() throws InterruptedException
{
- final int expectedRunningProcessors = Math.min(Math.min(bouncerPoolSize,
maxOutstandingProcessors), numThreads);
+ final int expectedRunningProcessors =
+ Math.min(Math.min(bouncer.getMaxCount(), maxOutstandingProcessors),
numThreads);
final List<SleepyFrameProcessor> processors =
IntStream.range(0, 10 * expectedRunningProcessors)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]