This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new a841471d08a branch-4.1: [fix](be) Pick #63417 and #63969 (#64117)
a841471d08a is described below
commit a841471d08a974120ea0ca6652cbf79860d6296e
Author: Pxl <[email protected]>
AuthorDate: Mon Jun 8 18:26:15 2026 +0800
branch-4.1: [fix](be) Pick #63417 and #63969 (#64117)
## Summary
- Pick #63417 to branch-4.1.
- Pick #63969 to branch-4.1.
- Do not include #62854 in this PR because branch-4.1 does not have the
offset-only prerequisite infrastructure (`ACCESS_STRING_OFFSET`,
`only_read_offsets`; prerequisites such as #61888/#62205 are not on
branch-4.0/4.1). Direct conflict resolution would effectively backport a
larger optimization stack.
## Testing
- `build-support/check-format.sh`
- `./run-be-ut.sh --run --filter=RuntimePredicateTest.*`
- `./run-fe-ut.sh --run
org.apache.doris.qe.runtime.ThriftPlansBuilderTest,org.apache.doris.qe.OldCoordinatorTest`
---------
Co-authored-by: Copilot <[email protected]>
---
be/src/core/assert_cast.h | 2 +-
be/src/exec/operator/scan_operator.cpp | 5 --
be/src/exec/operator/scan_operator.h | 5 --
be/src/runtime/runtime_predicate.cpp | 1 +
be/test/runtime/runtime_predicate_test.cpp | 86 ++++++++++++++++++++++
.../main/java/org/apache/doris/qe/Coordinator.java | 1 -
.../doris/qe/runtime/ThriftPlansBuilder.java | 9 +--
.../org/apache/doris/qe/OldCoordinatorTest.java | 31 ++++++++
.../doris/qe/runtime/ThriftPlansBuilderTest.java | 39 ++++++++++
9 files changed, 161 insertions(+), 18 deletions(-)
diff --git a/be/src/core/assert_cast.h b/be/src/core/assert_cast.h
index acadf8705a7..d54f0ff31ba 100644
--- a/be/src/core/assert_cast.h
+++ b/be/src/core/assert_cast.h
@@ -50,7 +50,7 @@ using AssertCastClassType_t =
std::remove_pointer_t<AssertCastNormalizedType_t<T
* The exact match of the type is checked. That is, cast to the ancestor will
be unsuccessful.
*/
template <typename To, TypeCheckOnRelease check = TypeCheckOnRelease::ENABLE,
typename From>
-PURE To assert_cast(From&& from) {
+To assert_cast(From&& from) {
static_assert(!std::is_same_v<AssertCastNormalizedType_t<To>,
AssertCastNormalizedType_t<From>>,
"assert_cast is redundant for the same type after removing
cv/ref qualifiers");
static_assert(std::is_class_v<AssertCastClassType_t<To>> &&
diff --git a/be/src/exec/operator/scan_operator.cpp
b/be/src/exec/operator/scan_operator.cpp
index 87c02aab2b6..b26175686b7 100644
--- a/be/src/exec/operator/scan_operator.cpp
+++ b/be/src/exec/operator/scan_operator.cpp
@@ -1232,11 +1232,6 @@ Status
ScanOperatorX<LocalStateType>::prepare(RuntimeState* state) {
_slot_id_to_slot_desc[slot->id()] = slot;
}
for (auto id : _topn_filter_source_node_ids) {
- if (!state->get_query_ctx()->has_runtime_predicate(id)) {
- // compatible with older versions fe
- continue;
- }
-
int cid = -1;
if
(state->get_query_ctx()->get_runtime_predicate(id).target_is_slot(node_id())) {
auto s = _slot_id_to_slot_desc[state->get_query_ctx()
diff --git a/be/src/exec/operator/scan_operator.h
b/be/src/exec/operator/scan_operator.h
index cfffa5a50b3..8f2ad826956 100644
--- a/be/src/exec/operator/scan_operator.h
+++ b/be/src/exec/operator/scan_operator.h
@@ -254,11 +254,6 @@ class ScanLocalState : public ScanLocalStateBase {
std::vector<int> get_topn_filter_source_node_ids(RuntimeState* state, bool
push_down) {
std::vector<int> result;
for (int id : _parent->cast<typename
Derived::Parent>()._topn_filter_source_node_ids) {
- if (!state->get_query_ctx()->has_runtime_predicate(id)) {
- // compatible with older versions fe
- continue;
- }
-
const auto& pred =
state->get_query_ctx()->get_runtime_predicate(id);
if (!pred.enable()) {
continue;
diff --git a/be/src/runtime/runtime_predicate.cpp
b/be/src/runtime/runtime_predicate.cpp
index 7d96d831600..8c9e040f7a5 100644
--- a/be/src/runtime/runtime_predicate.cpp
+++ b/be/src/runtime/runtime_predicate.cpp
@@ -59,6 +59,7 @@ Status RuntimePredicate::init_target(
int32_t target_node_id, phmap::flat_hash_map<int, SlotDescriptor*>
slot_id_to_slot_desc,
const int column_id) {
if (column_id < 0) {
+ _detected_target = true;
return Status::OK();
}
std::unique_lock<std::shared_mutex> wlock(_rwlock);
diff --git a/be/test/runtime/runtime_predicate_test.cpp
b/be/test/runtime/runtime_predicate_test.cpp
new file mode 100644
index 00000000000..77e65b569a4
--- /dev/null
+++ b/be/test/runtime/runtime_predicate_test.cpp
@@ -0,0 +1,86 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/runtime_predicate.h"
+
+#include <gtest/gtest.h>
+
+#include "core/data_type/data_type_factory.hpp"
+#include "core/field.h"
+#include "exec/pipeline/thrift_builder.h"
+#include "runtime/descriptors.h"
+
+namespace doris {
+namespace {
+
+constexpr TPlanNodeId SOURCE_NODE_ID = 10;
+constexpr TPlanNodeId TARGET_NODE_ID = 20;
+constexpr SlotId SLOT_ID = 0;
+
+TTopnFilterDesc create_topn_filter_desc() {
+ auto target_expr = TRuntimeFilterDescBuilder::get_default_expr();
+
+ TTopnFilterDesc desc;
+ desc.__set_source_node_id(SOURCE_NODE_ID);
+ desc.__set_is_asc(true);
+ desc.__set_null_first(false);
+ desc.__set_target_node_id_to_target_expr({{TARGET_NODE_ID, target_expr}});
+ return desc;
+}
+
+SlotDescriptor create_int_slot_descriptor() {
+ SlotDescriptor slot_desc;
+ slot_desc._id = SLOT_ID;
+ slot_desc._col_name = "k1";
+ slot_desc._type =
DataTypeFactory::instance().create_data_type(PrimitiveType::TYPE_INT, false);
+ return slot_desc;
+}
+
+} // namespace
+
+TEST(RuntimePredicateTest,
init_target_creates_column_predicate_for_valid_column_id) {
+ RuntimePredicate predicate(create_topn_filter_desc());
+ predicate.set_detected_source();
+
+ auto slot_desc = create_int_slot_descriptor();
+ phmap::flat_hash_map<int, SlotDescriptor*> slot_id_to_slot_desc;
+ slot_id_to_slot_desc[SLOT_ID] = &slot_desc;
+
+ ASSERT_TRUE(predicate.init_target(TARGET_NODE_ID, slot_id_to_slot_desc,
0).ok());
+
+ EXPECT_TRUE(predicate.enable());
+ EXPECT_EQ("k1", predicate.get_col_name(TARGET_NODE_ID));
+ EXPECT_NE(nullptr, predicate.get_predicate(TARGET_NODE_ID));
+}
+
+TEST(RuntimePredicateTest,
init_target_without_column_predicate_still_enables_runtime_filter) {
+ RuntimePredicate predicate(create_topn_filter_desc());
+ predicate.set_detected_source();
+
+ phmap::flat_hash_map<int, SlotDescriptor*> slot_id_to_slot_desc;
+ ASSERT_TRUE(predicate.init_target(TARGET_NODE_ID, slot_id_to_slot_desc,
-1).ok());
+
+ EXPECT_TRUE(predicate.enable());
+ EXPECT_EQ(nullptr, predicate.get_predicate(TARGET_NODE_ID));
+
+ auto top_value = Field::create_field<TYPE_INT>(10);
+ ASSERT_TRUE(predicate.update(top_value).ok());
+ EXPECT_TRUE(predicate.has_value());
+ EXPECT_EQ(top_value, predicate.get_value());
+}
+
+} // namespace doris
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index dac24f2f798..0449ffcdfb4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -3241,7 +3241,6 @@ public class Coordinator implements CoordInterface {
Map<TNetworkAddress, TPipelineFragmentParams> toThrift(int backendNum)
{
Set<SortNode> topnSortNodes = scanNodes.stream()
- .filter(scanNode -> scanNode instanceof OlapScanNode)
.flatMap(scanNode ->
scanNode.getTopnFilterSortNodes().stream()).collect(Collectors.toSet());
topnSortNodes.forEach(SortNode::setHasRuntimePredicate);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
index 409037e3b61..107d615fb89 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
@@ -39,7 +39,6 @@ import org.apache.doris.planner.DataSink;
import org.apache.doris.planner.DataStreamSink;
import org.apache.doris.planner.ExchangeNode;
import org.apache.doris.planner.MultiCastDataSink;
-import org.apache.doris.planner.OlapScanNode;
import org.apache.doris.planner.OlapTableSink;
import org.apache.doris.planner.PlanFragment;
import org.apache.doris.planner.PlanFragmentId;
@@ -188,12 +187,10 @@ public class ThriftPlansBuilder {
return workerToInstances;
}
- private static void setRuntimePredicateIfNeed(Collection<ScanNode>
scanNodes) {
+ static void setRuntimePredicateIfNeed(Collection<ScanNode> scanNodes) {
for (ScanNode scanNode : scanNodes) {
- if (scanNode instanceof OlapScanNode) {
- for (SortNode topnFilterSortNode :
scanNode.getTopnFilterSortNodes()) {
- topnFilterSortNode.setHasRuntimePredicate();
- }
+ for (SortNode topnFilterSortNode :
scanNode.getTopnFilterSortNodes()) {
+ topnFilterSortNode.setHasRuntimePredicate();
}
}
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/qe/OldCoordinatorTest.java
b/fe/fe-core/src/test/java/org/apache/doris/qe/OldCoordinatorTest.java
index b8cb5b74165..df062e02c1f 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/qe/OldCoordinatorTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/OldCoordinatorTest.java
@@ -17,16 +17,24 @@
package org.apache.doris.qe;
+import org.apache.doris.analysis.DescriptorTable;
import org.apache.doris.nereids.rules.RuleType;
import org.apache.doris.planner.OlapScanNode;
import org.apache.doris.planner.PlanFragment;
import org.apache.doris.planner.PlanFragmentId;
import org.apache.doris.planner.PlanNode;
+import org.apache.doris.planner.ScanNode;
+import org.apache.doris.planner.SortNode;
+import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TPlanFragment;
+import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.utframe.TestWithFeService;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -99,4 +107,27 @@ public class OldCoordinatorTest extends TestWithFeService {
}.test();
Assertions.assertTrue(shuffleFragmentHasMultiInstances.get());
}
+
+ @Test
+ public void testFragmentExecParamsMarksNonOlapTopnFilterSource() {
+ ScanNode scanNode = Mockito.mock(ScanNode.class);
+ SortNode sortNode = Mockito.mock(SortNode.class);
+
Mockito.when(scanNode.getTopnFilterSortNodes()).thenReturn(Collections.singletonList(sortNode));
+
+ PlanFragment fragment = Mockito.mock(PlanFragment.class);
+ Mockito.when(fragment.getFragmentId()).thenReturn(new
PlanFragmentId(0));
+ Mockito.when(fragment.toThrift()).thenReturn(new TPlanFragment());
+
Mockito.when(fragment.isTransferQueryStatisticsWithEveryBatch()).thenReturn(false);
+
+ Coordinator.FragmentExecParams fragParams = new Coordinator(0L, new
TUniqueId(1L, 1L),
+ new DescriptorTable(), Collections.singletonList(fragment),
Collections.singletonList(scanNode),
+ "UTC", false, false).new FragmentExecParams(fragment);
+ TNetworkAddress host = new TNetworkAddress("127.0.0.1", 9060);
+ fragParams.instanceExecParams.add(
+ new Coordinator.FInstanceExecParam(new TUniqueId(2L, 2L),
host, fragParams));
+
+ fragParams.toThrift(0);
+
+ Mockito.verify(sortNode).setHasRuntimePredicate();
+ }
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/qe/runtime/ThriftPlansBuilderTest.java
b/fe/fe-core/src/test/java/org/apache/doris/qe/runtime/ThriftPlansBuilderTest.java
new file mode 100644
index 00000000000..88140c0cb2e
--- /dev/null
+++
b/fe/fe-core/src/test/java/org/apache/doris/qe/runtime/ThriftPlansBuilderTest.java
@@ -0,0 +1,39 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.qe.runtime;
+
+import org.apache.doris.planner.ScanNode;
+import org.apache.doris.planner.SortNode;
+
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.util.Collections;
+
+public class ThriftPlansBuilderTest {
+ @Test
+ public void testSetRuntimePredicateForNonOlapScanNode() {
+ ScanNode scanNode = Mockito.mock(ScanNode.class);
+ SortNode sortNode = Mockito.mock(SortNode.class);
+
Mockito.when(scanNode.getTopnFilterSortNodes()).thenReturn(Collections.singletonList(sortNode));
+
+
ThriftPlansBuilder.setRuntimePredicateIfNeed(Collections.singletonList(scanNode));
+
+ Mockito.verify(sortNode).setHasRuntimePredicate();
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]