This is an automated email from the ASF dual-hosted git repository.

adarshsanjeev pushed a commit to branch 30.0.0
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/30.0.0 by this push:
     new 305abae8b19 Fix another deadlock which can occur while acquiring merge 
buffers (#16372) (#16427)
305abae8b19 is described below

commit 305abae8b198cd10f8f718d6da053348e3e5ac89
Author: Adarsh Sanjeev <[email protected]>
AuthorDate: Fri May 10 15:15:38 2024 +0530

    Fix another deadlock which can occur while acquiring merge buffers (#16372) 
(#16427)
    
    Fixes a deadlock while acquiring merge buffers
    
    Co-authored-by: Laksh Singla <[email protected]>
---
 .../groupby/GroupByResourcesReservationPool.java   |  76 +++++--
 .../apache/druid/query/groupby/GroupingEngine.java |   2 +
 .../GroupByResourcesReservationPoolTest.java       | 227 +++++++++++++++++++++
 3 files changed, 291 insertions(+), 14 deletions(-)

diff --git 
a/processing/src/main/java/org/apache/druid/query/groupby/GroupByResourcesReservationPool.java
 
b/processing/src/main/java/org/apache/druid/query/groupby/GroupByResourcesReservationPool.java
index 49a698b42ae..9c65ce445b1 100644
--- 
a/processing/src/main/java/org/apache/druid/query/groupby/GroupByResourcesReservationPool.java
+++ 
b/processing/src/main/java/org/apache/druid/query/groupby/GroupByResourcesReservationPool.java
@@ -28,6 +28,7 @@ import javax.annotation.Nullable;
 import javax.inject.Inject;
 import java.nio.ByteBuffer;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * Reserves the {@link GroupByQueryResources} for a given group by query and 
maps them to the query's resource ID.
@@ -67,21 +68,25 @@ import java.util.concurrent.ConcurrentHashMap;
  * nested ones execute via an unoptimized way.
  * 3. There's some knowledge to the mergeResults that the query runner passed 
to it is the one created by the corresponding toolchest's
  * mergeRunners (which is the typical use case). This is encoded in the 
argument {@code willMergeRunner}, and is to be set by the callers.
- * The only production use case where this isn't true is when the broker is 
merging the results gathered from the historical)
+ * The only production use case where this isn't true is when the broker is 
merging the results gathered from the historical
  * <p>
  * TESTING
  * Unit tests mimic the broker-historical interaction in many places, which 
can lead to the code not working as intended because the assumptions don't hold.
  * In many test cases, there are two nested mergeResults calls, the outer call 
mimics what the broker does, while the inner one mimics what the historical 
does,
  * and the assumption (1) fails. Therefore, the testing code should assign a 
unique resource id b/w each mergeResults call, and also make sure that the top 
level mergeResults
  * would have willMergeRunner = false, since it's being called on top of a 
mergeResults's runner, while the inner one would have willMergeRunner = true 
because its being
- * called on actual runners (as it happens in the brokers, and the historicals)
+ * called on actual runners (as it happens in the brokers, and the 
historicals).
+ * <p>
+ * There is a test in GroupByResourcesReservationPoolTest that checks for 
deadlocks when the operations are interleaved in a
+ * certain manner. It is ignored because it sleeps and can increase time when 
the test suite is run. Developers making any changes
+ * to this class, or a related class should manually verify that all the tests 
in the test class are running as expected.
  */
 public class GroupByResourcesReservationPool
 {
   /**
    * Map of query's resource id -> group by resources reserved for the query 
to execute
    */
-  final ConcurrentHashMap<QueryResourceId, GroupByQueryResources> pool = new 
ConcurrentHashMap<>();
+  final ConcurrentHashMap<QueryResourceId, 
AtomicReference<GroupByQueryResources>> pool = new ConcurrentHashMap<>();
 
   /**
    * Buffer pool from where the merge buffers are picked and reserved
@@ -104,19 +109,42 @@ public class GroupByResourcesReservationPool
   }
 
   /**
-   * Reserves appropriate resources, and maps it to the queryResourceId 
(usually the query's resource id) in the internal map
+   * Reserves appropriate resources, and maps it to the queryResourceId 
(usually the query's resource id) in the internal map.
+   * This is a blocking call, and can block up to the given query's timeout
    */
   public void reserve(QueryResourceId queryResourceId, GroupByQuery 
groupByQuery, boolean willMergeRunner)
   {
     if (queryResourceId == null) {
       throw DruidException.defensive("Query resource id must be populated");
     }
-    pool.compute(queryResourceId, (id, existingResource) -> {
-      if (existingResource != null) {
-        throw DruidException.defensive("Resource with the given identifier 
[%s] is already present", id);
-      }
-      return GroupingEngine.prepareResource(groupByQuery, mergeBufferPool, 
willMergeRunner, groupByQueryConfig);
-    });
+
+    // First check if the query resource id is present in the map, and if not, 
populate a dummy reference. This will
+    // block other threads from populating the map with the same query id, and 
is essentially same as reserving a spot in
+    // the map for the given query id. Since the actual allocation of the 
resource might take longer than expected, we
+    // do it out of the critical section, once we have "reserved" the spot
+    AtomicReference<GroupByQueryResources> reference = new 
AtomicReference<>(null);
+    AtomicReference<GroupByQueryResources> existingResource = 
pool.putIfAbsent(queryResourceId, reference);
+
+    // Multiple attempts made to allocate the query resource for a given 
resource id. Throw an exception
+    //noinspection VariableNotUsedInsideIf
+    if (existingResource != null) {
+      throw DruidException.defensive("Resource with the given identifier [%s] 
is already present", queryResourceId);
+    }
+
+    GroupByQueryResources resources;
+    try {
+      // We have reserved a spot in the map. Now begin the blocking call.
+      resources = GroupingEngine.prepareResource(groupByQuery, 
mergeBufferPool, willMergeRunner, groupByQueryConfig);
+    }
+    catch (Throwable t) {
+      // Unable to allocate the resources, perform cleanup and rethrow the 
exception
+      pool.remove(queryResourceId);
+      throw t;
+    }
+
+    // Resources have been allocated, and the spot has been reserved. The reference 
would ALWAYS still refer to 'null' here, so make it refer to the
+    // allocated resources
+    reference.compareAndSet(null, resources);
   }
 
   /**
@@ -125,7 +153,19 @@ public class GroupByResourcesReservationPool
   @Nullable
   public GroupByQueryResources fetch(QueryResourceId queryResourceId)
   {
-    return pool.get(queryResourceId);
+    AtomicReference<GroupByQueryResources> resourcesReference = 
pool.get(queryResourceId);
+    if (resourcesReference == null) {
+      // There weren't any resources allocated corresponding to the provided 
resource id
+      return null;
+    }
+    GroupByQueryResources resource = resourcesReference.get();
+    if (resource == null) {
+      throw DruidException.defensive(
+          "Query id [%s] had a non-null reference in the resource reservation 
pool, but no resources were found",
+          queryResourceId
+      );
+    }
+    return resource;
   }
 
   /**
@@ -133,9 +173,17 @@ public class GroupByResourcesReservationPool
    */
   public void clean(QueryResourceId queryResourceId)
   {
-    GroupByQueryResources resources = pool.remove(queryResourceId);
-    if (resources != null) {
-      resources.close();
+    AtomicReference<GroupByQueryResources> resourcesReference = 
pool.remove(queryResourceId);
+    if (resourcesReference != null) {
+      GroupByQueryResources resource = resourcesReference.get();
+      // Reference should refer to a non-empty resource
+      if (resource == null) {
+        throw DruidException.defensive(
+            "Query id [%s] had a non-null reference in the resource 
reservation pool, but no resources were found",
+            queryResourceId
+        );
+      }
+      resource.close();
     }
   }
 }
diff --git 
a/processing/src/main/java/org/apache/druid/query/groupby/GroupingEngine.java 
b/processing/src/main/java/org/apache/druid/query/groupby/GroupingEngine.java
index 5fa26d3fa2b..6451fb9b943 100644
--- 
a/processing/src/main/java/org/apache/druid/query/groupby/GroupingEngine.java
+++ 
b/processing/src/main/java/org/apache/druid/query/groupby/GroupingEngine.java
@@ -142,6 +142,8 @@ public class GroupingEngine
    * {@link GroupByMergingQueryRunner} for a particular query. The resources 
are to be acquired once throughout the
    * execution of the query, or need to be re-acquired (if needed). Users must 
ensure that throughout the execution,
    * a query already holding the resources shouldn't request for more 
resources, because that can cause deadlocks.
+   * <p>
+   * This method throws an exception if it is not able to allocate sufficient 
resources required for the query to succeed
    */
   public static GroupByQueryResources prepareResource(
       GroupByQuery query,
diff --git 
a/processing/src/test/java/org/apache/druid/query/groupby/GroupByResourcesReservationPoolTest.java
 
b/processing/src/test/java/org/apache/druid/query/groupby/GroupByResourcesReservationPoolTest.java
new file mode 100644
index 00000000000..8d0f2d9e37d
--- /dev/null
+++ 
b/processing/src/test/java/org/apache/druid/query/groupby/GroupByResourcesReservationPoolTest.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.groupby;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.collections.BlockingPool;
+import org.apache.druid.collections.DefaultBlockingPool;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.query.QueryResourceId;
+import org.apache.druid.query.dimension.DefaultDimensionSpec;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+
+public class GroupByResourcesReservationPoolTest
+{
+
+  /**
+   * CONFIG + QUERY require exactly 1 merge buffer to succeed if 
'willMergeRunners' is true while allocating the resources
+   */
+  private static final GroupByQueryConfig CONFIG = new GroupByQueryConfig();
+  private static final GroupByQuery QUERY = GroupByQuery.builder()
+                                                        
.setInterval(Intervals.ETERNITY)
+                                                        .setDataSource("foo")
+                                                        .setDimensions(
+                                                            ImmutableList.of(
+                                                                new 
DefaultDimensionSpec("dim2", "_d0")
+                                                            )
+                                                        )
+                                                        
.setGranularity(Granularities.ALL)
+                                                        .setContext(
+                                                            
ImmutableMap.of("timeout", 0)
+                                                        ) // Query can block 
indefinitely
+                                                        .build();
+
+  /**
+   * This test confirms that the interleaved 
GroupByResourcesReservationPool.reserve() and 
GroupByResourcesReservationPool.clean()
+   * between multiple threads succeed. It is specifically designed to test the 
case when the operations are interleaved in the
+   * following manner:
+   * <p>
+   * THREAD1                      THREAD2
+   * pool.reserve(query1)
+   *                              pool.reserve(query2)
+   * pool.clean(query1)
+   * <p>
+   * This test assumes a few things about the implementation of the 
interfaces, which are laid out in the comments.
+   * <p>
+   * The test should complete under 10 seconds, and the majority of the time 
would be consumed by waiting for the thread
+   * that sleeps for 5 seconds
+   */
+  @Ignore(
+      "Isn't run as a part of CI since it sleeps for 5 seconds. Callers must 
run the test manually if any changes are made "
+      + "to the corresponding class"
+  )
+  @Test(timeout = 100_000L)
+  public void testInterleavedReserveAndRemove()
+  {
+    ExecutorService executor = Execs.multiThreaded(3, 
"group-by-resources-reservation-pool-test-%d");
+
+    // Sanity checks that the query will acquire exactly one merge buffer. 
This safeguards the test being useful in
+    // case the merge buffer acquisition code changes to acquire less than one 
merge buffer (the test would be
+    // useless in that case) or more than one merge buffer (the test would 
incorrectly fail in that case)
+    Assert.assertEquals(
+        1,
+        
GroupByQueryResources.countRequiredMergeBufferNumForMergingQueryRunner(CONFIG, 
QUERY)
+        + 
GroupByQueryResources.countRequiredMergeBufferNumForToolchestMerge(QUERY)
+    );
+
+    // Blocking pool with a single buffer, which means only one of the queries 
can succeed at a time
+    BlockingPool<ByteBuffer> mergeBufferPool = new DefaultBlockingPool<>(() -> 
ByteBuffer.allocate(100), 1);
+    GroupByResourcesReservationPool groupByResourcesReservationPool =
+        new GroupByResourcesReservationPool(mergeBufferPool, CONFIG);
+
+    // Latch indicating that the first thread has called 
reservationPool.reserve()
+    CountDownLatch reserveCalledByFirstThread = new CountDownLatch(1);
+    // Latch indicating that the second thread has called 
reservationPool.reserve()
+    CountDownLatch reserveCalledBySecondThread = new CountDownLatch(1);
+    // Latch indicating that all the threads have been completed successfully. 
Main thread waits on this latch before exiting
+    CountDownLatch threadsCompleted = new CountDownLatch(2);
+
+    // THREAD 1
+    executor.submit(() -> {
+
+      QueryResourceId queryResourceId1 = new QueryResourceId("test-id-1")
+      {
+        @Override
+        public int hashCode()
+        {
+          // IMPORTANT ASSUMPTION: For the test to be useful, it assumes that 
under the hood we are using a
+          // ConcurrentHashMap<QueryResourceId, AtomicReference<GroupByQueryResources>> (or a 
concurrent map with similar implementation) that
+          // implements granular locking of the nodes
+          // The hashCode of the queryResourceId used in Thread1 and Thread2 
is the same. Therefore, both the queryIds
+          // would be guarded by the same lock
+          return 10;
+        }
+
+        @Override
+        public boolean equals(Object o)
+        {
+          return super.equals(o);
+        }
+      };
+      groupByResourcesReservationPool.reserve(queryResourceId1, QUERY, true);
+      reserveCalledByFirstThread.countDown();
+      try {
+        reserveCalledBySecondThread.await();
+      }
+      catch (InterruptedException e) {
+        Assert.fail("Interrupted while waiting for second reserve call to be 
made");
+      }
+      groupByResourcesReservationPool.clean(queryResourceId1);
+      threadsCompleted.countDown();
+    });
+
+    // THREAD 2
+    executor.submit(() -> {
+      try {
+        reserveCalledByFirstThread.await();
+      }
+      catch (InterruptedException e) {
+        Assert.fail("Interrupted while waiting for first reserve call to be 
made");
+      }
+
+      QueryResourceId queryResourceId2 = new QueryResourceId("test-id-2")
+      {
+        @Override
+        public int hashCode()
+        {
+          return 10;
+        }
+
+        @Override
+        public boolean equals(Object o)
+        {
+          return super.equals(o);
+        }
+      };
+
+      // Since the reserve() call is blocking, we need to execute it 
separately, so that we can count down the latch
+      // and inform Thread1 the reserve call has been made by this thread
+      executor.submit(() -> {
+        groupByResourcesReservationPool.reserve(queryResourceId2, QUERY, true);
+        threadsCompleted.countDown();
+      });
+      try {
+        // This sleep call "ensures" that the statement 
pool.reserve(queryResourceId2) is called before we release the
+        // latch (that will cause Thread1 to release the acquired resources). 
It still doesn't guarantee the previous
+        // statement, however that's the best we can do, given that reserve() 
is blocking
+        Thread.sleep(5_000);
+      }
+      catch (InterruptedException e) {
+        Assert.fail("Interrupted while sleeping");
+      }
+      reserveCalledBySecondThread.countDown();
+    });
+
+    try {
+      threadsCompleted.await();
+    }
+    catch (InterruptedException e) {
+      Assert.fail("Interrupted while waiting for the threads to complete");
+    }
+  }
+
+  @Test
+  public void testMultipleSimultaneousAllocationAttemptsFail()
+  {
+    BlockingPool<ByteBuffer> mergeBufferPool = new DefaultBlockingPool<>(() -> 
ByteBuffer.allocate(100), 1);
+    GroupByResourcesReservationPool groupByResourcesReservationPool =
+        new GroupByResourcesReservationPool(mergeBufferPool, CONFIG);
+    QueryResourceId queryResourceId = new QueryResourceId("test-id");
+
+    groupByResourcesReservationPool.reserve(queryResourceId, QUERY, true);
+
+    Assert.assertThrows(
+        DruidException.class,
+        () -> groupByResourcesReservationPool.reserve(queryResourceId, QUERY, 
true)
+    );
+  }
+
+  @Test
+  public void testMultipleSequentialAllocationAttemptsSucceed()
+  {
+    BlockingPool<ByteBuffer> mergeBufferPool = new DefaultBlockingPool<>(() -> 
ByteBuffer.allocate(100), 1);
+    GroupByResourcesReservationPool groupByResourcesReservationPool =
+        new GroupByResourcesReservationPool(mergeBufferPool, CONFIG);
+    QueryResourceId queryResourceId = new QueryResourceId("test-id");
+
+    groupByResourcesReservationPool.reserve(queryResourceId, QUERY, true);
+    GroupByQueryResources oldResources = 
groupByResourcesReservationPool.fetch(queryResourceId);
+
+    // Cleanup the resources
+    groupByResourcesReservationPool.clean(queryResourceId);
+
+    // Repeat the calls
+    groupByResourcesReservationPool.reserve(queryResourceId, QUERY, true);
+    GroupByQueryResources newResources = 
groupByResourcesReservationPool.fetch(queryResourceId);
+    Assert.assertNotNull(newResources);
+
+    Assert.assertNotSame(oldResources, newResources);
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to