This is an automated email from the ASF dual-hosted git repository.
cwylie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 9f6a930 Fix join query in case of filter explosion during CNF
conversion (#12324)
9f6a930 is described below
commit 9f6a930462c77a96beb793b5809d878534998742
Author: Rohan Garg <[email protected]>
AuthorDate: Thu Mar 10 02:13:09 2022 +0530
Fix join query in case of filter explosion during CNF conversion (#12324)
---
.../segment/join/filter/JoinFilterAnalyzer.java | 16 +-
.../druid/sql/calcite/CalciteJoinQueryTest.java | 248 +++++++++++++++++++++
2 files changed, 263 insertions(+), 1 deletion(-)
diff --git a/processing/src/main/java/org/apache/druid/segment/join/filter/JoinFilterAnalyzer.java b/processing/src/main/java/org/apache/druid/segment/join/filter/JoinFilterAnalyzer.java
index 5db5bb1..ddbaa34 100644
--- a/processing/src/main/java/org/apache/druid/segment/join/filter/JoinFilterAnalyzer.java
+++ b/processing/src/main/java/org/apache/druid/segment/join/filter/JoinFilterAnalyzer.java
@@ -32,6 +32,7 @@ import org.apache.druid.segment.filter.Filters;
import org.apache.druid.segment.filter.OrFilter;
import org.apache.druid.segment.filter.SelectorFilter;
import org.apache.druid.segment.filter.cnf.CNFFilterExplosionException;
+import org.apache.druid.segment.join.filter.rewrite.JoinFilterRewriteConfig;
import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
import javax.annotation.Nullable;
@@ -107,7 +108,20 @@ public class JoinFilterAnalyzer
normalizedOrClauses = Filters.toNormalizedOrClauses(key.getFilter());
}
catch (CNFFilterExplosionException cnfFilterExplosionException) {
- return preAnalysisBuilder.build(); // disable the filter pushdown and
rewrite optimization
+ JoinFilterRewriteConfig configWithoutPushdownAndRewrite = new
JoinFilterRewriteConfig(
+ false, // disable the filter pushdown and rewrite optimization
+ false,
+ key.getRewriteConfig().isEnableRewriteValueColumnFilters(),
+ key.getRewriteConfig().isEnableRewriteJoinToFilter(),
+ key.getRewriteConfig().getFilterRewriteMaxSize()
+ );
+ JoinFilterPreAnalysisKey keyWithoutPushdownAndRewrite = new
JoinFilterPreAnalysisKey(
+ configWithoutPushdownAndRewrite,
+ key.getJoinableClauses(),
+ key.getVirtualColumns(),
+ key.getFilter()
+ );
+ return new JoinFilterPreAnalysis.Builder(keyWithoutPushdownAndRewrite,
postJoinVirtualColumns).build();
}
List<Filter> normalizedBaseTableClauses = new ArrayList<>();
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteJoinQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteJoinQueryTest.java
index 86d05b8..f1cfb49 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteJoinQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteJoinQueryTest.java
@@ -40,6 +40,7 @@ import org.apache.druid.query.InlineDataSource;
import org.apache.druid.query.JoinDataSource;
import org.apache.druid.query.LookupDataSource;
import org.apache.druid.query.Query;
+import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryDataSource;
import org.apache.druid.query.QueryException;
import org.apache.druid.query.TableDataSource;
@@ -59,9 +60,11 @@ import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.dimension.ExtractionDimensionSpec;
import org.apache.druid.query.extraction.SubstringDimExtractionFn;
+import org.apache.druid.query.filter.AndDimFilter;
import org.apache.druid.query.filter.BoundDimFilter;
import org.apache.druid.query.filter.LikeDimFilter;
import org.apache.druid.query.filter.NotDimFilter;
+import org.apache.druid.query.filter.OrDimFilter;
import org.apache.druid.query.filter.SelectorDimFilter;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.groupby.ResultRow;
@@ -4740,4 +4743,249 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
ImmutableList.of(new Object[]{"[\"a\",\"b\"]", "[\"a\",\"b\"]"})
);
}
+
+ @Test
+ @Parameters(source = QueryContextForJoinProvider.class)
+ public void
testInnerJoinWithFilterPushdownAndManyFiltersEmptyResults(Map<String, Object>
queryContext) throws Exception
+ {
+ // create the query we expect
+ ScanQuery query = newScanQueryBuilder()
+ .dataSource(
+ join(
+ new TableDataSource(CalciteTests.DATASOURCE1),
+ new QueryDataSource(
+ newScanQueryBuilder()
+ .dataSource(new TableDataSource("foo"))
+ .intervals(querySegmentSpec(Filtration.eternity()))
+
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
+ .columns("m1")
+ .context(queryContext)
+ .build()
+ ),
+ "j0.",
+ equalsCondition(DruidExpression.ofColumn(ColumnType.FLOAT,
"m1"), DruidExpression.ofColumn(ColumnType.FLOAT, "j0.m1")),
+ JoinType.INNER
+ )
+ )
+ .intervals(querySegmentSpec(Filtration.eternity()))
+ .columns("j0.m1", "m1")
+ .filters(new OrDimFilter(
+ new AndDimFilter(
+ new SelectorDimFilter("dim1", "A", null),
+ new SelectorDimFilter("dim2", "B", null)
+ ),
+ new AndDimFilter(
+ new SelectorDimFilter("dim1", "C", null),
+ new SelectorDimFilter("dim2", "D", null)
+ ),
+ new AndDimFilter(
+ new SelectorDimFilter("dim1", "A", null),
+ new SelectorDimFilter("dim2", "C", null)
+ ),
+ new AndDimFilter(
+ new SelectorDimFilter("dim1", "C", null),
+ new SelectorDimFilter("dim2", "E", null)
+ ),
+ new AndDimFilter(
+ new SelectorDimFilter("dim1", "D", null),
+ new SelectorDimFilter("dim2", "H", null)
+ ),
+ new AndDimFilter(
+ new SelectorDimFilter("dim1", "A", null),
+ new SelectorDimFilter("dim2", "D", null)
+ ),
+ new AndDimFilter(
+ new SelectorDimFilter("dim1", "B", null),
+ new SelectorDimFilter("dim2", "C", null)
+ ),
+ new AndDimFilter(
+ new SelectorDimFilter("dim1", "H", null),
+ new SelectorDimFilter("dim2", "E", null)
+ ),
+ new AndDimFilter(
+ new SelectorDimFilter("dim1", "I", null),
+ new SelectorDimFilter("dim2", "J", null)
+ ),
+ new AndDimFilter(
+ new SelectorDimFilter("dim1", "I", null),
+ new SelectorDimFilter("dim2", "K", null)
+ ),
+ new AndDimFilter(
+ new SelectorDimFilter("dim1", "J", null),
+ new SelectorDimFilter("dim2", "I", null)
+ ),
+ new AndDimFilter(
+ new SelectorDimFilter("dim1", "Q", null),
+ new SelectorDimFilter("dim2", "R", null)
+ ),
+ new AndDimFilter(
+ new SelectorDimFilter("dim1", "Q", null),
+ new SelectorDimFilter("dim2", "S", null)
+ ),
+ new AndDimFilter(
+ new SelectorDimFilter("dim1", "S", null),
+ new SelectorDimFilter("dim2", "Q", null)
+ ),
+ new AndDimFilter(
+ new SelectorDimFilter("dim1", "X", null),
+ new SelectorDimFilter("dim2", "Y", null)
+ ),
+ new AndDimFilter(
+ new SelectorDimFilter("dim1", "Z", null),
+ new SelectorDimFilter("dim2", "U", null)
+ ),
+ new AndDimFilter(
+ new SelectorDimFilter("dim1", "U", null),
+ new SelectorDimFilter("dim2", "Z", null)
+ ),
+ new AndDimFilter(
+ new SelectorDimFilter("dim1", "P", null),
+ new SelectorDimFilter("dim2", "Q", null)
+ ),
+ new AndDimFilter(
+ new SelectorDimFilter("dim1", "X", null),
+ new SelectorDimFilter("dim2", "A", null)
+ )
+ ))
+ .context(queryContext)
+ .build();
+
+ assert QueryContexts.getEnableJoinFilterPushDown(query); // filter
pushdown must be enabled
+ // no results will be produced since the filter values aren't in the table
+ testQuery(
+ "SELECT f1.m1, f2.m1\n"
+ + "FROM foo f1\n"
+ + "INNER JOIN foo f2 ON f1.m1 = f2.m1 where (f1.dim1, f1.dim2) in
(('A', 'B'), ('C', 'D'), ('A', 'C'), ('C', 'E'), ('D', 'H'), ('A', 'D'), ('B',
'C'), \n"
+ + "('H', 'E'), ('I', 'J'), ('I', 'K'), ('J', 'I'), ('Q', 'R'), ('Q',
'S'), ('S', 'Q'), ('X', 'Y'), ('Z', 'U'), ('U', 'Z'), ('P', 'Q'), ('X',
'A'))\n",
+ queryContext,
+ ImmutableList.of(query),
+ ImmutableList.of()
+ );
+ }
+
+ @Test
+ @Parameters(source = QueryContextForJoinProvider.class)
+ public void
testInnerJoinWithFilterPushdownAndManyFiltersNonEmptyResults(Map<String,
Object> queryContext) throws Exception
+ {
+ // create the query we expect
+ ScanQuery query = newScanQueryBuilder()
+ .dataSource(
+ join(
+ new TableDataSource(CalciteTests.DATASOURCE1),
+ new QueryDataSource(
+ newScanQueryBuilder()
+ .dataSource(new TableDataSource("foo"))
+ .intervals(querySegmentSpec(Filtration.eternity()))
+
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
+ .columns("m1")
+ .context(queryContext)
+ .build()
+ ),
+ "j0.",
+ equalsCondition(DruidExpression.ofColumn(ColumnType.FLOAT,
"m1"), DruidExpression.ofColumn(ColumnType.FLOAT, "j0.m1")),
+ JoinType.INNER
+ )
+ )
+ .intervals(querySegmentSpec(Filtration.eternity()))
+ .columns("j0.m1", "m1")
+ .filters(new OrDimFilter(
+ new AndDimFilter(
+ new SelectorDimFilter("dim1", "1", null),
+ new SelectorDimFilter("dim2", "a", null)
+ ),
+ new AndDimFilter(
+ new SelectorDimFilter("dim1", "C", null),
+ new SelectorDimFilter("dim2", "D", null)
+ ),
+ new AndDimFilter(
+ new SelectorDimFilter("dim1", "A", null),
+ new SelectorDimFilter("dim2", "C", null)
+ ),
+ new AndDimFilter(
+ new SelectorDimFilter("dim1", "C", null),
+ new SelectorDimFilter("dim2", "E", null)
+ ),
+ new AndDimFilter(
+ new SelectorDimFilter("dim1", "D", null),
+ new SelectorDimFilter("dim2", "H", null)
+ ),
+ new AndDimFilter(
+ new SelectorDimFilter("dim1", "A", null),
+ new SelectorDimFilter("dim2", "D", null)
+ ),
+ new AndDimFilter(
+ new SelectorDimFilter("dim1", "B", null),
+ new SelectorDimFilter("dim2", "C", null)
+ ),
+ new AndDimFilter(
+ new SelectorDimFilter("dim1", "H", null),
+ new SelectorDimFilter("dim2", "E", null)
+ ),
+ new AndDimFilter(
+ new SelectorDimFilter("dim1", "I", null),
+ new SelectorDimFilter("dim2", "J", null)
+ ),
+ new AndDimFilter(
+ new SelectorDimFilter("dim1", "I", null),
+ new SelectorDimFilter("dim2", "K", null)
+ ),
+ new AndDimFilter(
+ new SelectorDimFilter("dim1", "J", null),
+ new SelectorDimFilter("dim2", "I", null)
+ ),
+ new AndDimFilter(
+ new SelectorDimFilter("dim1", "Q", null),
+ new SelectorDimFilter("dim2", "R", null)
+ ),
+ new AndDimFilter(
+ new SelectorDimFilter("dim1", "Q", null),
+ new SelectorDimFilter("dim2", "S", null)
+ ),
+ new AndDimFilter(
+ new SelectorDimFilter("dim1", "S", null),
+ new SelectorDimFilter("dim2", "Q", null)
+ ),
+ new AndDimFilter(
+ new SelectorDimFilter("dim1", "X", null),
+ new SelectorDimFilter("dim2", "Y", null)
+ ),
+ new AndDimFilter(
+ new SelectorDimFilter("dim1", "Z", null),
+ new SelectorDimFilter("dim2", "U", null)
+ ),
+ new AndDimFilter(
+ new SelectorDimFilter("dim1", "U", null),
+ new SelectorDimFilter("dim2", "Z", null)
+ ),
+ new AndDimFilter(
+ new SelectorDimFilter("dim1", "P", null),
+ new SelectorDimFilter("dim2", "Q", null)
+ ),
+ new AndDimFilter(
+ new SelectorDimFilter("dim1", "X", null),
+ new SelectorDimFilter("dim2", "A", null)
+ )
+ ))
+ .context(queryContext)
+ .build();
+
+ assert QueryContexts.getEnableJoinFilterPushDown(query); // filter
pushdown must be enabled
+ // (dim1, dim2, m1) in foo look like
+ // [, a, 1.0]
+ // [10.1, , 2.0]
+ // [2, , 3.0]
+ // [1, a, 4.0]
+ // [def, abc, 5.0]
+ // [abc, , 6.0]
+ // So (1, a) filter will produce results for 4.0
+ testQuery(
+ "SELECT f1.m1, f2.m1\n"
+ + "FROM foo f1\n"
+ + "INNER JOIN foo f2 ON f1.m1 = f2.m1 where (f1.dim1, f1.dim2) in
(('1', 'a'), ('C', 'D'), ('A', 'C'), ('C', 'E'), ('D', 'H'), ('A', 'D'), ('B',
'C'), \n"
+ + "('H', 'E'), ('I', 'J'), ('I', 'K'), ('J', 'I'), ('Q', 'R'), ('Q',
'S'), ('S', 'Q'), ('X', 'Y'), ('Z', 'U'), ('U', 'Z'), ('P', 'Q'), ('X',
'A'))\n",
+ queryContext,
+ ImmutableList.of(query),
+ ImmutableList.of(new Object[]{4.0F, 4.0F})
+ );
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]