This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new bcfbb68c75d branch-4.0: [fix](be) Pick #63969 and #63136 (#64116)
bcfbb68c75d is described below
commit bcfbb68c75d05cd02d3893d9077354803884fc26
Author: Pxl <[email protected]>
AuthorDate: Wed Jun 10 17:42:37 2026 +0800
branch-4.0: [fix](be) Pick #63969 and #63136 (#64116)
---
be/src/pipeline/exec/scan_operator.cpp | 5 --
be/src/pipeline/exec/scan_operator.h | 5 --
be/src/runtime/runtime_predicate.cpp | 1 +
be/test/runtime/runtime_predicate_test.cpp | 87 ++++++++++++++++++++++
.../main/java/org/apache/doris/qe/Coordinator.java | 1 -
.../apache/doris/qe/ResultReceiverConsumer.java | 7 +-
.../apache/doris/qe/runtime/QueryProcessor.java | 4 +
.../doris/qe/runtime/ThriftPlansBuilder.java | 9 +--
.../org/apache/doris/qe/OldCoordinatorTest.java | 31 ++++++++
.../doris/qe/ResultReceiverConsumerTest.java | 10 +++
.../doris/qe/runtime/ThriftPlansBuilderTest.java | 39 ++++++++++
11 files changed, 180 insertions(+), 19 deletions(-)
diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp
index 21334955100..681c6ef353d 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -1224,11 +1224,6 @@ Status ScanOperatorX<LocalStateType>::prepare(RuntimeState* state) {
_slot_id_to_slot_desc[slot->id()] = slot;
}
for (auto id : topn_filter_source_node_ids) {
- if (!state->get_query_ctx()->has_runtime_predicate(id)) {
- // compatible with older versions fe
- continue;
- }
-
int cid = -1;
if (state->get_query_ctx()->get_runtime_predicate(id).target_is_slot(node_id())) {
auto s = _slot_id_to_slot_desc[state->get_query_ctx()
diff --git a/be/src/pipeline/exec/scan_operator.h b/be/src/pipeline/exec/scan_operator.h
index aef51db57db..7604d11d41f 100644
--- a/be/src/pipeline/exec/scan_operator.h
+++ b/be/src/pipeline/exec/scan_operator.h
@@ -172,11 +172,6 @@ class ScanLocalState : public ScanLocalStateBase {
std::vector<int> get_topn_filter_source_node_ids(RuntimeState* state, bool push_down) {
std::vector<int> result;
for (int id : _parent->cast<typename Derived::Parent>().topn_filter_source_node_ids) {
- if (!state->get_query_ctx()->has_runtime_predicate(id)) {
- // compatible with older versions fe
- continue;
- }
-
const auto& pred =
state->get_query_ctx()->get_runtime_predicate(id);
if (!pred.enable()) {
continue;
diff --git a/be/src/runtime/runtime_predicate.cpp b/be/src/runtime/runtime_predicate.cpp
index 36e3e6cd5e6..b0e5186b55e 100644
--- a/be/src/runtime/runtime_predicate.cpp
+++ b/be/src/runtime/runtime_predicate.cpp
@@ -59,6 +59,7 @@ Status RuntimePredicate::init_target(
int32_t target_node_id, phmap::flat_hash_map<int, SlotDescriptor*>
slot_id_to_slot_desc,
const int column_id) {
if (column_id < 0) {
+ _detected_target = true;
return Status::OK();
}
std::unique_lock<std::shared_mutex> wlock(_rwlock);
diff --git a/be/test/runtime/runtime_predicate_test.cpp b/be/test/runtime/runtime_predicate_test.cpp
new file mode 100644
index 00000000000..86ce0c357a8
--- /dev/null
+++ b/be/test/runtime/runtime_predicate_test.cpp
@@ -0,0 +1,87 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/runtime_predicate.h"
+
+#include <gtest/gtest.h>
+
+#include "pipeline/thrift_builder.h"
+#include "runtime/descriptors.h"
+#include "vec/core/field.h"
+#include "vec/data_types/data_type_factory.hpp"
+
+namespace doris {
+namespace {
+
+constexpr TPlanNodeId SOURCE_NODE_ID = 10;
+constexpr TPlanNodeId TARGET_NODE_ID = 20;
+constexpr SlotId SLOT_ID = 0;
+
+TTopnFilterDesc create_topn_filter_desc() {
+ auto target_expr = TRuntimeFilterDescBuilder::get_default_expr();
+
+ TTopnFilterDesc desc;
+ desc.__set_source_node_id(SOURCE_NODE_ID);
+ desc.__set_is_asc(true);
+ desc.__set_null_first(false);
+ desc.__set_target_node_id_to_target_expr({{TARGET_NODE_ID, target_expr}});
+ return desc;
+}
+
+SlotDescriptor create_int_slot_descriptor() {
+ SlotDescriptor slot_desc;
+ slot_desc._id = SLOT_ID;
+ slot_desc._col_name = "k1";
+ slot_desc._type = vectorized::DataTypeFactory::instance().create_data_type(
+ PrimitiveType::TYPE_INT, false);
+ return slot_desc;
+}
+
+} // namespace
+
+TEST(RuntimePredicateTest, init_target_creates_column_predicate_for_valid_column_id) {
+ vectorized::RuntimePredicate predicate(create_topn_filter_desc());
+ predicate.set_detected_source();
+
+ auto slot_desc = create_int_slot_descriptor();
+ phmap::flat_hash_map<int, SlotDescriptor*> slot_id_to_slot_desc;
+ slot_id_to_slot_desc[SLOT_ID] = &slot_desc;
+
+ ASSERT_TRUE(predicate.init_target(TARGET_NODE_ID, slot_id_to_slot_desc, 0).ok());
+
+ EXPECT_TRUE(predicate.enable());
+ EXPECT_EQ("k1", predicate.get_col_name(TARGET_NODE_ID));
+ EXPECT_NE(nullptr, predicate.get_predicate(TARGET_NODE_ID));
+}
+
+TEST(RuntimePredicateTest, init_target_without_column_predicate_still_enables_runtime_filter) {
+ vectorized::RuntimePredicate predicate(create_topn_filter_desc());
+ predicate.set_detected_source();
+
+ phmap::flat_hash_map<int, SlotDescriptor*> slot_id_to_slot_desc;
+ ASSERT_TRUE(predicate.init_target(TARGET_NODE_ID, slot_id_to_slot_desc, -1).ok());
+
+ EXPECT_TRUE(predicate.enable());
+ EXPECT_EQ(nullptr, predicate.get_predicate(TARGET_NODE_ID));
+
+ auto top_value = vectorized::Field::create_field<TYPE_INT>(10);
+ ASSERT_TRUE(predicate.update(top_value).ok());
+ EXPECT_TRUE(predicate.has_value());
+ EXPECT_EQ(top_value, predicate.get_value());
+}
+
+} // namespace doris
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index b1e710e0ec0..c4d11c7adbf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -3215,7 +3215,6 @@ public class Coordinator implements CoordInterface {
Map<TNetworkAddress, TPipelineFragmentParams> toThrift(int backendNum) {
Set<SortNode> topnSortNodes = scanNodes.stream()
- .filter(scanNode -> scanNode instanceof OlapScanNode)
.flatMap(scanNode -> scanNode.getTopnFilterSortNodes().stream()).collect(Collectors.toSet());
topnSortNodes.forEach(SortNode::setHasRuntimePredicate);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiverConsumer.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiverConsumer.java
index 88403083c54..54fe0dff977 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiverConsumer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiverConsumer.java
@@ -82,15 +82,18 @@ public class ResultReceiverConsumer {
ReceiverContext context = new ReceiverContext(resultReceivers.get(i), i);
contexts.add(context);
}
- this.readyOffsets = new ArrayBlockingQueue<>(resultReceivers.size());
+ this.readyOffsets = new ArrayBlockingQueue<>(Math.max(1, resultReceivers.size()));
timeoutTs = timeoutDeadline;
}
public boolean isEos() {
- return finishedReceivers == contexts.size();
+ return !contexts.isEmpty() && finishedReceivers == contexts.size();
}
public RowBatch getNext(Status status) throws TException, InterruptedException, ExecutionException, UserException {
+ if (contexts.isEmpty()) {
+ throw new UserException("There is no receiver.");
+ }
if (!futureInitialized) {
futureInitialized = true;
for (ReceiverContext context : contexts) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/QueryProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/QueryProcessor.java
index 4ab7f041f59..88ceb432c2e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/QueryProcessor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/QueryProcessor.java
@@ -86,10 +86,14 @@ public class QueryProcessor extends AbstractJobProcessor {
}
boolean regenerateInstanceId = coordinatorContext.connectContext.consumeNeedRegenerateQueryId();
+ boolean returnResultFromLocal = coordinatorContext.connectContext.isReturnResultFromLocal();
for (AssignedJob topInstance : distinctWorkerJobs.values()) {
if (regenerateInstanceId) {
topInstance.resetInstanceId(coordinatorContext.connectContext.nextInstanceId());
}
+ if (!returnResultFromLocal) {
+ continue;
+ }
DistributedPlanWorker topWorker = topInstance.getAssignedWorker();
TNetworkAddress execBeAddr = new TNetworkAddress(topWorker.host(), topWorker.brpcPort());
receivers.add(
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
index 294432d7620..c1fca4a9536 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
@@ -38,7 +38,6 @@ import org.apache.doris.planner.DataSink;
import org.apache.doris.planner.DataStreamSink;
import org.apache.doris.planner.ExchangeNode;
import org.apache.doris.planner.MultiCastDataSink;
-import org.apache.doris.planner.OlapScanNode;
import org.apache.doris.planner.OlapTableSink;
import org.apache.doris.planner.PlanFragment;
import org.apache.doris.planner.ScanNode;
@@ -166,12 +165,10 @@ public class ThriftPlansBuilder {
return workerToInstances;
}
- private static void setRuntimePredicateIfNeed(Collection<ScanNode> scanNodes) {
+ static void setRuntimePredicateIfNeed(Collection<ScanNode> scanNodes) {
for (ScanNode scanNode : scanNodes) {
- if (scanNode instanceof OlapScanNode) {
- for (SortNode topnFilterSortNode : scanNode.getTopnFilterSortNodes()) {
- topnFilterSortNode.setHasRuntimePredicate();
- }
+ for (SortNode topnFilterSortNode : scanNode.getTopnFilterSortNodes()) {
+ topnFilterSortNode.setHasRuntimePredicate();
}
}
}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/OldCoordinatorTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/OldCoordinatorTest.java
index b8cb5b74165..df062e02c1f 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/qe/OldCoordinatorTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/OldCoordinatorTest.java
@@ -17,16 +17,24 @@
package org.apache.doris.qe;
+import org.apache.doris.analysis.DescriptorTable;
import org.apache.doris.nereids.rules.RuleType;
import org.apache.doris.planner.OlapScanNode;
import org.apache.doris.planner.PlanFragment;
import org.apache.doris.planner.PlanFragmentId;
import org.apache.doris.planner.PlanNode;
+import org.apache.doris.planner.ScanNode;
+import org.apache.doris.planner.SortNode;
+import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TPlanFragment;
+import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.utframe.TestWithFeService;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -99,4 +107,27 @@ public class OldCoordinatorTest extends TestWithFeService {
}.test();
Assertions.assertTrue(shuffleFragmentHasMultiInstances.get());
}
+
+ @Test
+ public void testFragmentExecParamsMarksNonOlapTopnFilterSource() {
+ ScanNode scanNode = Mockito.mock(ScanNode.class);
+ SortNode sortNode = Mockito.mock(SortNode.class);
+ Mockito.when(scanNode.getTopnFilterSortNodes()).thenReturn(Collections.singletonList(sortNode));
+
+ PlanFragment fragment = Mockito.mock(PlanFragment.class);
+ Mockito.when(fragment.getFragmentId()).thenReturn(new PlanFragmentId(0));
+ Mockito.when(fragment.toThrift()).thenReturn(new TPlanFragment());
+ Mockito.when(fragment.isTransferQueryStatisticsWithEveryBatch()).thenReturn(false);
+
+ Coordinator.FragmentExecParams fragParams = new Coordinator(0L, new TUniqueId(1L, 1L),
+ new DescriptorTable(), Collections.singletonList(fragment), Collections.singletonList(scanNode),
+ "UTC", false, false).new FragmentExecParams(fragment);
+ TNetworkAddress host = new TNetworkAddress("127.0.0.1", 9060);
+ fragParams.instanceExecParams.add(
+ new Coordinator.FInstanceExecParam(new TUniqueId(2L, 2L), host, fragParams));
+
+ fragParams.toThrift(0);
+
+ Mockito.verify(sortNode).setHasRuntimePredicate();
+ }
}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/ResultReceiverConsumerTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/ResultReceiverConsumerTest.java
index e67069209a0..3f76d4227b0 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/qe/ResultReceiverConsumerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/ResultReceiverConsumerTest.java
@@ -40,6 +40,16 @@ public class ResultReceiverConsumerTest {
@Injectable
private ResultReceiver receiver3;
+ @Test
+ public void testEmptyReceiversForRemoteResult() throws Exception {
+ ResultReceiverConsumer consumer = new ResultReceiverConsumer(Lists.newArrayList(),
+ System.currentTimeMillis() + 3600);
+ Status status = new Status();
+
+ Assert.assertFalse(consumer.isEos());
+ Assertions.assertThrows(UserException.class, () -> consumer.getNext(status));
+ }
+
@Test
public void testEosHandling() throws Exception {
ResultReceiverConsumer consumer = new ResultReceiverConsumer(
diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/runtime/ThriftPlansBuilderTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/runtime/ThriftPlansBuilderTest.java
new file mode 100644
index 00000000000..88140c0cb2e
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/runtime/ThriftPlansBuilderTest.java
@@ -0,0 +1,39 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.qe.runtime;
+
+import org.apache.doris.planner.ScanNode;
+import org.apache.doris.planner.SortNode;
+
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.util.Collections;
+
+public class ThriftPlansBuilderTest {
+ @Test
+ public void testSetRuntimePredicateForNonOlapScanNode() {
+ ScanNode scanNode = Mockito.mock(ScanNode.class);
+ SortNode sortNode = Mockito.mock(SortNode.class);
+ Mockito.when(scanNode.getTopnFilterSortNodes()).thenReturn(Collections.singletonList(sortNode));
+
+ ThriftPlansBuilder.setRuntimePredicateIfNeed(Collections.singletonList(scanNode));
+
+ Mockito.verify(sortNode).setHasRuntimePredicate();
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]