This is an automated email from the ASF dual-hosted git repository.

abhishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new a7a4bfd331 modify QueryScheduler to lazily acquire lanes when 
executing queries to avoid leaks (#14184)
a7a4bfd331 is described below

commit a7a4bfd331e320d8925e6f39fdc4861319e5b8bd
Author: Clint Wylie <[email protected]>
AuthorDate: Sun May 7 23:12:05 2023 -0700

    modify QueryScheduler to lazily acquire lanes when executing queries to 
avoid leaks (#14184)
    
    This PR fixes an issue that could occur if druid.query.scheduler.numThreads 
is configured and any exception occurs after QueryScheduler.run has been called 
to create a Sequence. This would result in total and/or lane-specific locks 
being acquired, but because the sequence was never actually evaluated, the 
"baggage" which typically releases these locks was never executed. An 
example of how this can happen is if a group-by having filter, which wraps and 
transforms this sequence [...]
---
 .../query/groupby/GroupByQueryRunnerTest.java      |  15 ++
 .../org/apache/druid/server/QueryScheduler.java    |  21 ++-
 .../org/apache/druid/server/QueryResourceTest.java |  30 +--
 .../apache/druid/server/QuerySchedulerTest.java    | 207 +++++++++++++++++----
 4 files changed, 221 insertions(+), 52 deletions(-)

diff --git 
a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java
 
b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java
index 6d898e725f..70c9373a0e 100644
--- 
a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java
+++ 
b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java
@@ -228,6 +228,7 @@ public class GroupByQueryRunnerTest extends 
InitializedNullHandlingTest
         return GroupByStrategySelector.STRATEGY_V1;
       }
     };
+
     final GroupByQueryConfig v1SingleThreadedConfig = new GroupByQueryConfig()
     {
       @Override
@@ -361,6 +362,20 @@ public class GroupByQueryRunnerTest extends 
InitializedNullHandlingTest
     );
   }
 
