jon-wei commented on a change in pull request #9516: More efficient join filter
rewrites
URL: https://github.com/apache/druid/pull/9516#discussion_r393386193
##########
File path:
processing/src/main/java/org/apache/druid/segment/join/filter/JoinFilterAnalyzer.java
##########
@@ -311,124 +431,99 @@ private static JoinFilterAnalysis rewriteOrFilter(
);
}
- /**
- * Rewrites a selector filter on a join table into an IN filter on the base
table.
- *
- * @param baseColumnNames Set of names of columns that belong to the base
table, including pre-join virtual
- * columns
- * @param selectorFilter SelectorFilter to be rewritten
- * @param prefixes Map of join table prefixes to clauses
- * @param equiconditions Map of equiconditions
- * @param correlationCache Cache of column correlation analyses. This will
be potentially modified by adding
- * any new column correlation analyses to the cache.
- *
- * @return A JoinFilterAnalysis that indicates how to handle the potentially
rewritten filter
- */
private static JoinFilterAnalysis rewriteSelectorFilter(
- Set<String> baseColumnNames,
SelectorFilter selectorFilter,
- Map<String, JoinableClause> prefixes,
- Map<String, Set<Expr>> equiconditions,
- Map<String, Optional<List<JoinFilterColumnCorrelationAnalysis>>>
correlationCache
+ JoinFilterPreAnalysis joinFilterPreAnalysis
)
{
+
+ List<Filter> newFilters = new ArrayList<>();
+ List<VirtualColumn> pushdownVirtualColumns = new ArrayList<>();
+
String filteringColumn = selectorFilter.getDimension();
- for (Map.Entry<String, JoinableClause> prefixAndClause :
prefixes.entrySet()) {
- if (prefixAndClause.getValue().includesColumn(filteringColumn)) {
- Optional<List<JoinFilterColumnCorrelationAnalysis>> correlations =
correlationCache.computeIfAbsent(
- prefixAndClause.getKey(),
- p -> findCorrelatedBaseTableColumns(
- baseColumnNames,
- p,
- prefixes.get(p),
- equiconditions
- )
- );
+ String filteringValue = selectorFilter.getValue();
- if (!correlations.isPresent()) {
- return
JoinFilterAnalysis.createNoPushdownFilterAnalysis(selectorFilter);
- }
+ if (areSomeColumnsFromPostJoinVirtualColumns(
+ joinFilterPreAnalysis.getPostJoinVirtualColumns(),
+ selectorFilter.getRequiredColumns()
+ )) {
+ return JoinFilterAnalysis.createNoPushdownFilterAnalysis(selectorFilter);
+ }
- List<Filter> newFilters = new ArrayList<>();
- List<VirtualColumn> pushdownVirtualColumns = new ArrayList<>();
+ if (!areSomeColumnsFromJoin(joinFilterPreAnalysis.getJoinableClauses(),
selectorFilter.getRequiredColumns())) {
+ return new JoinFilterAnalysis(
+ true,
+ selectorFilter,
+ selectorFilter,
+ pushdownVirtualColumns
+ );
+ }
- for (JoinFilterColumnCorrelationAnalysis correlationAnalysis :
correlations.get()) {
- if (correlationAnalysis.supportsPushDown()) {
- Set<String> correlatedValues = getCorrelatedValuesForPushDown(
- selectorFilter.getDimension(),
- selectorFilter.getValue(),
- correlationAnalysis.getJoinColumn(),
- prefixAndClause.getValue()
- );
+ Optional<List<JoinFilterColumnCorrelationAnalysis>> correlationAnalyses =
joinFilterPreAnalysis.getCorrelationsByFilteringColumn()
+
.get(filteringColumn);
- if (correlatedValues.isEmpty()) {
- return
JoinFilterAnalysis.createNoPushdownFilterAnalysis(selectorFilter);
- }
+ if (!correlationAnalyses.isPresent()) {
+ return JoinFilterAnalysis.createNoPushdownFilterAnalysis(selectorFilter);
+ }
- for (String correlatedBaseColumn :
correlationAnalysis.getBaseColumns()) {
- Filter rewrittenFilter = new InDimFilter(
- correlatedBaseColumn,
- correlatedValues,
- null,
- null
- ).toFilter();
- newFilters.add(rewrittenFilter);
- }
- for (Expr correlatedBaseExpr :
correlationAnalysis.getBaseExpressions()) {
- // We need to create a virtual column for the expressions when
pushing down.
- // Note that this block is never entered right now, since
correlationAnalysis.supportsPushDown()
- // will return false if there any correlated expressions on the
base table.
- // Pushdown of such filters is disabled until the expressions
system supports converting an expression
- // into a String representation that can be reparsed into the
same expression.
- // https://github.com/apache/druid/issues/9326 tracks this
expressions issue.
- String vcName =
getCorrelatedBaseExprVirtualColumnName(pushdownVirtualColumns.size());
-
- VirtualColumn correlatedBaseExprVirtualColumn = new
ExpressionVirtualColumn(
- vcName,
- correlatedBaseExpr,
- ValueType.STRING
- );
- pushdownVirtualColumns.add(correlatedBaseExprVirtualColumn);
-
- Filter rewrittenFilter = new InDimFilter(
- vcName,
- correlatedValues,
- null,
- null
- ).toFilter();
- newFilters.add(rewrittenFilter);
- }
- }
- }
+ for (JoinFilterColumnCorrelationAnalysis correlationAnalysis :
correlationAnalyses.get()) {
+ if (correlationAnalysis.supportsPushDown()) {
+ Optional<Set<String>> correlatedValues =
correlationAnalysis.getCorrelatedValuesMap().get(
+ Pair.of(filteringColumn, filteringValue)
+ );
- if (newFilters.isEmpty()) {
+ if (!correlatedValues.isPresent()) {
return
JoinFilterAnalysis.createNoPushdownFilterAnalysis(selectorFilter);
}
- return new JoinFilterAnalysis(
- true,
- selectorFilter,
- Filters.and(newFilters),
- pushdownVirtualColumns
- );
+ for (String correlatedBaseColumn :
correlationAnalysis.getBaseColumns()) {
+ Filter rewrittenFilter = new InDimFilter(
+ correlatedBaseColumn,
+ correlatedValues.get(),
+ null,
+ null
+ ).toFilter();
+ newFilters.add(rewrittenFilter);
+ }
+
+ for (Expr correlatedBaseExpr :
correlationAnalysis.getBaseExpressions()) {
+ // We need to create a virtual column for the expressions when
pushing down.
+ // Note that this block is never entered right now, since
correlationAnalysis.supportsPushDown()
+ // will return false if there any correlated expressions on the base
table.
+ // Pushdown of such filters is disabled until the expressions system
supports converting an expression
+ // into a String representation that can be reparsed into the same
expression.
+ // https://github.com/apache/druid/issues/9326 tracks this
expressions issue.
Review comment:
Actually, I changed my mind: I re-enabled filter rewrites when there are LHS
expressions in the join condition, and removed the `@Ignore` annotations on the
tests for that case.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]