gianm commented on code in PR #16372:
URL: https://github.com/apache/druid/pull/16372#discussion_r1591433218


##########
processing/src/main/java/org/apache/druid/query/groupby/GroupByResourcesReservationPool.java:
##########
@@ -125,17 +146,22 @@ public void reserve(QueryResourceId queryResourceId, 
GroupByQuery groupByQuery,
   @Nullable
   public GroupByQueryResources fetch(QueryResourceId queryResourceId)
   {
-    return pool.get(queryResourceId);
+    GroupByQueryResources resource = pool.get(queryResourceId).get();
+    assert resource != null;
+    return resource;
   }
 
   /**
    * Removes the entry corresponding to the unique id from the map, and cleans 
up the resources.
    */
   public void clean(QueryResourceId queryResourceId)
   {
-    GroupByQueryResources resources = pool.remove(queryResourceId);
-    if (resources != null) {
-      resources.close();
+    AtomicReference<GroupByQueryResources> resourcesReference = 
pool.remove(queryResourceId);
+    if (resourcesReference != null) {
+      GroupByQueryResources resource = resourcesReference.get();
+      // Reference should refer to a non-empty resource
+      assert resource != null;

Review Comment:
   Better to use `DruidException.defensive` than `assert`.



##########
processing/src/main/java/org/apache/druid/query/groupby/GroupByResourcesReservationPool.java:
##########
@@ -104,19 +109,35 @@ public GroupByResourcesReservationPool(
   }
 
   /**
-   * Reserves appropriate resources, and maps it to the queryResourceId 
(usually the query's resource id) in the internal map
+   * Reserves appropriate resources, and maps it to the queryResourceId 
(usually the query's resource id) in the internal map.
+   * This is a blocking call, and can block up to the given query's timeout
    */
   public void reserve(QueryResourceId queryResourceId, GroupByQuery 
groupByQuery, boolean willMergeRunner)
   {
     if (queryResourceId == null) {
       throw DruidException.defensive("Query resource id must be populated");
     }
-    pool.compute(queryResourceId, (id, existingResource) -> {
-      if (existingResource != null) {
-        throw DruidException.defensive("Resource with the given identifier 
[%s] is already present", id);
-      }
-      return GroupingEngine.prepareResource(groupByQuery, mergeBufferPool, 
willMergeRunner, groupByQueryConfig);
-    });
+
+    // First check if the query resource id is present in the map, and if not, 
populate a dummy reference. This will
+    // block other threads from populating the map with the same query id, and 
is essentially same as reserving a spot in
+    // the map for the given query id. Since the actual allocation of the 
resource might take longer than expected, we
+    // do it out of the critical section, once we have "reserved" the spot
+    AtomicReference<GroupByQueryResources> reference = new 
AtomicReference<>(null);
+    AtomicReference<GroupByQueryResources> existingResource = 
pool.putIfAbsent(queryResourceId, reference);
+
+    // Multiple attempts made to allocate the query resource for a given 
resource id. Throw an exception
+    //noinspection VariableNotUsedInsideIf
+    if (existingResource != null) {
+      throw DruidException.defensive("Resource with the given identifier [%s] 
is already present", queryResourceId);
+    }
+
+    // We have reserved a spot in the map. Now begin the blocking call.
+    GroupByQueryResources resources =

Review Comment:
   What happens if `prepareResource` fails, for example due to timeout? Will 
the reference be cleaned up from the map somehow?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to