+  public static GroupByQueryRunnerFactory makeQueryRunnerFactory(
+      final GroupByQueryConfig config
+  )
+  {
+    return makeQueryRunnerFactory(
+        DEFAULT_MAPPER,
+        config,
+        new TestGroupByBuffers(
+            DEFAULT_PROCESSING_CONFIG.intermediateComputeSizeBytes(),
+            DEFAULT_PROCESSING_CONFIG.getNumMergeBuffers()
+        ),
+        DEFAULT_PROCESSING_CONFIG
+    );
+  }
   public static GroupByQueryRunnerFactory makeQueryRunnerFactory(
       final GroupByQueryConfig config,
       final TestGroupByBuffers bufferPools
diff --git a/server/src/main/java/org/apache/druid/server/QueryScheduler.java 
b/server/src/main/java/org/apache/druid/server/QueryScheduler.java
index 762647aa86..fb44af0e7b 100644
--- a/server/src/main/java/org/apache/druid/server/QueryScheduler.java
+++ b/server/src/main/java/org/apache/druid/server/QueryScheduler.java
@@ -32,6 +32,8 @@ import org.apache.druid.client.SegmentServerSelector;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.guava.LazySequence;
 import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.guava.SequenceWrapper;
+import org.apache.druid.java.util.common.guava.Sequences;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.java.util.emitter.core.NoopEmitter;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
@@ -185,8 +187,23 @@ public class QueryScheduler implements QueryWatcher
    */
   public <T> Sequence<T> run(Query<?> query, Sequence<T> resultSequence)
   {
-    List<Bulkhead> bulkheads = acquireLanes(query);
-    return resultSequence.withBaggage(() -> finishLanes(bulkheads));
+    return Sequences.wrap(resultSequence, new SequenceWrapper()
+    {
+      private List<Bulkhead> bulkheads = null;
+      @Override
+      public void before()
+      {
+        bulkheads = acquireLanes(query);
+      }
+
+      @Override
+      public void after(boolean isDone, Throwable thrown)
+      {
+        if (bulkheads != null) {
+          finishLanes(bulkheads);
+        }
+      }
+    });
   }
 
   /**
diff --git 
a/server/src/test/java/org/apache/druid/server/QueryResourceTest.java 
b/server/src/test/java/org/apache/druid/server/QueryResourceTest.java
index 35dbc63444..34c0a44a5c 100644
--- a/server/src/test/java/org/apache/druid/server/QueryResourceTest.java
+++ b/server/src/test/java/org/apache/druid/server/QueryResourceTest.java
@@ -896,7 +896,7 @@ public class QueryResourceTest
     assertAsyncResponseAndCountdownOrBlockForever(
         SIMPLE_TIMESERIES_QUERY,
         waitAllFinished,
-        response -> Assert.assertEquals(Response.Status.OK.getStatusCode(), 
response.getStatus())
+        response -> Assert.assertEquals(Status.OK.getStatusCode(), 
response.getStatus())
     );
     waitTwoScheduled.await();
     assertSynchronousResponseAndCountdownOrBlockForever(
@@ -1043,19 +1043,21 @@ public class QueryResourceTest
         return (queryPlus, responseContext) -> {
           beforeScheduler.forEach(CountDownLatch::countDown);
 
-          return scheduler.run(
-              scheduler.prioritizeAndLaneQuery(queryPlus, ImmutableSet.of()),
-              new LazySequence<T>(() -> {
-                inScheduler.forEach(CountDownLatch::countDown);
-                try {
-                  // pretend to be a query that is waiting on results
-                  Thread.sleep(500);
-                }
-                catch (InterruptedException ignored) {
-                }
-                // all that waiting for nothing :(
-                return Sequences.empty();
-              })
+          return Sequences.simple(
+              scheduler.run(
+                  scheduler.prioritizeAndLaneQuery(queryPlus, 
ImmutableSet.of()),
+                  new LazySequence<T>(() -> {
+                    inScheduler.forEach(CountDownLatch::countDown);
+                    try {
+                      // pretend to be a query that is waiting on results
+                      Thread.sleep(500);
+                    }
+                    catch (InterruptedException ignored) {
+                    }
+                    // all that waiting for nothing :(
+                    return Sequences.empty();
+                  })
+              ).toList()
           );
         };
       }
diff --git 
a/server/src/test/java/org/apache/druid/server/QuerySchedulerTest.java 
b/server/src/test/java/org/apache/druid/server/QuerySchedulerTest.java
index 571684cacc..1f3acac8fc 100644
--- a/server/src/test/java/org/apache/druid/server/QuerySchedulerTest.java
+++ b/server/src/test/java/org/apache/druid/server/QuerySchedulerTest.java
@@ -37,6 +37,7 @@ import org.apache.druid.guice.JsonConfigurator;
 import org.apache.druid.guice.annotations.Global;
 import org.apache.druid.guice.annotations.Json;
 import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.guava.BaseSequence;
 import org.apache.druid.java.util.common.guava.LazySequence;
 import org.apache.druid.java.util.common.guava.Sequence;
@@ -46,10 +47,20 @@ import org.apache.druid.java.util.common.guava.Yielder;
 import org.apache.druid.java.util.common.guava.Yielders;
 import org.apache.druid.java.util.emitter.core.NoopEmitter;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.query.FluentQueryRunnerBuilder;
 import org.apache.druid.query.Query;
 import org.apache.druid.query.QueryCapacityExceededException;
 import org.apache.druid.query.QueryPlus;
+import org.apache.druid.query.QueryRunnerFactory;
+import org.apache.druid.query.QueryToolChest;
 import org.apache.druid.query.aggregation.CountAggregatorFactory;
+import org.apache.druid.query.dimension.DefaultDimensionSpec;
+import org.apache.druid.query.groupby.GroupByQuery;
+import org.apache.druid.query.groupby.GroupByQueryConfig;
+import org.apache.druid.query.groupby.GroupByQueryRunnerTest;
+import org.apache.druid.query.groupby.ResultRow;
+import org.apache.druid.query.groupby.having.HavingSpec;
+import org.apache.druid.query.groupby.strategy.GroupByStrategySelector;
 import org.apache.druid.query.topn.TopNQuery;
 import org.apache.druid.query.topn.TopNQueryBuilder;
 import org.apache.druid.server.initialization.ServerConfig;
@@ -60,9 +71,7 @@ import org.easymock.EasyMock;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.ExpectedException;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -81,9 +90,6 @@ public class QuerySchedulerTest
   private static final int TEST_HI_CAPACITY = 5;
   private static final int TEST_LO_CAPACITY = 2;
 
-  @Rule
-  public ExpectedException expected = ExpectedException.none();
-
   private ListeningExecutorService executorService;
   private ObservableQueryScheduler scheduler;
 
@@ -176,10 +182,8 @@ public class QuerySchedulerTest
   }
 
   @Test
-  public void testHiLoReleaseLaneWhenSequenceExplodes() throws Exception
+  public void testHiLoReleaseLaneWhenSequenceExplodes()
   {
-    expected.expectMessage("exploded");
-    expected.expect(ExecutionException.class);
     TopNQuery interactive = makeInteractiveQuery();
     ListenableFuture<?> future = executorService.submit(() -> {
       try {
@@ -204,71 +208,91 @@ public class QuerySchedulerTest
         throw new RuntimeException(ex);
       }
     });
-    future.get();
+    Throwable t = Assert.assertThrows(ExecutionException.class, future::get);
+    Assert.assertEquals("java.lang.RuntimeException: exploded", 
t.getMessage());
+    Assert.assertEquals(5, scheduler.getTotalAvailableCapacity());
   }
 
   @Test
   public void testHiLoFailsWhenOutOfLaneCapacity()
   {
-    expected.expectMessage(
-        
QueryCapacityExceededException.makeLaneErrorMessage(HiLoQueryLaningStrategy.LOW,
 TEST_LO_CAPACITY)
-    );
-    expected.expect(QueryCapacityExceededException.class);
-
     Query<?> report1 = 
scheduler.prioritizeAndLaneQuery(QueryPlus.wrap(makeReportQuery()), 
ImmutableSet.of());
-    scheduler.run(report1, Sequences.empty());
+    Sequence<?> sequence = scheduler.run(report1, Sequences.empty());
+    // making the sequence doesn't count, only running it does
+    Assert.assertEquals(5, scheduler.getTotalAvailableCapacity());
+    // this counts though since we are doing stuff
+    Yielders.each(sequence);
     Assert.assertNotNull(report1);
     Assert.assertEquals(4, scheduler.getTotalAvailableCapacity());
     Assert.assertEquals(1, 
scheduler.getLaneAvailableCapacity(HiLoQueryLaningStrategy.LOW));
 
     Query<?> report2 = 
scheduler.prioritizeAndLaneQuery(QueryPlus.wrap(makeReportQuery()), 
ImmutableSet.of());
-    scheduler.run(report2, Sequences.empty());
+    Yielders.each(scheduler.run(report2, Sequences.empty()));
     Assert.assertNotNull(report2);
     Assert.assertEquals(3, scheduler.getTotalAvailableCapacity());
     Assert.assertEquals(0, 
scheduler.getLaneAvailableCapacity(HiLoQueryLaningStrategy.LOW));
 
     // too many reports
-    scheduler.run(
-        scheduler.prioritizeAndLaneQuery(QueryPlus.wrap(makeReportQuery()), 
ImmutableSet.of()), Sequences.empty()
+    Throwable t = Assert.assertThrows(
+        QueryCapacityExceededException.class,
+        () -> Yielders.each(
+            scheduler.run(
+                
scheduler.prioritizeAndLaneQuery(QueryPlus.wrap(makeReportQuery()), 
ImmutableSet.of()),
+                Sequences.empty()
+            )
+        )
+    );
+    Assert.assertEquals(
+        "Too many concurrent queries for lane 'low', query capacity of 2 
exceeded. Please try your query again later.",
+        t.getMessage()
     );
   }
 
   @Test
   public void testHiLoFailsWhenOutOfTotalCapacity()
   {
-    
expected.expectMessage(QueryCapacityExceededException.makeTotalErrorMessage(TEST_HI_CAPACITY));
-    expected.expect(QueryCapacityExceededException.class);
-
     Query<?> interactive1 = 
scheduler.prioritizeAndLaneQuery(QueryPlus.wrap(makeInteractiveQuery()), 
ImmutableSet.of());
-    scheduler.run(interactive1, Sequences.empty());
+    Sequence<?> sequence = scheduler.run(interactive1, Sequences.empty());
+    // making the sequence doesn't count, only running it does
+    Assert.assertEquals(5, scheduler.getTotalAvailableCapacity());
+    // this counts tho
+    Yielders.each(sequence);
     Assert.assertNotNull(interactive1);
     Assert.assertEquals(4, scheduler.getTotalAvailableCapacity());
 
     Query<?> report1 = 
scheduler.prioritizeAndLaneQuery(QueryPlus.wrap(makeReportQuery()), 
ImmutableSet.of());
-    scheduler.run(report1, Sequences.empty());
+    Yielders.each(scheduler.run(report1, Sequences.empty()));
     Assert.assertNotNull(report1);
     Assert.assertEquals(3, scheduler.getTotalAvailableCapacity());
     Assert.assertEquals(1, 
scheduler.getLaneAvailableCapacity(HiLoQueryLaningStrategy.LOW));
 
     Query<?> interactive2 = 
scheduler.prioritizeAndLaneQuery(QueryPlus.wrap(makeInteractiveQuery()), 
ImmutableSet.of());
-    scheduler.run(interactive2, Sequences.empty());
+    Yielders.each(scheduler.run(interactive2, Sequences.empty()));
     Assert.assertNotNull(interactive2);
     Assert.assertEquals(2, scheduler.getTotalAvailableCapacity());
 
     Query<?> report2 = 
scheduler.prioritizeAndLaneQuery(QueryPlus.wrap(makeReportQuery()), 
ImmutableSet.of());
-    scheduler.run(report2, Sequences.empty());
+    Yielders.each(scheduler.run(report2, Sequences.empty()));
     Assert.assertNotNull(report2);
     Assert.assertEquals(1, scheduler.getTotalAvailableCapacity());
     Assert.assertEquals(0, 
scheduler.getLaneAvailableCapacity(HiLoQueryLaningStrategy.LOW));
 
     Query<?> interactive3 = 
scheduler.prioritizeAndLaneQuery(QueryPlus.wrap(makeInteractiveQuery()), 
ImmutableSet.of());
-    scheduler.run(interactive3, Sequences.empty());
+    Yielders.each(scheduler.run(interactive3, Sequences.empty()));
     Assert.assertNotNull(interactive3);
     Assert.assertEquals(0, scheduler.getTotalAvailableCapacity());
 
     // one too many
-    scheduler.run(
-        
scheduler.prioritizeAndLaneQuery(QueryPlus.wrap(makeInteractiveQuery()), 
ImmutableSet.of()), Sequences.empty()
+    Throwable t = Assert.assertThrows(
+        QueryCapacityExceededException.class,
+        () -> Yielders.each(scheduler.run(
+            
scheduler.prioritizeAndLaneQuery(QueryPlus.wrap(makeInteractiveQuery()), 
ImmutableSet.of()),
+            Sequences.empty()
+        ))
+    );
+    Assert.assertEquals(
+        "Too many concurrent queries, total query capacity of 5 exceeded. 
Please try your query again later.",
+        t.getMessage()
     );
   }
 
@@ -324,6 +348,74 @@ public class QuerySchedulerTest
     getFuturesAndAssertAftermathIsChill(futures, scheduler, true, true);
   }
 
+  @Test
+  public void testExplodingWrapperDoesNotLeakLocks()
+  {
+    scheduler = new ObservableQueryScheduler(
+        5,
+        ManualQueryPrioritizationStrategy.INSTANCE,
+        new NoQueryLaningStrategy(),
+        new ServerConfig()
+    );
+
+    QueryRunnerFactory factory = GroupByQueryRunnerTest.makeQueryRunnerFactory(
+        new GroupByQueryConfig()
+        {
+          @Override
+          public String getDefaultStrategy()
+          {
+            return GroupByStrategySelector.STRATEGY_V2;
+          }
+
+          @Override
+          public String toString()
+          {
+            return "v2";
+          }
+        }
+    );
+    Future<?> f = makeMergingQueryFuture(
+        executorService,
+        scheduler,
+        GroupByQuery.builder()
+                    .setDataSource("foo")
+                    .setInterval("2020-01-01/2020-01-02")
+                    .setDimensions(DefaultDimensionSpec.of("bar"))
+                    .setAggregatorSpecs(new CountAggregatorFactory("chocula"))
+                    .setGranularity(Granularities.ALL)
+                    .setHavingSpec(
+                        new HavingSpec()
+                        {
+                          @Override
+                          public void setQuery(GroupByQuery query)
+                          {
+                            throw new RuntimeException("exploded");
+                          }
+
+                          @Override
+                          public boolean eval(ResultRow row)
+                          {
+                            return false;
+                          }
+
+                          @Override
+                          public byte[] getCacheKey()
+                          {
+                            return new byte[0];
+                          }
+                        }
+                    )
+                    .build(),
+        factory.getToolchest(),
+        NUM_ROWS
+    );
+
+    Assert.assertEquals(5, scheduler.getTotalAvailableCapacity());
+    Throwable t = Assert.assertThrows(Throwable.class, f::get);
+    Assert.assertEquals("java.lang.RuntimeException: exploded", 
t.getMessage());
+    Assert.assertEquals(5, scheduler.getTotalAvailableCapacity());
+  }
+
   @Test
   public void testConfigNone()
   {
@@ -367,7 +459,6 @@ public class QuerySchedulerTest
   @Test
   public void testMisConfigHiLo()
   {
-    expected.expect(ProvisionException.class);
     final Injector injector = createInjector();
     final String propertyPrefix = "druid.query.scheduler";
     final JsonConfigProvider<QuerySchedulerProvider> provider = 
JsonConfigProvider.of(
@@ -377,9 +468,16 @@ public class QuerySchedulerTest
     final Properties properties = new Properties();
     properties.setProperty(propertyPrefix + ".laning.strategy", "hilo");
     provider.inject(properties, injector.getInstance(JsonConfigurator.class));
-    final QueryScheduler scheduler = provider.get().get().get();
-    Assert.assertEquals(10, scheduler.getTotalAvailableCapacity());
-    Assert.assertEquals(2, 
scheduler.getLaneAvailableCapacity(HiLoQueryLaningStrategy.LOW));
+    Throwable t = Assert.assertThrows(ProvisionException.class, () -> 
provider.get().get().get());
+    Assert.assertEquals(
+        "Unable to provision, see the following errors:\n"
+        + "\n"
+        + "1) Problem parsing object at prefix[druid.query.scheduler]: Cannot 
construct instance of 
`org.apache.druid.server.scheduling.HiLoQueryLaningStrategy`, problem: 
maxLowPercent must be set\n"
+        + " at [Source: UNKNOWN; line: -1, column: -1] (through reference 
chain: org.apache.druid.server.QuerySchedulerProvider[\"laning\"]).\n"
+        + "\n"
+        + "1 error",
+        t.getMessage()
+    );
   }
 
   @Test
@@ -418,7 +516,6 @@ public class QuerySchedulerTest
   @Test
   public void testMisConfigThreshold()
   {
-    expected.expect(ProvisionException.class);
     final Injector injector = createInjector();
     final String propertyPrefix = "druid.query.scheduler";
     final JsonConfigProvider<QuerySchedulerProvider> provider = 
JsonConfigProvider.of(
@@ -428,9 +525,16 @@ public class QuerySchedulerTest
     final Properties properties = new Properties();
     properties.setProperty(propertyPrefix + ".prioritization.strategy", 
"threshold");
     provider.inject(properties, injector.getInstance(JsonConfigurator.class));
-    final QueryScheduler scheduler = provider.get().get().get();
-    Assert.assertEquals(10, scheduler.getTotalAvailableCapacity());
-    Assert.assertEquals(2, 
scheduler.getLaneAvailableCapacity(HiLoQueryLaningStrategy.LOW));
+    Throwable t = Assert.assertThrows(ProvisionException.class, () -> 
provider.get().get().get());
+    Assert.assertEquals(
+        "Unable to provision, see the following errors:\n"
+        + "\n"
+        + "1) Problem parsing object at prefix[druid.query.scheduler]: Cannot 
construct instance of 
`org.apache.druid.server.scheduling.ThresholdBasedQueryPrioritizationStrategy`, 
problem: periodThreshold, durationThreshold, or segmentCountThreshold must be 
set\n"
+        + " at [Source: UNKNOWN; line: -1, column: -1] (through reference 
chain: org.apache.druid.server.QuerySchedulerProvider[\"prioritization\"]).\n"
+        + "\n"
+        + "1 error",
+        t.getMessage()
+    );
   }
 
 
@@ -497,6 +601,7 @@ public class QuerySchedulerTest
         .context(ImmutableMap.of("queryId", "default-" + UUID.randomUUID()))
         .build();
   }
+
   private TopNQuery makeInteractiveQuery()
   {
     return makeBaseBuilder()
@@ -638,6 +743,36 @@ public class QuerySchedulerTest
     });
   }
 
+  private ListenableFuture<?> makeMergingQueryFuture(
+      ListeningExecutorService executorService,
+      QueryScheduler scheduler,
+      Query<?> query,
+      QueryToolChest toolChest,
+      int numRows
+  )
+  {
+    return executorService.submit(() -> {
+      try {
+        Query<?> scheduled = 
scheduler.prioritizeAndLaneQuery(QueryPlus.wrap(query), ImmutableSet.of());
+
+        Assert.assertNotNull(scheduled);
+
+        FluentQueryRunnerBuilder fluentQueryRunnerBuilder = new 
FluentQueryRunnerBuilder(toolChest);
+        FluentQueryRunnerBuilder.FluentQueryRunner runner = 
fluentQueryRunnerBuilder.create((queryPlus, responseContext) -> {
+          Sequence<Integer> underlyingSequence = makeSequence(numRows);
+          Sequence<Integer> results = scheduler.run(scheduled, 
underlyingSequence);
+          return results;
+        });
+
+        final int actualNumRows = 
consumeAndCloseSequence(runner.mergeResults().run(QueryPlus.wrap(query)));
+        Assert.assertEquals(actualNumRows, numRows);
+      }
+      catch (IOException ex) {
+        throw new RuntimeException(ex);
+      }
+    });
+  }
+
 
   private void getFuturesAndAssertAftermathIsChill(
       List<Future<?>> futures,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to