This is an automated email from the ASF dual-hosted git repository.
abhishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new a7a4bfd331 modify QueryScheduler to lazily acquire lanes when
executing queries to avoid leaks (#14184)
a7a4bfd331 is described below
commit a7a4bfd331e320d8925e6f39fdc4861319e5b8bd
Author: Clint Wylie <[email protected]>
AuthorDate: Sun May 7 23:12:05 2023 -0700
modify QueryScheduler to lazily acquire lanes when executing queries to
avoid leaks (#14184)
This PR fixes an issue that could occur if druid.query.scheduler.numThreads
is configured and any exception occurs after QueryScheduler.run has been called
to create a Sequence. This would result in total and/or lane-specific locks
being acquired, but because the sequence was never actually evaluated, the
"baggage" that typically releases these locks was never executed. An
example of how this can happen is if a group-by having filter, which wraps and
transforms this sequence [...]
---
.../query/groupby/GroupByQueryRunnerTest.java | 15 ++
.../org/apache/druid/server/QueryScheduler.java | 21 ++-
.../org/apache/druid/server/QueryResourceTest.java | 30 +--
.../apache/druid/server/QuerySchedulerTest.java | 207 +++++++++++++++++----
4 files changed, 221 insertions(+), 52 deletions(-)
diff --git
a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java
b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java
index 6d898e725f..70c9373a0e 100644
---
a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java
+++
b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java
@@ -228,6 +228,7 @@ public class GroupByQueryRunnerTest extends
InitializedNullHandlingTest
return GroupByStrategySelector.STRATEGY_V1;
}
};
+
final GroupByQueryConfig v1SingleThreadedConfig = new GroupByQueryConfig()
{
@Override
@@ -361,6 +362,20 @@ public class GroupByQueryRunnerTest extends
InitializedNullHandlingTest
);
}
+ public static GroupByQueryRunnerFactory makeQueryRunnerFactory(
+ final GroupByQueryConfig config
+ )
+ {
+ return makeQueryRunnerFactory(
+ DEFAULT_MAPPER,
+ config,
+ new TestGroupByBuffers(
+ DEFAULT_PROCESSING_CONFIG.intermediateComputeSizeBytes(),
+ DEFAULT_PROCESSING_CONFIG.getNumMergeBuffers()
+ ),
+ DEFAULT_PROCESSING_CONFIG
+ );
+ }
public static GroupByQueryRunnerFactory makeQueryRunnerFactory(
final GroupByQueryConfig config,
final TestGroupByBuffers bufferPools
diff --git a/server/src/main/java/org/apache/druid/server/QueryScheduler.java
b/server/src/main/java/org/apache/druid/server/QueryScheduler.java
index 762647aa86..fb44af0e7b 100644
--- a/server/src/main/java/org/apache/druid/server/QueryScheduler.java
+++ b/server/src/main/java/org/apache/druid/server/QueryScheduler.java
@@ -32,6 +32,8 @@ import org.apache.druid.client.SegmentServerSelector;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.guava.LazySequence;
import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.guava.SequenceWrapper;
+import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.core.NoopEmitter;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
@@ -185,8 +187,23 @@ public class QueryScheduler implements QueryWatcher
*/
public <T> Sequence<T> run(Query<?> query, Sequence<T> resultSequence)
{
- List<Bulkhead> bulkheads = acquireLanes(query);
- return resultSequence.withBaggage(() -> finishLanes(bulkheads));
+ return Sequences.wrap(resultSequence, new SequenceWrapper()
+ {
+ private List<Bulkhead> bulkheads = null;
+ @Override
+ public void before()
+ {
+ bulkheads = acquireLanes(query);
+ }
+
+ @Override
+ public void after(boolean isDone, Throwable thrown)
+ {
+ if (bulkheads != null) {
+ finishLanes(bulkheads);
+ }
+ }
+ });
}
/**
diff --git
a/server/src/test/java/org/apache/druid/server/QueryResourceTest.java
b/server/src/test/java/org/apache/druid/server/QueryResourceTest.java
index 35dbc63444..34c0a44a5c 100644
--- a/server/src/test/java/org/apache/druid/server/QueryResourceTest.java
+++ b/server/src/test/java/org/apache/druid/server/QueryResourceTest.java
@@ -896,7 +896,7 @@ public class QueryResourceTest
assertAsyncResponseAndCountdownOrBlockForever(
SIMPLE_TIMESERIES_QUERY,
waitAllFinished,
- response -> Assert.assertEquals(Response.Status.OK.getStatusCode(),
response.getStatus())
+ response -> Assert.assertEquals(Status.OK.getStatusCode(),
response.getStatus())
);
waitTwoScheduled.await();
assertSynchronousResponseAndCountdownOrBlockForever(
@@ -1043,19 +1043,21 @@ public class QueryResourceTest
return (queryPlus, responseContext) -> {
beforeScheduler.forEach(CountDownLatch::countDown);
- return scheduler.run(
- scheduler.prioritizeAndLaneQuery(queryPlus, ImmutableSet.of()),
- new LazySequence<T>(() -> {
- inScheduler.forEach(CountDownLatch::countDown);
- try {
- // pretend to be a query that is waiting on results
- Thread.sleep(500);
- }
- catch (InterruptedException ignored) {
- }
- // all that waiting for nothing :(
- return Sequences.empty();
- })
+ return Sequences.simple(
+ scheduler.run(
+ scheduler.prioritizeAndLaneQuery(queryPlus,
ImmutableSet.of()),
+ new LazySequence<T>(() -> {
+ inScheduler.forEach(CountDownLatch::countDown);
+ try {
+ // pretend to be a query that is waiting on results
+ Thread.sleep(500);
+ }
+ catch (InterruptedException ignored) {
+ }
+ // all that waiting for nothing :(
+ return Sequences.empty();
+ })
+ ).toList()
);
};
}
diff --git
a/server/src/test/java/org/apache/druid/server/QuerySchedulerTest.java
b/server/src/test/java/org/apache/druid/server/QuerySchedulerTest.java
index 571684cacc..1f3acac8fc 100644
--- a/server/src/test/java/org/apache/druid/server/QuerySchedulerTest.java
+++ b/server/src/test/java/org/apache/druid/server/QuerySchedulerTest.java
@@ -37,6 +37,7 @@ import org.apache.druid.guice.JsonConfigurator;
import org.apache.druid.guice.annotations.Global;
import org.apache.druid.guice.annotations.Json;
import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.BaseSequence;
import org.apache.druid.java.util.common.guava.LazySequence;
import org.apache.druid.java.util.common.guava.Sequence;
@@ -46,10 +47,20 @@ import org.apache.druid.java.util.common.guava.Yielder;
import org.apache.druid.java.util.common.guava.Yielders;
import org.apache.druid.java.util.emitter.core.NoopEmitter;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.query.FluentQueryRunnerBuilder;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryCapacityExceededException;
import org.apache.druid.query.QueryPlus;
+import org.apache.druid.query.QueryRunnerFactory;
+import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
+import org.apache.druid.query.dimension.DefaultDimensionSpec;
+import org.apache.druid.query.groupby.GroupByQuery;
+import org.apache.druid.query.groupby.GroupByQueryConfig;
+import org.apache.druid.query.groupby.GroupByQueryRunnerTest;
+import org.apache.druid.query.groupby.ResultRow;
+import org.apache.druid.query.groupby.having.HavingSpec;
+import org.apache.druid.query.groupby.strategy.GroupByStrategySelector;
import org.apache.druid.query.topn.TopNQuery;
import org.apache.druid.query.topn.TopNQueryBuilder;
import org.apache.druid.server.initialization.ServerConfig;
@@ -60,9 +71,7 @@ import org.easymock.EasyMock;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
-import org.junit.Rule;
import org.junit.Test;
-import org.junit.rules.ExpectedException;
import java.io.IOException;
import java.util.ArrayList;
@@ -81,9 +90,6 @@ public class QuerySchedulerTest
private static final int TEST_HI_CAPACITY = 5;
private static final int TEST_LO_CAPACITY = 2;
- @Rule
- public ExpectedException expected = ExpectedException.none();
-
private ListeningExecutorService executorService;
private ObservableQueryScheduler scheduler;
@@ -176,10 +182,8 @@ public class QuerySchedulerTest
}
@Test
- public void testHiLoReleaseLaneWhenSequenceExplodes() throws Exception
+ public void testHiLoReleaseLaneWhenSequenceExplodes()
{
- expected.expectMessage("exploded");
- expected.expect(ExecutionException.class);
TopNQuery interactive = makeInteractiveQuery();
ListenableFuture<?> future = executorService.submit(() -> {
try {
@@ -204,71 +208,91 @@ public class QuerySchedulerTest
throw new RuntimeException(ex);
}
});
- future.get();
+ Throwable t = Assert.assertThrows(ExecutionException.class, future::get);
+ Assert.assertEquals("java.lang.RuntimeException: exploded",
t.getMessage());
+ Assert.assertEquals(5, scheduler.getTotalAvailableCapacity());
}
@Test
public void testHiLoFailsWhenOutOfLaneCapacity()
{
- expected.expectMessage(
-
QueryCapacityExceededException.makeLaneErrorMessage(HiLoQueryLaningStrategy.LOW,
TEST_LO_CAPACITY)
- );
- expected.expect(QueryCapacityExceededException.class);
-
Query<?> report1 =
scheduler.prioritizeAndLaneQuery(QueryPlus.wrap(makeReportQuery()),
ImmutableSet.of());
- scheduler.run(report1, Sequences.empty());
+ Sequence<?> sequence = scheduler.run(report1, Sequences.empty());
+ // making the sequence doesn't count, only running it does
+ Assert.assertEquals(5, scheduler.getTotalAvailableCapacity());
+ // this counts though since we are doing stuff
+ Yielders.each(sequence);
Assert.assertNotNull(report1);
Assert.assertEquals(4, scheduler.getTotalAvailableCapacity());
Assert.assertEquals(1,
scheduler.getLaneAvailableCapacity(HiLoQueryLaningStrategy.LOW));
Query<?> report2 =
scheduler.prioritizeAndLaneQuery(QueryPlus.wrap(makeReportQuery()),
ImmutableSet.of());
- scheduler.run(report2, Sequences.empty());
+ Yielders.each(scheduler.run(report2, Sequences.empty()));
Assert.assertNotNull(report2);
Assert.assertEquals(3, scheduler.getTotalAvailableCapacity());
Assert.assertEquals(0,
scheduler.getLaneAvailableCapacity(HiLoQueryLaningStrategy.LOW));
// too many reports
- scheduler.run(
- scheduler.prioritizeAndLaneQuery(QueryPlus.wrap(makeReportQuery()),
ImmutableSet.of()), Sequences.empty()
+ Throwable t = Assert.assertThrows(
+ QueryCapacityExceededException.class,
+ () -> Yielders.each(
+ scheduler.run(
+
scheduler.prioritizeAndLaneQuery(QueryPlus.wrap(makeReportQuery()),
ImmutableSet.of()),
+ Sequences.empty()
+ )
+ )
+ );
+ Assert.assertEquals(
+ "Too many concurrent queries for lane 'low', query capacity of 2
exceeded. Please try your query again later.",
+ t.getMessage()
);
}
@Test
public void testHiLoFailsWhenOutOfTotalCapacity()
{
-
expected.expectMessage(QueryCapacityExceededException.makeTotalErrorMessage(TEST_HI_CAPACITY));
- expected.expect(QueryCapacityExceededException.class);
-
Query<?> interactive1 =
scheduler.prioritizeAndLaneQuery(QueryPlus.wrap(makeInteractiveQuery()),
ImmutableSet.of());
- scheduler.run(interactive1, Sequences.empty());
+ Sequence<?> sequence = scheduler.run(interactive1, Sequences.empty());
+ // making the sequence doesn't count, only running it does
+ Assert.assertEquals(5, scheduler.getTotalAvailableCapacity());
+ // this counts tho
+ Yielders.each(sequence);
Assert.assertNotNull(interactive1);
Assert.assertEquals(4, scheduler.getTotalAvailableCapacity());
Query<?> report1 =
scheduler.prioritizeAndLaneQuery(QueryPlus.wrap(makeReportQuery()),
ImmutableSet.of());
- scheduler.run(report1, Sequences.empty());
+ Yielders.each(scheduler.run(report1, Sequences.empty()));
Assert.assertNotNull(report1);
Assert.assertEquals(3, scheduler.getTotalAvailableCapacity());
Assert.assertEquals(1,
scheduler.getLaneAvailableCapacity(HiLoQueryLaningStrategy.LOW));
Query<?> interactive2 =
scheduler.prioritizeAndLaneQuery(QueryPlus.wrap(makeInteractiveQuery()),
ImmutableSet.of());
- scheduler.run(interactive2, Sequences.empty());
+ Yielders.each(scheduler.run(interactive2, Sequences.empty()));
Assert.assertNotNull(interactive2);
Assert.assertEquals(2, scheduler.getTotalAvailableCapacity());
Query<?> report2 =
scheduler.prioritizeAndLaneQuery(QueryPlus.wrap(makeReportQuery()),
ImmutableSet.of());
- scheduler.run(report2, Sequences.empty());
+ Yielders.each(scheduler.run(report2, Sequences.empty()));
Assert.assertNotNull(report2);
Assert.assertEquals(1, scheduler.getTotalAvailableCapacity());
Assert.assertEquals(0,
scheduler.getLaneAvailableCapacity(HiLoQueryLaningStrategy.LOW));
Query<?> interactive3 =
scheduler.prioritizeAndLaneQuery(QueryPlus.wrap(makeInteractiveQuery()),
ImmutableSet.of());
- scheduler.run(interactive3, Sequences.empty());
+ Yielders.each(scheduler.run(interactive3, Sequences.empty()));
Assert.assertNotNull(interactive3);
Assert.assertEquals(0, scheduler.getTotalAvailableCapacity());
// one too many
- scheduler.run(
-
scheduler.prioritizeAndLaneQuery(QueryPlus.wrap(makeInteractiveQuery()),
ImmutableSet.of()), Sequences.empty()
+ Throwable t = Assert.assertThrows(
+ QueryCapacityExceededException.class,
+ () -> Yielders.each(scheduler.run(
+
scheduler.prioritizeAndLaneQuery(QueryPlus.wrap(makeInteractiveQuery()),
ImmutableSet.of()),
+ Sequences.empty()
+ ))
+ );
+ Assert.assertEquals(
+ "Too many concurrent queries, total query capacity of 5 exceeded.
Please try your query again later.",
+ t.getMessage()
);
}
@@ -324,6 +348,74 @@ public class QuerySchedulerTest
getFuturesAndAssertAftermathIsChill(futures, scheduler, true, true);
}
+ @Test
+ public void testExplodingWrapperDoesNotLeakLocks()
+ {
+ scheduler = new ObservableQueryScheduler(
+ 5,
+ ManualQueryPrioritizationStrategy.INSTANCE,
+ new NoQueryLaningStrategy(),
+ new ServerConfig()
+ );
+
+ QueryRunnerFactory factory = GroupByQueryRunnerTest.makeQueryRunnerFactory(
+ new GroupByQueryConfig()
+ {
+ @Override
+ public String getDefaultStrategy()
+ {
+ return GroupByStrategySelector.STRATEGY_V2;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "v2";
+ }
+ }
+ );
+ Future<?> f = makeMergingQueryFuture(
+ executorService,
+ scheduler,
+ GroupByQuery.builder()
+ .setDataSource("foo")
+ .setInterval("2020-01-01/2020-01-02")
+ .setDimensions(DefaultDimensionSpec.of("bar"))
+ .setAggregatorSpecs(new CountAggregatorFactory("chocula"))
+ .setGranularity(Granularities.ALL)
+ .setHavingSpec(
+ new HavingSpec()
+ {
+ @Override
+ public void setQuery(GroupByQuery query)
+ {
+ throw new RuntimeException("exploded");
+ }
+
+ @Override
+ public boolean eval(ResultRow row)
+ {
+ return false;
+ }
+
+ @Override
+ public byte[] getCacheKey()
+ {
+ return new byte[0];
+ }
+ }
+ )
+ .build(),
+ factory.getToolchest(),
+ NUM_ROWS
+ );
+
+ Assert.assertEquals(5, scheduler.getTotalAvailableCapacity());
+ Throwable t = Assert.assertThrows(Throwable.class, f::get);
+ Assert.assertEquals("java.lang.RuntimeException: exploded",
t.getMessage());
+ Assert.assertEquals(5, scheduler.getTotalAvailableCapacity());
+ }
+
@Test
public void testConfigNone()
{
@@ -367,7 +459,6 @@ public class QuerySchedulerTest
@Test
public void testMisConfigHiLo()
{
- expected.expect(ProvisionException.class);
final Injector injector = createInjector();
final String propertyPrefix = "druid.query.scheduler";
final JsonConfigProvider<QuerySchedulerProvider> provider =
JsonConfigProvider.of(
@@ -377,9 +468,16 @@ public class QuerySchedulerTest
final Properties properties = new Properties();
properties.setProperty(propertyPrefix + ".laning.strategy", "hilo");
provider.inject(properties, injector.getInstance(JsonConfigurator.class));
- final QueryScheduler scheduler = provider.get().get().get();
- Assert.assertEquals(10, scheduler.getTotalAvailableCapacity());
- Assert.assertEquals(2,
scheduler.getLaneAvailableCapacity(HiLoQueryLaningStrategy.LOW));
+ Throwable t = Assert.assertThrows(ProvisionException.class, () ->
provider.get().get().get());
+ Assert.assertEquals(
+ "Unable to provision, see the following errors:\n"
+ + "\n"
+ + "1) Problem parsing object at prefix[druid.query.scheduler]: Cannot
construct instance of
`org.apache.druid.server.scheduling.HiLoQueryLaningStrategy`, problem:
maxLowPercent must be set\n"
+ + " at [Source: UNKNOWN; line: -1, column: -1] (through reference
chain: org.apache.druid.server.QuerySchedulerProvider[\"laning\"]).\n"
+ + "\n"
+ + "1 error",
+ t.getMessage()
+ );
}
@Test
@@ -418,7 +516,6 @@ public class QuerySchedulerTest
@Test
public void testMisConfigThreshold()
{
- expected.expect(ProvisionException.class);
final Injector injector = createInjector();
final String propertyPrefix = "druid.query.scheduler";
final JsonConfigProvider<QuerySchedulerProvider> provider =
JsonConfigProvider.of(
@@ -428,9 +525,16 @@ public class QuerySchedulerTest
final Properties properties = new Properties();
properties.setProperty(propertyPrefix + ".prioritization.strategy",
"threshold");
provider.inject(properties, injector.getInstance(JsonConfigurator.class));
- final QueryScheduler scheduler = provider.get().get().get();
- Assert.assertEquals(10, scheduler.getTotalAvailableCapacity());
- Assert.assertEquals(2,
scheduler.getLaneAvailableCapacity(HiLoQueryLaningStrategy.LOW));
+ Throwable t = Assert.assertThrows(ProvisionException.class, () ->
provider.get().get().get());
+ Assert.assertEquals(
+ "Unable to provision, see the following errors:\n"
+ + "\n"
+ + "1) Problem parsing object at prefix[druid.query.scheduler]: Cannot
construct instance of
`org.apache.druid.server.scheduling.ThresholdBasedQueryPrioritizationStrategy`,
problem: periodThreshold, durationThreshold, or segmentCountThreshold must be
set\n"
+ + " at [Source: UNKNOWN; line: -1, column: -1] (through reference
chain: org.apache.druid.server.QuerySchedulerProvider[\"prioritization\"]).\n"
+ + "\n"
+ + "1 error",
+ t.getMessage()
+ );
}
@@ -497,6 +601,7 @@ public class QuerySchedulerTest
.context(ImmutableMap.of("queryId", "default-" + UUID.randomUUID()))
.build();
}
+
private TopNQuery makeInteractiveQuery()
{
return makeBaseBuilder()
@@ -638,6 +743,36 @@ public class QuerySchedulerTest
});
}
+ private ListenableFuture<?> makeMergingQueryFuture(
+ ListeningExecutorService executorService,
+ QueryScheduler scheduler,
+ Query<?> query,
+ QueryToolChest toolChest,
+ int numRows
+ )
+ {
+ return executorService.submit(() -> {
+ try {
+ Query<?> scheduled =
scheduler.prioritizeAndLaneQuery(QueryPlus.wrap(query), ImmutableSet.of());
+
+ Assert.assertNotNull(scheduled);
+
+ FluentQueryRunnerBuilder fluentQueryRunnerBuilder = new
FluentQueryRunnerBuilder(toolChest);
+ FluentQueryRunnerBuilder.FluentQueryRunner runner =
fluentQueryRunnerBuilder.create((queryPlus, responseContext) -> {
+ Sequence<Integer> underlyingSequence = makeSequence(numRows);
+ Sequence<Integer> results = scheduler.run(scheduled,
underlyingSequence);
+ return results;
+ });
+
+ final int actualNumRows =
consumeAndCloseSequence(runner.mergeResults().run(QueryPlus.wrap(query)));
+ Assert.assertEquals(actualNumRows, numRows);
+ }
+ catch (IOException ex) {
+ throw new RuntimeException(ex);
+ }
+ });
+ }
+
private void getFuturesAndAssertAftermathIsChill(
List<Future<?>> futures,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]