utafrali commented on code in PR #62314:
URL: https://github.com/apache/doris/pull/62314#discussion_r3061998184


##########
fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RunTimeFilterTranslatorV2.java:
##########
@@ -93,13 +94,22 @@ private void translateRuntimeFilterGroup(PlanNode sourceNode,
         Expr srcExpr = ExpressionTranslator.translate(head.getSourceExpression(), ctx);
 
         List<RuntimeFilterTarget> targets = new ArrayList<>();
+        boolean isLocalTarget = true;

Review Comment:
   The `isLocalTarget` computation runs its own loop over `filters` here,
separate from the target-building loop below. V1 (`RuntimeFilterTranslator`)
expresses the same check as a single `stream().allMatch(...)` expression, which
also stops at the first non-local target. The extra loop is a minor
inefficiency and a style inconsistency with the code being mirrored. Consider:
   ```java
   boolean isLocalTarget = filters.stream()
       .map(RuntimeFilterV2::getLegacyTargetNode)
       .allMatch(t -> !(t instanceof CTEScanNode)
           && t.getFragmentId().equals(sourceNode.getFragmentId()));
   ```
   This is a direct port of the V1 logic and makes the parallel intent explicit.
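
For readers without the V1 source at hand, the behaviour of the `allMatch` form can be checked in isolation. The following is a minimal, self-contained sketch, not Doris code: `Target`, `fragmentId`, and `cteScan` are hypothetical stand-ins for the real `PlanNode` and `CTEScanNode` types.

```java
import java.util.Arrays;
import java.util.List;

final class LocalitySketch {
    /** Hypothetical stand-in for a runtime filter target node. */
    static final class Target {
        final int fragmentId;
        final boolean cteScan;

        Target(int fragmentId, boolean cteScan) {
            this.fragmentId = fragmentId;
            this.cteScan = cteScan;
        }
    }

    /** True only if every target is a non-CTE scan in the source fragment. */
    static boolean allLocal(List<Target> targets, int sourceFragmentId) {
        return targets.stream()
                .allMatch(t -> !t.cteScan && t.fragmentId == sourceFragmentId);
    }

    public static void main(String[] args) {
        List<Target> sameFragment = Arrays.asList(new Target(1, false), new Target(1, false));
        List<Target> mixed = Arrays.asList(new Target(1, false), new Target(2, false));
        List<Target> withCte = Arrays.asList(new Target(1, true));

        System.out.println(allLocal(sameFragment, 1)); // true
        System.out.println(allLocal(mixed, 1));        // false
        System.out.println(allLocal(withCte, 1));      // false
    }
}
```

`allMatch` returns `true` for an empty list, which matches the `true` initial value of `isLocalTarget` in the diff.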



##########
fe/fe-core/src/test/java/org/apache/doris/nereids/glue/translator/RunTimeFilterTranslatorV2Test.java:
##########
@@ -0,0 +1,136 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.glue.translator;
+
+import org.apache.doris.planner.Planner;
+import org.apache.doris.planner.RuntimeFilter;
+import org.apache.doris.planner.SetOperationNode;
+import org.apache.doris.utframe.TestWithFeService;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+/**
+ * Tests for RunTimeFilterTranslatorV2, specifically verifying that runtime filter
+ * locality (isLocalTarget) is correctly computed for INTERSECT/EXCEPT operations.
+ */
+public class RunTimeFilterTranslatorV2Test extends TestWithFeService {
+
+    @Override
+    protected void runBeforeAll() throws Exception {
+        createDatabase("test_rf_v2");
+        connectContext.getSessionVariable().enableFallbackToOriginalPlanner = false;
+        connectContext.getSessionVariable().setRuntimeFilterType(8);
+
+        createTable("create table test_rf_v2.t1 ("
+                + "k1 int, "
+                + "v1 int"
+                + ") distributed by hash(k1) buckets 3 "
+                + "properties('replication_num' = '1');");
+
+        createTable("create table test_rf_v2.t2 ("
+                + "k1 int, "
+                + "v1 int"
+                + ") distributed by hash(k1) buckets 3 "
+                + "properties('replication_num' = '1');");
+
+        createTable("create table test_rf_v2.t3 ("
+                + "k1 int, "
+                + "v1 int"
+                + ") distributed by hash(k1) buckets 3 "
+                + "properties('replication_num' = '1');");
+    }
+
+    /**
+     * Verify that runtime filters produced by INTERSECT correctly compute
+     * hasRemoteTargets / hasLocalTargets (not both false, which was the bug).
+     * Before the fix, the V2 translator used the 2-arg RuntimeFilterTarget
+     * constructor which hard-coded isLocalTarget=false, causing all filters
+     * to be incorrectly marked as remote-only.
+     */
+    @Test
+    public void testIntersectRuntimeFilterLocality() throws Exception {
+        String sql = "select k1 from test_rf_v2.t1 intersect select k1 from test_rf_v2.t2";
+        Planner planner = getSQLPlanner(sql);
+        Assertions.assertNotNull(planner);
+
+        List<RuntimeFilter> runtimeFilters = planner.getRuntimeFilters();
+        // INTERSECT may or may not generate runtime filters depending on stats/cost.
+        // If runtime filters are generated, verify locality flags are consistent.
+        for (RuntimeFilter rf : runtimeFilters) {
+            if (rf.getBuilderNode() instanceof SetOperationNode) {
+                verifyLocalityConsistency(rf);
+            }
+        }
+    }
+
+    /**
+     * Verify that runtime filters produced by EXCEPT correctly compute
+     * hasRemoteTargets / hasLocalTargets.
+     */
+    @Test
+    public void testExceptRuntimeFilterLocality() throws Exception {
+        String sql = "select k1 from test_rf_v2.t1 except select k1 from test_rf_v2.t2";
+        Planner planner = getSQLPlanner(sql);
+        Assertions.assertNotNull(planner);
+
+        List<RuntimeFilter> runtimeFilters = planner.getRuntimeFilters();
+        for (RuntimeFilter rf : runtimeFilters) {
+            if (rf.getBuilderNode() instanceof SetOperationNode) {
+                verifyLocalityConsistency(rf);
+            }
+        }
+    }
+
+    /**
+     * Multi-child INTERSECT: verify runtime filter locality with 3 branches.
+     */
+    @Test
+    public void testMultiChildIntersectRuntimeFilterLocality() throws Exception {
+        String sql = "select k1 from test_rf_v2.t1 "
+                + "intersect select k1 from test_rf_v2.t2 "
+                + "intersect select k1 from test_rf_v2.t3";
+        Planner planner = getSQLPlanner(sql);
+        Assertions.assertNotNull(planner);
+
+        List<RuntimeFilter> runtimeFilters = planner.getRuntimeFilters();
+        for (RuntimeFilter rf : runtimeFilters) {
+            if (rf.getBuilderNode() instanceof SetOperationNode) {
+                verifyLocalityConsistency(rf);
+            }
+        }
+    }
+
+    /**
+     * Verify that hasRemoteTargets and hasLocalTargets are not both true
+     * and not both false (the BE requires exactly one to be true).
+     * Before the fix, hasLocalTargets was always false and hasRemoteTargets
+     * was always true, regardless of actual fragment placement.
+     */
+    private void verifyLocalityConsistency(RuntimeFilter rf) {
+        boolean hasRemote = rf.hasRemoteTargets();
+        // Use toThrift() to get hasLocalTargets since there's no public getter
+        boolean hasLocal = rf.toThrift().has_local_targets;

Review Comment:
   This assertion does not distinguish the buggy behavior from the fixed
behavior. Before the fix, every target had `isLocalTarget=false`, so
`hasRemote=true` and `hasLocal=false`. Those two values already differ, so
`assertNotEquals(hasRemote, hasLocal)` passes with the buggy code too.
   
   To actually validate the fix, the test needs to assert the *direction* of 
locality. For a two-table INTERSECT where the SetOperation node and its scan 
targets are in the same plan fragment, the expected result is `hasLocal=true, 
hasRemote=false`. Something like:
   ```java
   Assertions.assertTrue(hasLocal,
       "Expected same-fragment targets to be marked local for filter " + rf.getFilterId().asInt());
   Assertions.assertFalse(hasRemote,
       "Expected no remote targets for same-fragment filter " + rf.getFilterId().asInt());
   ```
   This requires knowing the expected locality per test case, but without it 
the test is not a regression guard for the actual bug.
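
The difference between the two kinds of check can be seen with plain booleans. This is a small standalone sketch, not part of the PR: `flagsDiffer` and `localOnly` are hypothetical helper names, and the "buggy" and "fixed" values are the ones described in the comment above.

```java
final class AssertionStrengthSketch {
    /** The weak check: only requires the two flags to differ. */
    static boolean flagsDiffer(boolean hasRemote, boolean hasLocal) {
        return hasRemote != hasLocal;
    }

    /** The directional check: same-fragment targets must be local-only. */
    static boolean localOnly(boolean hasRemote, boolean hasLocal) {
        return hasLocal && !hasRemote;
    }

    public static void main(String[] args) {
        // Flag values described in the review for the pre-fix code.
        boolean buggyRemote = true;
        boolean buggyLocal = false;
        // Expected values for a same-fragment filter after the fix.
        boolean fixedRemote = false;
        boolean fixedLocal = true;

        System.out.println(flagsDiffer(buggyRemote, buggyLocal)); // true: weak check passes on the bug
        System.out.println(flagsDiffer(fixedRemote, fixedLocal)); // true
        System.out.println(localOnly(buggyRemote, buggyLocal));   // false: directional check catches it
        System.out.println(localOnly(fixedRemote, fixedLocal));   // true
    }
}
```

Only the directional check returns different results for the two states, which is what a regression test needs.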



##########
fe/fe-core/src/test/java/org/apache/doris/nereids/glue/translator/RunTimeFilterTranslatorV2Test.java:
##########
@@ -0,0 +1,136 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.glue.translator;
+
+import org.apache.doris.planner.Planner;
+import org.apache.doris.planner.RuntimeFilter;
+import org.apache.doris.planner.SetOperationNode;
+import org.apache.doris.utframe.TestWithFeService;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+/**
+ * Tests for RunTimeFilterTranslatorV2, specifically verifying that runtime filter
+ * locality (isLocalTarget) is correctly computed for INTERSECT/EXCEPT operations.
+ */
+public class RunTimeFilterTranslatorV2Test extends TestWithFeService {
+
+    @Override
+    protected void runBeforeAll() throws Exception {
+        createDatabase("test_rf_v2");
+        connectContext.getSessionVariable().enableFallbackToOriginalPlanner = false;
+        connectContext.getSessionVariable().setRuntimeFilterType(8);
+
+        createTable("create table test_rf_v2.t1 ("
+                + "k1 int, "
+                + "v1 int"
+                + ") distributed by hash(k1) buckets 3 "
+                + "properties('replication_num' = '1');");
+
+        createTable("create table test_rf_v2.t2 ("
+                + "k1 int, "
+                + "v1 int"
+                + ") distributed by hash(k1) buckets 3 "
+                + "properties('replication_num' = '1');");
+
+        createTable("create table test_rf_v2.t3 ("
+                + "k1 int, "
+                + "v1 int"
+                + ") distributed by hash(k1) buckets 3 "
+                + "properties('replication_num' = '1');");
+    }
+
+    /**
+     * Verify that runtime filters produced by INTERSECT correctly compute
+     * hasRemoteTargets / hasLocalTargets (not both false, which was the bug).
+     * Before the fix, the V2 translator used the 2-arg RuntimeFilterTarget
+     * constructor which hard-coded isLocalTarget=false, causing all filters
+     * to be incorrectly marked as remote-only.
+     */
+    @Test
+    public void testIntersectRuntimeFilterLocality() throws Exception {
+        String sql = "select k1 from test_rf_v2.t1 intersect select k1 from test_rf_v2.t2";
+        Planner planner = getSQLPlanner(sql);
+        Assertions.assertNotNull(planner);
+
+        List<RuntimeFilter> runtimeFilters = planner.getRuntimeFilters();
+        // INTERSECT may or may not generate runtime filters depending on stats/cost.
+        // If runtime filters are generated, verify locality flags are consistent.
+        for (RuntimeFilter rf : runtimeFilters) {
+            if (rf.getBuilderNode() instanceof SetOperationNode) {
+                verifyLocalityConsistency(rf);
+            }
+        }
+    }
+
+    /**
+     * Verify that runtime filters produced by EXCEPT correctly compute
+     * hasRemoteTargets / hasLocalTargets.
+     */
+    @Test
+    public void testExceptRuntimeFilterLocality() throws Exception {
+        String sql = "select k1 from test_rf_v2.t1 except select k1 from test_rf_v2.t2";
+        Planner planner = getSQLPlanner(sql);
+        Assertions.assertNotNull(planner);
+
+        List<RuntimeFilter> runtimeFilters = planner.getRuntimeFilters();
+        for (RuntimeFilter rf : runtimeFilters) {
+            if (rf.getBuilderNode() instanceof SetOperationNode) {
+                verifyLocalityConsistency(rf);
+            }
+        }
+    }
+
+    /**
+     * Multi-child INTERSECT: verify runtime filter locality with 3 branches.
+     */
+    @Test
+    public void testMultiChildIntersectRuntimeFilterLocality() throws Exception {
+        String sql = "select k1 from test_rf_v2.t1 "
+                + "intersect select k1 from test_rf_v2.t2 "
+                + "intersect select k1 from test_rf_v2.t3";
+        Planner planner = getSQLPlanner(sql);
+        Assertions.assertNotNull(planner);
+
+        List<RuntimeFilter> runtimeFilters = planner.getRuntimeFilters();
+        for (RuntimeFilter rf : runtimeFilters) {
+            if (rf.getBuilderNode() instanceof SetOperationNode) {
+                verifyLocalityConsistency(rf);
+            }
+        }
+    }
+
+    /**
+     * Verify that hasRemoteTargets and hasLocalTargets are not both true
+     * and not both false (the BE requires exactly one to be true).
+     * Before the fix, hasLocalTargets was always false and hasRemoteTargets
+     * was always true, regardless of actual fragment placement.
+     */

Review Comment:
   Using `rf.toThrift()` to read `has_local_targets` is fragile: `toThrift()` 
is a serialization method that may have side effects or preconditions (e.g., 
requiring `finalize()` to have been called first), and it couples the test to 
the Thrift wire format rather than the Java model. Since the bug is about 
`RuntimeFilterTarget.isLocalTarget`, a cleaner approach would be to add a 
`@VisibleForTesting` method to `RuntimeFilter` that exposes `hasLocalTargets` 
directly, or to inspect `RuntimeFilterTarget.isLocalTarget` on the individual 
target objects if they are accessible.



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RunTimeFilterTranslatorV2.java:
##########
@@ -93,13 +94,22 @@ private void translateRuntimeFilterGroup(PlanNode sourceNode,
         Expr srcExpr = ExpressionTranslator.translate(head.getSourceExpression(), ctx);
 
         List<RuntimeFilterTarget> targets = new ArrayList<>();
+        boolean isLocalTarget = true;

Review Comment:
   A brief inline comment explaining the all-or-nothing semantics would help 
future readers. Without it, the single shared flag looks like it might be a 
mistake. V1 has the same pattern without explanation, but this is the 
translation of a subtle BE constraint. Something like:
   ```java
   // Compute locality once for the whole group: if any target is in a different
   // fragment or is a CTE scan, mark all targets as remote. BE requires
   // has_remote_targets and has_local_targets to never both be true for the
   // same RuntimeFilter.
   ```
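
To illustrate why a single shared flag is deliberate, here is a rough sketch that compares per-target flags with one group-wide flag. It assumes the filter-level flags are an OR over the per-target flags. That aggregation rule is an assumption made for illustration, and `summarize` and `shareFlag` are hypothetical names, not Doris APIs.

```java
import java.util.Arrays;
import java.util.List;

final class GroupLocalitySketch {
    /** Derives {hasLocal, hasRemote} from per-target locality flags (assumed OR rule). */
    static boolean[] summarize(List<Boolean> targetIsLocal) {
        boolean hasLocal = false;
        boolean hasRemote = false;
        for (boolean local : targetIsLocal) {
            hasLocal |= local;
            hasRemote |= !local;
        }
        return new boolean[] {hasLocal, hasRemote};
    }

    /** Applies one shared flag to every target: local only if all targets are local. */
    static List<Boolean> shareFlag(List<Boolean> targetIsLocal) {
        boolean allLocal = !targetIsLocal.contains(Boolean.FALSE);
        Boolean[] shared = new Boolean[targetIsLocal.size()];
        Arrays.fill(shared, allLocal);
        return Arrays.asList(shared);
    }

    public static void main(String[] args) {
        // One target in the source fragment, one in a different fragment.
        List<Boolean> mixed = Arrays.asList(true, false);

        // Per-target flags: both summary flags end up true, violating the constraint.
        System.out.println(Arrays.toString(summarize(mixed)));            // [true, true]
        // Shared flag: every target is demoted to remote, so only hasRemote is set.
        System.out.println(Arrays.toString(summarize(shareFlag(mixed)))); // [false, true]
    }
}
```

Under that assumption, a mixed group with per-target flags would set both summary flags, while the shared flag keeps them mutually exclusive.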



##########
fe/fe-core/src/test/java/org/apache/doris/nereids/glue/translator/RunTimeFilterTranslatorV2Test.java:
##########
@@ -0,0 +1,136 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.glue.translator;
+
+import org.apache.doris.planner.Planner;
+import org.apache.doris.planner.RuntimeFilter;
+import org.apache.doris.planner.SetOperationNode;
+import org.apache.doris.utframe.TestWithFeService;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+/**
+ * Tests for RunTimeFilterTranslatorV2, specifically verifying that runtime filter
+ * locality (isLocalTarget) is correctly computed for INTERSECT/EXCEPT operations.
+ */
+public class RunTimeFilterTranslatorV2Test extends TestWithFeService {
+
+    @Override
+    protected void runBeforeAll() throws Exception {
+        createDatabase("test_rf_v2");
+        connectContext.getSessionVariable().enableFallbackToOriginalPlanner = false;
+        connectContext.getSessionVariable().setRuntimeFilterType(8);
+
+        createTable("create table test_rf_v2.t1 ("
+                + "k1 int, "
+                + "v1 int"
+                + ") distributed by hash(k1) buckets 3 "
+                + "properties('replication_num' = '1');");
+
+        createTable("create table test_rf_v2.t2 ("
+                + "k1 int, "
+                + "v1 int"
+                + ") distributed by hash(k1) buckets 3 "
+                + "properties('replication_num' = '1');");
+
+        createTable("create table test_rf_v2.t3 ("
+                + "k1 int, "
+                + "v1 int"
+                + ") distributed by hash(k1) buckets 3 "
+                + "properties('replication_num' = '1');");
+    }
+
+    /**
+     * Verify that runtime filters produced by INTERSECT correctly compute
+     * hasRemoteTargets / hasLocalTargets (not both false, which was the bug).
+     * Before the fix, the V2 translator used the 2-arg RuntimeFilterTarget
+     * constructor which hard-coded isLocalTarget=false, causing all filters
+     * to be incorrectly marked as remote-only.
+     */
+    @Test
+    public void testIntersectRuntimeFilterLocality() throws Exception {
+        String sql = "select k1 from test_rf_v2.t1 intersect select k1 from test_rf_v2.t2";
+        Planner planner = getSQLPlanner(sql);
+        Assertions.assertNotNull(planner);
+
+        List<RuntimeFilter> runtimeFilters = planner.getRuntimeFilters();
+        // INTERSECT may or may not generate runtime filters depending on stats/cost.
+        // If runtime filters are generated, verify locality flags are consistent.
+        for (RuntimeFilter rf : runtimeFilters) {
+            if (rf.getBuilderNode() instanceof SetOperationNode) {
+                verifyLocalityConsistency(rf);

Review Comment:
   The comment "INTERSECT may or may not generate runtime filters depending on 
stats/cost" effectively admits the test can pass vacuously with an empty filter 
list. If `runtimeFilters` is empty (or contains no `SetOperationNode`-built 
filters), the `for` loop body never runs and the test proves nothing.
   
   Consider adding an assertion that at least one filter was actually verified:
   ```java
   long checked = runtimeFilters.stream()
       .filter(rf -> rf.getBuilderNode() instanceof SetOperationNode)
       .peek(this::verifyLocalityConsistency)
       .count();
   Assertions.assertTrue(checked > 0,
       "Expected at least one SetOperationNode runtime filter to be present");
   ```
   Alternatively, pin `enable_runtime_filter_prune=false` or use a session 
variable that guarantees filter generation.
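
The same guard can be written with a plain loop, which avoids relying on `Stream.peek` for side effects (its Javadoc describes it as mainly a debugging aid). The sketch below is standalone and uses a hypothetical `Filter` type with a `builtBySetOperation` field in place of `RuntimeFilter` and the `instanceof SetOperationNode` test.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class NonVacuousCheckSketch {
    /** Hypothetical stand-in for a runtime filter and its builder-node kind. */
    static final class Filter {
        final boolean builtBySetOperation;

        Filter(boolean builtBySetOperation) {
            this.builtBySetOperation = builtBySetOperation;
        }
    }

    /** Visits every set-operation filter and returns how many were checked. */
    static int verifyAndCount(List<Filter> filters) {
        int checked = 0;
        for (Filter f : filters) {
            if (f.builtBySetOperation) {
                // A real test would call verifyLocalityConsistency(rf) here.
                checked++;
            }
        }
        return checked;
    }

    public static void main(String[] args) {
        List<Filter> none = Collections.emptyList();
        List<Filter> some = Arrays.asList(new Filter(false), new Filter(true));

        // With no filters the count is 0, so a "checked > 0" assertion would fail
        // instead of letting the test pass vacuously.
        System.out.println(verifyAndCount(none)); // 0
        System.out.println(verifyAndCount(some)); // 1
    }
}
```

In the real test, the returned count would feed the `Assertions.assertTrue(checked > 0, ...)` guard suggested above.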



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
