jihoonson commented on a change in pull request #10027:
URL: https://github.com/apache/druid/pull/10027#discussion_r445720081
##########
File path:
processing/src/main/java/org/apache/druid/query/ChainedExecutionQueryRunner.java
##########
@@ -141,33 +144,34 @@ public ChainedExecutionQueryRunner(
);
}
)
- )
- );
+ );
- queryWatcher.registerQueryFuture(query, futures);
+ ListenableFuture<List<Iterable<T>>> future =
Futures.allAsList(futures);
+ queryWatcher.registerQueryFuture(query, future);
try {
return new MergeIterable<>(
ordering.nullsFirst(),
QueryContexts.hasTimeout(query) ?
- futures.get(QueryContexts.getTimeout(query),
TimeUnit.MILLISECONDS) :
- futures.get()
+ future.get(QueryContexts.getTimeout(query),
TimeUnit.MILLISECONDS) :
+ future.get()
).iterator();
}
catch (InterruptedException e) {
log.noStackTrace().warn(e, "Query interrupted, cancelling
pending results, query id [%s]", query.getId());
- futures.cancel(true);
+ GuavaUtils.cancelAll(true,
ImmutableList.<Future>builder().add(future).addAll(futures).build());
Review comment:
It seems easy to forget canceling `future` and so error-prone. How about
modifying `GuavaUtils.cancelAll()` to take `future` as well? So it would be like
```java
public static <F extends Future<?>> void cancelAll(
boolean mayInterruptIfRunning,
@Nullable ListenableFuture<?> combinedFuture,
List<F> futures
)
{
final List<Future> allFuturesToCancel = new ArrayList<>(futures);
allFuturesToCancel.add(combinedFuture);
if (allFuturesToCancel.isEmpty()) {
return;
}
allFuturesToCancel.forEach(f -> {
try {
f.cancel(mayInterruptIfRunning);
}
catch (Throwable t) {
log.warn(t, "Error while cancelling future.");
}
});
}
```
##########
File path:
processing/src/main/java/org/apache/druid/query/ChainedExecutionQueryRunner.java
##########
@@ -141,33 +144,34 @@ public ChainedExecutionQueryRunner(
);
}
)
- )
- );
+ );
- queryWatcher.registerQueryFuture(query, futures);
+ ListenableFuture<List<Iterable<T>>> future =
Futures.allAsList(futures);
+ queryWatcher.registerQueryFuture(query, future);
try {
return new MergeIterable<>(
ordering.nullsFirst(),
QueryContexts.hasTimeout(query) ?
- futures.get(QueryContexts.getTimeout(query),
TimeUnit.MILLISECONDS) :
- futures.get()
+ future.get(QueryContexts.getTimeout(query),
TimeUnit.MILLISECONDS) :
+ future.get()
).iterator();
}
catch (InterruptedException e) {
log.noStackTrace().warn(e, "Query interrupted, cancelling
pending results, query id [%s]", query.getId());
- futures.cancel(true);
+ GuavaUtils.cancelAll(true,
ImmutableList.<Future>builder().add(future).addAll(futures).build());
Review comment:
Or, more structured way to do could be adding a new `CombinedFuture`
like this
```java
public static class CombinedFuture<V> implements Future<List<V>>
{
private final List<ListenableFuture<V>> underlyingFutures;
private final ListenableFuture<List<V>> combined;
public CombinedFuture(List<ListenableFuture<V>> futures)
{
this.underlyingFutures = futures;
this.combined = Futures.allAsList(futures);
}
@Override
public boolean cancel(boolean mayInterruptIfRunning)
{
if (combined.isDone() || combined.isCancelled()) {
return false;
} else {
cancelAll(mayInterruptIfRunning, combined, underlyingFutures);
return true;
}
}
@Override
public boolean isCancelled()
{
return combined.isCancelled();
}
@Override
public boolean isDone()
{
return combined.isDone();
}
@Override
public List<V> get() throws InterruptedException, ExecutionException
{
return combined.get();
}
@Override
public List<V> get(long timeout, TimeUnit unit) throws
InterruptedException, ExecutionException, TimeoutException
{
return combined.get(timeout, unit);
}
}
```
I'm fine with either way.
##########
File path:
processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerFailureTest.java
##########
@@ -281,4 +281,41 @@ public void testInsufficientResourcesOnBroker()
}
}
}
+
+ @Test(timeout = 60_000L)
+ public void testTimeoutExceptionOnQueryable()
+ {
+ expectedException.expect(QueryInterruptedException.class);
+
expectedException.expectCause(CoreMatchers.instanceOf(TimeoutException.class));
+
+ final GroupByQuery query = GroupByQuery
+ .builder()
+ .setDataSource(QueryRunnerTestHelper.DATA_SOURCE)
+ .setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD)
+ .setDimensions(new DefaultDimensionSpec("quality", "alias"))
+ .setAggregatorSpecs(new LongSumAggregatorFactory("rows", "rows"))
+ .setGranularity(QueryRunnerTestHelper.DAY_GRAN)
+ .overrideContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, 1))
+ .build();
+
+ GroupByQueryRunnerFactory factory = makeQueryRunnerFactory(
+ GroupByQueryRunnerTest.DEFAULT_MAPPER,
+ new GroupByQueryConfig()
+ {
+ @Override
+ public String getDefaultStrategy()
+ {
+ return "v2";
+ }
+
+ @Override
+ public boolean isSingleThreaded()
+ {
+ return true;
+ }
+ }
+ );
+ QueryRunner<ResultRow> _runnnner =
factory.mergeRunners(Execs.directExecutor(), ImmutableList.of(runner));
Review comment:
nit: the variable name starting with an underscore is not Java
convention. How about `mergedRunner`?
##########
File path:
processing/src/main/java/org/apache/druid/query/GroupByMergedQueryRunner.java
##########
@@ -187,7 +189,7 @@ private void waitForFutureCompletion(
}
catch (InterruptedException e) {
log.warn(e, "Query interrupted, cancelling pending results, query id
[%s]", query.getId());
- future.cancel(true);
+ GuavaUtils.cancelAll(true, futures);
Review comment:
nit: `future` should be canceled on exceptions too. This is nit since
this class is used only by groupBy v1 which is deprecated.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]