github-actions[bot] commented on code in PR #63763: URL: https://github.com/apache/doris/pull/63763#discussion_r3449508848
########## regression-test/suites/nereids_rules_p0/subquery_to_window_function/correlated_scalar_subquery_to_window_function.groovy: ########## @@ -0,0 +1,88 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("correlated_scalar_subquery_to_window_function") { + sql "DROP TABLE IF EXISTS fact FORCE" + sql "DROP TABLE IF EXISTS dim FORCE" + + sql """ + CREATE TABLE fact ( + id INT, + k INT, + v INT + ) ENGINE=OLAP + DUPLICATE KEY(id) + DISTRIBUTED BY HASH(id) BUCKETS 1 + PROPERTIES ('replication_num' = '1') + """ + + sql """ + CREATE TABLE dim ( + did INT, + k INT, + tag INT + ) ENGINE=OLAP + DUPLICATE KEY(did) Review Comment: This regression suite does not appear to exercise the new window-rewrite path. The final rule now requires the correlated outer-only slots to be unique before it calls `rewrite()`, but `dim` is a `DUPLICATE KEY(did)` table and the correlated key is `d.k` / `t.k`; there is no unique constraint or unique-key metadata on `k`. As a result these queries validate the original scalar-subquery plan and would still pass if the new window predicate-splitting or uniqueness checks regressed. 
Please add a positive regression case with uniqueness metadata on the correlated key, and keep this duplicate-key case only as negative/no-rewrite coverage if that is the intent. ########## fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/AggScalarSubQueryToWindowFunction.java: ########## @@ -281,6 +283,45 @@ private boolean checkRelation(List<Slot> correlatedSlots) { return correlatedSlots.stream().allMatch(e -> correlatedRelationOutput.contains(e.getExprId())); } + /** + * The correlated columns of the outer-only table must form a unique key + * for the WinMagic window-function rewrite to be correct. Without uniqueness, + * the window function may aggregate over duplicated outer rows, producing + * wrong results for aggregates like SUM and COUNT. + * <p> + * Uses {@link DataTrait#isUnique(Set)} which covers both OLAP key metadata + * (PRIMARY_KEYS / UNIQUE_KEYS) and declared constraints (PRIMARY KEY / UNIQUE). + */ + private boolean checkUniqueCorrelatedTable(List<Slot> correlatedSlots) { + List<CatalogRelation> outerTables = outerPlans.stream() + .filter(CatalogRelation.class::isInstance) + .map(CatalogRelation.class::cast) + .collect(Collectors.toList()); + List<CatalogRelation> innerTables = innerPlans.stream() + .filter(CatalogRelation.class::isInstance) + .map(CatalogRelation.class::cast) + .collect(Collectors.toList()); + + List<Long> outerIds = outerTables.stream().map(node -> node.getTable().getId()).collect(Collectors.toList()); + List<Long> innerIds = innerTables.stream().map(node -> node.getTable().getId()).collect(Collectors.toList()); + + innerIds.forEach(outerIds::remove); + if (outerIds.size() != 1) { + return true; + } + + CatalogRelation outerOnlyTable = outerTables.stream() + .filter(node -> outerIds.contains(node.getTable().getId())) + .findFirst().orElse(null); + if (outerOnlyTable == null) { + return true; + } + + // Check uniqueness via DataTrait on the correlated (outer-only) table. 
Review Comment: This uniqueness check is still too weak for null-safe correlated predicates. The rule accepts any `ComparisonPredicate`, so an inner/outer pair like `f2.k <=> d.k` / `f.k <=> d.k` can pass `checkFilter`, but `DataTrait.isUnique(...)` does not require the correlated slots to be non-null. Existing join FD code explicitly rejects null-safe hash keys when it only has uniqueness metadata for the same reason. With nullable `d.k`, `PARTITION BY d.k` groups all null-key outer rows together; under `<=>`, those rows can join the same null-key fact rows and multiply the window aggregate, while the original scalar subquery is evaluated per outer row. Please require non-null uniqueness here, or reject nullable/null-safe correlations for this rewrite. ########## fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/AggScalarSubQueryToWindowFunctionTest.java: ########## @@ -338,6 +360,196 @@ public void testNotMatchTheRule() { } } + @Test + public void testWindowPartitionsByOuterOnlyRelationSlots() throws Exception { + // Use TPC-H Q17: correlated table is part (p_partkey is unique via constraint), + // fact table is lineitem. The window PARTITION BY should contain all output + // columns of the correlated table (part). 
+ String sql = TPCHUtils.Q17; + + Plan plan = PlanChecker.from(createCascadesContext(sql)) + .analyze(sql) + .applyBottomUp(new PullUpProjectUnderApply()) + .applyTopDown(new PushDownFilterThroughProject()) + .customRewrite(new EliminateUnnecessaryProject()) + .customRewrite(new AggScalarSubQueryToWindowFunction()) + .rewrite() + .getPlan(); + + List<LogicalWindow<Plan>> windows = plan.collectToList(LogicalWindow.class::isInstance); + Assertions.assertEquals(1, windows.size()); + + LogicalWindow<Plan> window = windows.get(0); + List<NamedExpression> windowExpressions = window.getWindowExpressions(); + Assertions.assertEquals(1, windowExpressions.size()); + + WindowExpression windowExpression = (WindowExpression) windowExpressions.get(0).child(0); + Set<String> partitionKeys = windowExpression.getPartitionKeys().stream() + .map(Expression.class::cast) + .filter(Slot.class::isInstance) + .map(Slot.class::cast) + .map(Slot::getName) + .collect(Collectors.toSet()); + // The window PARTITION BY should include the correlated column (p_partkey) + Assertions.assertTrue(partitionKeys.contains("p_partkey"), + "Expected partition keys to contain p_partkey, got: " + partitionKeys); + } + + @Test + public void testSharedTablePredicatesStayAboveWindow() throws Exception { + createTable("CREATE TABLE tpch.fact_split (\n" + + " id INT,\n" + + " k INT,\n" + + " v INT\n" + + ") ENGINE=OLAP\n" + + "DUPLICATE KEY(id)\n" + + "DISTRIBUTED BY HASH(id) BUCKETS 1\n" + + "PROPERTIES ('replication_num' = '1')"); + createTable("CREATE TABLE tpch.dim_split (\n" + + " did INT,\n" + + " k INT,\n" + + " tag INT\n" + + ") ENGINE=OLAP\n" + + "DUPLICATE KEY(did)\n" + + "DISTRIBUTED BY HASH(did) BUCKETS 1\n" + + "PROPERTIES ('replication_num' = '1')"); + // Add UNIQUE constraint so the rule matches + addConstraint("alter table dim_split add constraint uq_dim_split_k unique (k)"); + + // Query with extra predicate on shared table (f.v > 6). 
+ // This predicate must stay ABOVE the window, otherwise the window + // function would aggregate fewer rows than the original scalar subquery. + String sql = "SELECT d.did, d.k, d.tag, f.id, f.v " + + "FROM fact_split f, dim_split d " + + "WHERE f.k = d.k " + + " AND f.v > 6" + + " AND f.v * 2 > (" + + " SELECT SUM(f2.v) " + + " FROM fact_split f2 " + + " WHERE f2.k = d.k" + + " )"; + + Plan plan = PlanChecker.from(createCascadesContext(sql)) + .analyze(sql) + .applyBottomUp(new PullUpProjectUnderApply()) + .applyTopDown(new PushDownFilterThroughProject()) + .customRewrite(new EliminateUnnecessaryProject()) + .customRewrite(new AggScalarSubQueryToWindowFunction()) + .rewrite() + .getPlan(); + + // Rule should match and produce a window + Assertions.assertTrue(plan.anyMatch(LogicalWindow.class::isInstance), + "Rule should produce a window for this query"); + + // Verify the shared-table predicate (f.v > 6) exists in the plan and is + // placed ABOVE the window, not below it. We check that no filter below + // the window contains predicates that reference ONLY shared-table columns. + List<LogicalWindow<Plan>> windows = plan.collectToList(LogicalWindow.class::isInstance); + LogicalWindow<Plan> window = windows.get(0); + // The window's child is the below-window filter (or join). + // Check that conjuncts in filters below the window do NOT include + // predicates that reference only fact (shared table) columns like v. + Plan belowWindow = window.child(0); + List<LogicalFilter<Plan>> belowFilters = belowWindow + .collectToList(LogicalFilter.class::isInstance); + for (LogicalFilter<Plan> f : belowFilters) { + for (Expression conj : f.getConjuncts()) { + // If a conjunct references fact columns (like v), it must also + // reference dim columns (like k) to be safe below the window + // (join conditions). Pure fact-only filters must stay above. 
+ String conjStr = conj.toSql(); Review Comment: This assertion does not actually catch the regression it is meant to guard against. `SlotReference.toSql()` renders the bare slot name (`v`), not a backticked name, so if `f.v > 6` were still below the window the string would be something like `(v > 6)` and `conjStr.contains("`v`")` would be false. Please assert on the predicate's input slots/ExprIds, `shapeInfo()`, or the bare slot names with enough qualification to distinguish `fact.v`, so this test fails if a shared-table-only predicate is pushed below the window.
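A minimal sketch of the difference between the string-based assertion and a slot-based one, for readers following the last comment. The classes `Slot` and `Conjunct` and the helpers `flaggedByStringMatch` and `readsOnlyTable` are hypothetical stand-ins written for this illustration; they are not the Nereids `SlotReference` or `Expression` API, and the rendered string `(v > 6)` is taken from the comment's own description.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-ins for illustration; these are NOT the Doris Nereids classes. */
final class SlotAssertionSketch {

    /** A column reference that remembers which table it came from (assumed model). */
    static final class Slot {
        final String table;
        final String name;

        Slot(String table, String name) {
            this.table = table;
            this.name = name;
        }
    }

    /** A filter conjunct: its rendered SQL plus the slots it reads. */
    static final class Conjunct {
        final String sql;
        final List<Slot> inputSlots;

        Conjunct(String sql, Slot... inputSlots) {
            this.sql = sql;
            this.inputSlots = Arrays.asList(inputSlots);
        }
    }

    /** String-based check: fires only if the column is rendered with backticks. */
    static boolean flaggedByStringMatch(Conjunct c) {
        return c.sql.contains("`v`");
    }

    /** Slot-based check: true when every slot the conjunct reads belongs to the given table. */
    static boolean readsOnlyTable(Conjunct c, String table) {
        if (c.inputSlots.isEmpty()) {
            return false;
        }
        for (Slot s : c.inputSlots) {
            if (!s.table.equals(table)) {
                return false;
            }
        }
        return true;
    }

    /** f.v > 6, rendered with a bare slot name as the review comment describes. */
    static Conjunct factOnlyFilter() {
        return new Conjunct("(v > 6)", new Slot("fact_split", "v"));
    }

    /** f.k = d.k, a join condition that reads both tables. */
    static Conjunct joinCondition() {
        return new Conjunct("(k = k)", new Slot("fact_split", "k"), new Slot("dim_split", "k"));
    }

    public static void main(String[] args) {
        // The string match misses the fact-only filter because no backticks are rendered.
        System.out.println("string match flags fact-only filter: "
                + flaggedByStringMatch(factOnlyFilter()));
        // The slot-based check identifies it, and leaves the join condition alone.
        System.out.println("slot check flags fact-only filter: "
                + readsOnlyTable(factOnlyFilter(), "fact_split"));
        System.out.println("slot check flags join condition: "
                + readsOnlyTable(joinCondition(), "fact_split"));
    }
}
```

In the real test the same idea would presumably be expressed through the conjunct's input slots and their ExprIds or qualifiers, so that a fact-only predicate found below the window fails the test regardless of how the slot is rendered.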

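The null-safe concern raised against `checkUniqueCorrelatedTable` can be checked by hand on a tiny data set. The sketch below is a simplified model, not the Doris rewrite: it assumes the uniqueness metadata tolerates two outer rows whose `d.k` is NULL, models `<=>` with `Objects.equals`, and partitions by `d.k` only, as in the comment. The class name, the sample rows, and both helper methods are made up for the illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Simplified model of the scalar subquery versus the window rewrite; sample data is invented. */
final class NullSafeWindowSketch {

    /** dim.k values: no non-null value repeats, but two rows are NULL (assumed to be allowed). */
    static final Integer[] DIM_K = {null, null, 1};

    /** fact rows as {k, v}. */
    static final Integer[][] FACT = {{null, 10}, {1, 5}};

    /** Original semantics: SUM(f2.v) WHERE f2.k <=> d.k, evaluated once per outer row. */
    static int scalarSum(Integer dimK) {
        int sum = 0;
        for (Integer[] f : FACT) {
            if (Objects.equals(f[0], dimK)) { // null-safe equality
                sum += f[1];
            }
        }
        return sum;
    }

    /** Rewrite model: join on f.k <=> d.k, then SUM(f.v) OVER (PARTITION BY d.k). */
    static Map<Integer, Integer> windowSums() {
        Map<Integer, Integer> sums = new HashMap<>(); // HashMap accepts a null key
        for (Integer dk : DIM_K) {
            for (Integer[] f : FACT) {
                if (Objects.equals(f[0], dk)) {
                    sums.merge(dk, f[1], Integer::sum);
                }
            }
        }
        return sums;
    }

    public static void main(String[] args) {
        Map<Integer, Integer> window = windowSums();
        System.out.println("k=NULL scalar=" + scalarSum(null) + " window=" + window.get(null));
        System.out.println("k=1    scalar=" + scalarSum(1) + " window=" + window.get(1));
    }
}
```

For the NULL key the per-row scalar subquery sees one fact row (sum 10), while the partition contains that fact row once per NULL-key outer row (sum 20). The non-null key agrees in both forms, which is why the problem only appears with nullable keys under `<=>`.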