chenyuzhi459 commented on a change in pull request #10027:
URL: https://github.com/apache/druid/pull/10027#discussion_r446193150



##########
File path: 
processing/src/main/java/org/apache/druid/query/ChainedExecutionQueryRunner.java
##########
@@ -141,33 +144,34 @@ public ChainedExecutionQueryRunner(
                           );
                         }
                     )
-                )
-            );
+                );
 
-            queryWatcher.registerQueryFuture(query, futures);
+            ListenableFuture<List<Iterable<T>>> future = 
Futures.allAsList(futures);
+            queryWatcher.registerQueryFuture(query, future);
 
             try {
               return new MergeIterable<>(
                   ordering.nullsFirst(),
                   QueryContexts.hasTimeout(query) ?
-                      futures.get(QueryContexts.getTimeout(query), 
TimeUnit.MILLISECONDS) :
-                      futures.get()
+                      future.get(QueryContexts.getTimeout(query), 
TimeUnit.MILLISECONDS) :
+                      future.get()
               ).iterator();
             }
             catch (InterruptedException e) {
               log.noStackTrace().warn(e, "Query interrupted, cancelling 
pending results, query id [%s]", query.getId());
-              futures.cancel(true);
+              GuavaUtils.cancelAll(true, 
ImmutableList.<Future>builder().add(future).addAll(futures).build());

Review comment:
       > It seems easy to forget canceling `future` and so error-prone. How 
about modifying `GuavaUtils.cancelAll()` to take `future` as well? So it would 
be like
   > 
   > ```java
   >   public static <F extends Future<?>> void cancelAll(
   >       boolean mayInterruptIfRunning,
   >       @Nullable ListenableFuture<?> combinedFuture,
   >       List<F> futures
   >   )
   >   {
   >     final List<Future> allFuturesToCancel = new ArrayList<>(futures);
   >     allFuturesToCancel.add(combinedFuture);
   >     if (allFuturesToCancel.isEmpty()) {
   >       return;
   >     }
   >     allFuturesToCancel.forEach(f -> {
   >       try {
   >         f.cancel(mayInterruptIfRunning);
   >       }
   >       catch (Throwable t) {
   >         log.warn(t, "Error while cancelling future.");
   >       }
   >     });
   >   }
   > ```
   
   Well, thanks for your tips, i'll follow it except one point. It's better to 
cancel the `combinedFuture` first, because if we cancel the first future in  
`underlyingFutures`  , it will trigger the listener of 
`com.google.common.util.concurrent.Futures.CombinedFuture` which is added for 
every future in `underlyingFutures`  by `init` method. This listener is 
actually a method called `setOneValue`  which will set combinedFuture's status 
as `CANCELLED` rather than `INTERRUTED` as we expect when we cancel the first 
future of underlyingFutures cause of the `CancellationException`.
   
    In addition , the listener of `combinedFuture` will set the status of other 
future in `underlyingFutures`   as the same with itself(`CANCELLED` rather than 
`INTERRUTED` as we expect). I have test it in the test of `testQueryTimeout` in 
`ChainedExecutionQueryRunnerTest` use the sequences of [underlyingFutures, 
combinedFuture], it failed cause  the second future was not `INTERRUTED` but 
`CANCELLED`




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to