jon-wei commented on a change in pull request #9516: More efficient join filter 
rewrites
URL: https://github.com/apache/druid/pull/9516#discussion_r393386193
 
 

 ##########
 File path: 
processing/src/main/java/org/apache/druid/segment/join/filter/JoinFilterAnalyzer.java
 ##########
 @@ -311,124 +431,99 @@ private static JoinFilterAnalysis rewriteOrFilter(
     );
   }
 
-  /**
-   * Rewrites a selector filter on a join table into an IN filter on the base 
table.
-   *
-   * @param baseColumnNames  Set of names of columns that belong to the base 
table, including pre-join virtual
-   *                         columns
-   * @param selectorFilter   SelectorFilter to be rewritten
-   * @param prefixes         Map of join table prefixes to clauses
-   * @param equiconditions   Map of equiconditions
-   * @param correlationCache Cache of column correlation analyses. This will 
be potentially modified by adding
-   *                         any new column correlation analyses to the cache.
-   *
-   * @return A JoinFilterAnalysis that indicates how to handle the potentially 
rewritten filter
-   */
   private static JoinFilterAnalysis rewriteSelectorFilter(
-      Set<String> baseColumnNames,
       SelectorFilter selectorFilter,
-      Map<String, JoinableClause> prefixes,
-      Map<String, Set<Expr>> equiconditions,
-      Map<String, Optional<List<JoinFilterColumnCorrelationAnalysis>>> 
correlationCache
+      JoinFilterPreAnalysis joinFilterPreAnalysis
   )
   {
+
+    List<Filter> newFilters = new ArrayList<>();
+    List<VirtualColumn> pushdownVirtualColumns = new ArrayList<>();
+
     String filteringColumn = selectorFilter.getDimension();
-    for (Map.Entry<String, JoinableClause> prefixAndClause : 
prefixes.entrySet()) {
-      if (prefixAndClause.getValue().includesColumn(filteringColumn)) {
-        Optional<List<JoinFilterColumnCorrelationAnalysis>> correlations = 
correlationCache.computeIfAbsent(
-            prefixAndClause.getKey(),
-            p -> findCorrelatedBaseTableColumns(
-                baseColumnNames,
-                p,
-                prefixes.get(p),
-                equiconditions
-            )
-        );
+    String filteringValue = selectorFilter.getValue();
 
-        if (!correlations.isPresent()) {
-          return 
JoinFilterAnalysis.createNoPushdownFilterAnalysis(selectorFilter);
-        }
+    if (areSomeColumnsFromPostJoinVirtualColumns(
+        joinFilterPreAnalysis.getPostJoinVirtualColumns(),
+        selectorFilter.getRequiredColumns()
+    )) {
+      return JoinFilterAnalysis.createNoPushdownFilterAnalysis(selectorFilter);
+    }
 
-        List<Filter> newFilters = new ArrayList<>();
-        List<VirtualColumn> pushdownVirtualColumns = new ArrayList<>();
+    if (!areSomeColumnsFromJoin(joinFilterPreAnalysis.getJoinableClauses(), 
selectorFilter.getRequiredColumns())) {
+      return new JoinFilterAnalysis(
+          true,
+          selectorFilter,
+          selectorFilter,
+          pushdownVirtualColumns
+      );
+    }
 
-        for (JoinFilterColumnCorrelationAnalysis correlationAnalysis : 
correlations.get()) {
-          if (correlationAnalysis.supportsPushDown()) {
-            Set<String> correlatedValues = getCorrelatedValuesForPushDown(
-                selectorFilter.getDimension(),
-                selectorFilter.getValue(),
-                correlationAnalysis.getJoinColumn(),
-                prefixAndClause.getValue()
-            );
+    Optional<List<JoinFilterColumnCorrelationAnalysis>> correlationAnalyses = 
joinFilterPreAnalysis.getCorrelationsByFilteringColumn()
+                                                                               
                    .get(filteringColumn);
 
-            if (correlatedValues.isEmpty()) {
-              return 
JoinFilterAnalysis.createNoPushdownFilterAnalysis(selectorFilter);
-            }
+    if (!correlationAnalyses.isPresent()) {
+      return JoinFilterAnalysis.createNoPushdownFilterAnalysis(selectorFilter);
+    }
 
-            for (String correlatedBaseColumn : 
correlationAnalysis.getBaseColumns()) {
-              Filter rewrittenFilter = new InDimFilter(
-                  correlatedBaseColumn,
-                  correlatedValues,
-                  null,
-                  null
-              ).toFilter();
-              newFilters.add(rewrittenFilter);
-            }
 
-            for (Expr correlatedBaseExpr : 
correlationAnalysis.getBaseExpressions()) {
-              // We need to create a virtual column for the expressions when 
pushing down.
-              // Note that this block is never entered right now, since 
correlationAnalysis.supportsPushDown()
-              // will return false if there any correlated expressions on the 
base table.
-              // Pushdown of such filters is disabled until the expressions 
system supports converting an expression
-              // into a String representation that can be reparsed into the 
same expression.
-              // https://github.com/apache/druid/issues/9326 tracks this 
expressions issue.
-              String vcName = 
getCorrelatedBaseExprVirtualColumnName(pushdownVirtualColumns.size());
-
-              VirtualColumn correlatedBaseExprVirtualColumn = new 
ExpressionVirtualColumn(
-                  vcName,
-                  correlatedBaseExpr,
-                  ValueType.STRING
-              );
-              pushdownVirtualColumns.add(correlatedBaseExprVirtualColumn);
-
-              Filter rewrittenFilter = new InDimFilter(
-                  vcName,
-                  correlatedValues,
-                  null,
-                  null
-              ).toFilter();
-              newFilters.add(rewrittenFilter);
-            }
-          }
-        }
+    for (JoinFilterColumnCorrelationAnalysis correlationAnalysis : 
correlationAnalyses.get()) {
+      if (correlationAnalysis.supportsPushDown()) {
+        Optional<Set<String>> correlatedValues = 
correlationAnalysis.getCorrelatedValuesMap().get(
+            Pair.of(filteringColumn, filteringValue)
+        );
 
-        if (newFilters.isEmpty()) {
+        if (!correlatedValues.isPresent()) {
           return 
JoinFilterAnalysis.createNoPushdownFilterAnalysis(selectorFilter);
         }
 
-        return new JoinFilterAnalysis(
-            true,
-            selectorFilter,
-            Filters.and(newFilters),
-            pushdownVirtualColumns
-        );
+        for (String correlatedBaseColumn : 
correlationAnalysis.getBaseColumns()) {
+          Filter rewrittenFilter = new InDimFilter(
+              correlatedBaseColumn,
+              correlatedValues.get(),
+              null,
+              null
+          ).toFilter();
+          newFilters.add(rewrittenFilter);
+        }
+
+        for (Expr correlatedBaseExpr : 
correlationAnalysis.getBaseExpressions()) {
+          // We need to create a virtual column for the expressions when 
pushing down.
+          // Note that this block is never entered right now, since 
correlationAnalysis.supportsPushDown()
+          // will return false if there any correlated expressions on the base 
table.
+          // Pushdown of such filters is disabled until the expressions system 
supports converting an expression
+          // into a String representation that can be reparsed into the same 
expression.
+          // https://github.com/apache/druid/issues/9326 tracks this 
expressions issue.
 
 Review comment:
   Actually, I changed my mind: I re-enabled filter rewrites when there are LHS 
expressions in the join condition, and removed the `@Ignore` annotations on the 
tests that cover that case.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to