gianm commented on code in PR #16533:
URL: https://github.com/apache/druid/pull/16533#discussion_r1717639536
##########
processing/src/main/java/org/apache/druid/math/expr/ExprMacroTable.java:
##########
@@ -68,6 +73,11 @@ public static ExprMacroTable nil()
     return NIL;
   }
+
+  public static ExprMacroTable granularity()
Review Comment:
Could use some javadoc to explain the purpose of this method. I don't think
it would be obvious to someone who isn't very familiar with how query engines
handle granularity and how that maps onto projections.
##########
processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java:
##########
@@ -193,13 +205,74 @@ private static boolean canUsePooledAlgorithm(
       // non-string output cannot use the pooled algorith, even if the underlying selector supports it
       return false;
     }
-    if (Types.is(capabilities, ValueType.STRING)) {
-      // string columns must use the on heap algorithm unless they have the following capabilites
-      return capabilities.isDictionaryEncoded().isTrue() && capabilities.areDictionaryValuesUnique().isTrue();
-    } else {
+    if (!Types.is(capabilities, ValueType.STRING)) {
       // non-strings are not eligible to use the pooled algorithm, and should use a heap algorithm
       return false;
     }
+
+    // string columns must use the on heap algorithm unless they have the following capabilites
+    if (!capabilities.isDictionaryEncoded().isTrue() || !capabilities.areDictionaryValuesUnique().isTrue()) {
+      return false;
+    }
+    if (Granularities.ALL.equals(query.getGranularity())) {
+      // all other requirements have been satisfied, ALL granularity can always use the pooled algorithms
+      return true;
+    }
+    // if not using ALL granularity, we can still potentially use the pooled algorithm if we are certain it doesn't
+    // need to make multiple passes (e.g. reset the cursor)
+    try (final ResourceHolder<ByteBuffer> resultsBufHolder = bufferPool.take()) {
+      final ByteBuffer resultsBuf = resultsBufHolder.get();
+      resultsBuf.clear();
+
+      final int numBytesToWorkWith = resultsBuf.remaining();
Review Comment:
Could also check `resultsBuf.capacity()` without doing `resultsBuf.clear()`.
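For illustration, both approaches yield the same number on a pooled buffer, but `capacity()` reads it without mutating the buffer's position and limit. A minimal sketch with a plain `java.nio.ByteBuffer` standing in for the pooled results buffer:

```java
import java.nio.ByteBuffer;

public class BufferSizeCheck {
    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(1024);
        // simulate a buffer whose position/limit were left in an arbitrary state
        buf.position(100).limit(500);

        // approach in the diff: clear() sets position=0 and limit=capacity,
        // so remaining() afterwards equals capacity (done on a duplicate here
        // so the original buffer's state is preserved for comparison)
        ByteBuffer scratch = buf.duplicate();
        scratch.clear();
        int viaClear = scratch.remaining();

        // suggested approach: capacity() gives the same number with no mutation
        int viaCapacity = buf.capacity();

        System.out.println(viaClear + " " + viaCapacity);
        if (viaClear != viaCapacity) {
            throw new AssertionError("expected identical sizes");
        }
    }
}
```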
##########
processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java:
##########
@@ -193,13 +205,74 @@ private static boolean canUsePooledAlgorithm(
       // non-string output cannot use the pooled algorith, even if the underlying selector supports it
       return false;
     }
-    if (Types.is(capabilities, ValueType.STRING)) {
-      // string columns must use the on heap algorithm unless they have the following capabilites
-      return capabilities.isDictionaryEncoded().isTrue() && capabilities.areDictionaryValuesUnique().isTrue();
-    } else {
+    if (!Types.is(capabilities, ValueType.STRING)) {
       // non-strings are not eligible to use the pooled algorithm, and should use a heap algorithm
       return false;
     }
+
+    // string columns must use the on heap algorithm unless they have the following capabilites
+    if (!capabilities.isDictionaryEncoded().isTrue() || !capabilities.areDictionaryValuesUnique().isTrue()) {
+      return false;
+    }
+    if (Granularities.ALL.equals(query.getGranularity())) {
+      // all other requirements have been satisfied, ALL granularity can always use the pooled algorithms
+      return true;
+    }
+    // if not using ALL granularity, we can still potentially use the pooled algorithm if we are certain it doesn't
+    // need to make multiple passes (e.g. reset the cursor)
+    try (final ResourceHolder<ByteBuffer> resultsBufHolder = bufferPool.take()) {
+      final ByteBuffer resultsBuf = resultsBufHolder.get();
+      resultsBuf.clear();
+
+      final int numBytesToWorkWith = resultsBuf.remaining();
+      final int numValuesPerPass = numBytesPerRecord > 0 ? numBytesToWorkWith / numBytesPerRecord : cardinality;
+
+      return numValuesPerPass <= cardinality;
+    }
+  }
+
+  private static boolean shouldUseAggregateMetricFirstAlgorithm(TopNQuery query, TopNAlgorithmSelector selector)
+  {
+    // must be using ALL granularity since it makes multiple passes and must reset the cursor
+    if (Granularities.ALL.equals(query.getGranularity())) {
+      return selector.isAggregateTopNMetricFirst() || query.context().getBoolean("doAggregateTopNMetricFirst", false);
+    }
+    return false;
+  }
+
+  public static CursorBuildSpec makeCursorBuildSpec(TopNQuery query, @Nullable QueryMetrics<?> queryMetrics)
+  {
+    // virtual column is currently only used as a decorator to pass to the cursor holder to allow specializing cursor
+    // and vector cursors if any pre-aggregated data at the matching granularity is available
+    // eventually this could probably be reworked to be used by the granularizer instead of the existing method
+    // of creating a selector on the time column
+    final VirtualColumn granularityVirtual = Granularities.toVirtualColumn(query);
Review Comment:
Is it possible to make a helper method for this logic? Seems like there's
very similar logic in the topN, timeseries, and groupBy engines. Especially if
it's planned to be reworked, it's good for the logic to be more centralized so
it can be edited & tested in one place.
##########
processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryEngine.java:
##########
@@ -94,64 +103,46 @@ public Sequence<Result<TimeseriesResultValue>> process(
       );
     }
-    final Filter filter = Filters.convertToCNFFromQueryContext(query, Filters.toFilter(query.getFilter()));
     final Interval interval = Iterables.getOnlyElement(query.getIntervals());
     final Granularity gran = query.getGranularity();
-    final boolean descending = query.isDescending();
-    final ColumnInspector inspector = query.getVirtualColumns().wrapInspector(adapter);
-
-    final boolean doVectorize = query.context().getVectorize().shouldVectorize(
-        adapter.canVectorize(filter, query.getVirtualColumns(), descending)
-        && VirtualColumns.shouldVectorize(query, query.getVirtualColumns(), adapter)
-        && VectorGroupByEngine.canVectorizeAggregators(inspector, query.getAggregatorSpecs())
-    );
+    final CursorHolder cursorHolder = adapter.makeCursorHolder(makeCursorBuildSpec(query, timeseriesQueryMetrics));
+    Cursors.requireTimeOrdering(cursorHolder, query.isDescending() ? Order.DESCENDING : Order.ASCENDING);
     final Sequence<Result<TimeseriesResultValue>> result;
-    if (doVectorize) {
-      result = processVectorized(query, adapter, filter, interval, gran, descending, timeseriesQueryMetrics);
+    if (query.context().getVectorize().shouldVectorize(cursorHolder.canVectorize(), cursorHolder::close)) {
+      result = processVectorized(query, adapter, cursorHolder, interval, gran);
Review Comment:
`cursorHolder` can leak here if `processVectorized` throws an error; for
example if the "Not enough space for aggregators" case gets hit. Perhaps it
makes sense to wrap this entire block (the entire `if` / `else` including
`shouldVectorize`) in a `try` that closes the `cursorHolder` if a `Throwable`
is caught. If we do that, then I think we don't need the new 2-arg
`shouldVectorize` anymore.
Please double check the other query engines for similar leaks.
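The leak-avoidance pattern suggested above can be sketched generically (the `Holder` class here is a stand-in `Closeable`, not Druid's `CursorHolder`): open the resource, and close it in a `catch (Throwable t)` before rethrowing, so any failure between open and the normal close path still releases it.

```java
import java.io.Closeable;
import java.util.concurrent.atomic.AtomicBoolean;

public class CloseOnThrow {
    // Minimal stand-in for a pooled cursor holder.
    static class Holder implements Closeable {
        final AtomicBoolean closed = new AtomicBoolean(false);

        @Override
        public void close() {
            closed.set(true);
        }
    }

    static void process(Holder holder, boolean fail) {
        try {
            if (fail) {
                // e.g. a mid-processing failure such as running out of aggregator space
                throw new IllegalStateException("processing failed");
            }
            // ... normal processing path closes the holder when it is done ...
            holder.close();
        }
        catch (Throwable t) {
            // close on ANY failure so the pooled resource is not leaked, then rethrow
            holder.close();
            throw t;
        }
    }

    public static void main(String[] args) {
        Holder ok = new Holder();
        process(ok, false);

        Holder failing = new Holder();
        try {
            process(failing, true);
        }
        catch (IllegalStateException expected) {
            // expected: the failure propagated, but the holder was still closed
        }
        System.out.println(ok.closed.get() + " " + failing.closed.get());
    }
}
```

With this wrapping in place around the whole `if`/`else`, the holder is released on every path, which is what makes the extra cleanup hook on `shouldVectorize` unnecessary.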
##########
processing/src/main/java/org/apache/druid/query/topn/AggregateTopNMetricFirstAlgorithm.java:
##########
@@ -112,6 +112,8 @@ public void run(
     PooledTopNAlgorithm allMetricAlgo = new PooledTopNAlgorithm(storageAdapter, query, bufferPool);
     PooledTopNAlgorithm.PooledTopNParams allMetricsParam = null;
     try {
+      // reset cursor to starting position (mark was set by the call to run)
Review Comment:
There's no mark anymore.
##########
processing/src/main/java/org/apache/druid/query/BaseQuery.java:
##########
@@ -66,11 +66,10 @@ public static void checkInterrupted()
   public BaseQuery(
       DataSource dataSource,
       QuerySegmentSpec querySegmentSpec,
-      boolean descending,
       Map<String, Object> context
   )
   {
-    this(dataSource, querySegmentSpec, descending, context, Granularities.ALL);
+    this(dataSource, querySegmentSpec, false, context, Granularities.ALL);
Review Comment:
Is `isDescending()` needed on the Query interface anymore? (If it ever was?)
Since we're doing some breaking changes here for extension-query authors
anyway, this seems like a good opportunity to remove it.
##########
processing/src/main/java/org/apache/druid/query/QueryContexts.java:
##########
@@ -154,10 +165,26 @@ public boolean shouldVectorize(final boolean canVectorize)
       return true;
     }
+
+    @Override
+    public boolean shouldVectorize(boolean canVectorize, Runnable cleanup)
+    {
+      if (!canVectorize) {
+        cleanup.run();
+        throw new ISE("Cannot vectorize!");
+      }
+
+      return true;
+    }
   };
   public abstract boolean shouldVectorize(boolean canVectorize);
+  public boolean shouldVectorize(boolean canVectorize, Runnable cleanup)
Review Comment:
About the new `cleanup` parameter:
1. Javadoc should explain when it will be called
2. Consider making it `Closeable` instead of `Runnable`; that seems more
semantically accurate & it looks like the only time it's actually used, it's
with a Closeable thing anyway.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]