This is an automated email from the ASF dual-hosted git repository.
huajianlan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 2cde7b0839d [fix](nereids) fix NereidsCoordinator compute wrong result
when exists CTE (#44753)
2cde7b0839d is described below
commit 2cde7b0839d1607ef6dd619b7d9db95bc4c211ee
Author: 924060929 <[email protected]>
AuthorDate: Mon Dec 2 16:41:17 2024 +0800
[fix](nereids) fix NereidsCoordinator compute wrong result when exists CTE
(#44753)
fix NereidsCoordinator compute wrong result when exists CTE, introduced
by #41730
---
.../doris/qe/runtime/ThriftPlansBuilder.java | 11 ++--
.../distribute/test_multicast_sink.groovy | 60 ++++++++++++++++++++++
2 files changed, 68 insertions(+), 3 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
index a02ee90e901..54bc0b24d3e 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
@@ -240,14 +240,19 @@ public class ThriftPlansBuilder {
return senderNum;
}
- private static void setMultiCastDestinationThrift(PipelineDistributedPlan
fragmentPlan) {
+ private static void
setMultiCastDestinationThriftIfNotSet(PipelineDistributedPlan fragmentPlan) {
MultiCastDataSink multiCastDataSink = (MultiCastDataSink)
fragmentPlan.getFragmentJob().getFragment().getSink();
List<List<TPlanFragmentDestination>> destinationList =
multiCastDataSink.getDestinations();
List<DataStreamSink> dataStreamSinks =
multiCastDataSink.getDataStreamSinks();
for (int i = 0; i < dataStreamSinks.size(); i++) {
- DataStreamSink realSink = dataStreamSinks.get(i);
List<TPlanFragmentDestination> destinations =
destinationList.get(i);
+ if (!destinations.isEmpty()) {
+ // we should only set destination only once,
+ // because all backends share the same MultiCastDataSink object
+ continue;
+ }
+ DataStreamSink realSink = dataStreamSinks.get(i);
for (Entry<DataSink, List<AssignedJob>> kv :
fragmentPlan.getDestinations().entrySet()) {
DataSink sink = kv.getKey();
if (sink == realSink) {
@@ -318,7 +323,7 @@ public class ThriftPlansBuilder {
List<TPlanFragmentDestination> nonMultiCastDestinations;
if (fragment.getSink() instanceof MultiCastDataSink) {
nonMultiCastDestinations = Lists.newArrayList();
- setMultiCastDestinationThrift(fragmentPlan);
+ setMultiCastDestinationThriftIfNotSet(fragmentPlan);
} else {
nonMultiCastDestinations =
nonMultiCastDestinationToThrift(fragmentPlan);
}
diff --git
a/regression-test/suites/nereids_syntax_p0/distribute/test_multicast_sink.groovy
b/regression-test/suites/nereids_syntax_p0/distribute/test_multicast_sink.groovy
new file mode 100644
index 00000000000..eeeaad06d5e
--- /dev/null
+++
b/regression-test/suites/nereids_syntax_p0/distribute/test_multicast_sink.groovy
@@ -0,0 +1,60 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_multicast_sink") {
+ multi_sql """
+ drop table if exists
table_1_undef_partitions2_keys3_properties4_distributed_by5;
+ CREATE TABLE
`table_1_undef_partitions2_keys3_properties4_distributed_by5` (
+ `col_int_undef_signed` int NULL,
+ `col_int_undef_signed_not_null` int NOT NULL,
+ `col_date_undef_signed` date NULL,
+ `col_date_undef_signed_not_null` date NOT NULL,
+ `col_varchar_10__undef_signed` varchar(10) NULL,
+ `col_varchar_10__undef_signed_not_null` varchar(10) NOT NULL,
+ `col_varchar_1024__undef_signed` varchar(1024) NULL,
+ `col_varchar_1024__undef_signed_not_null` varchar(1024) NOT NULL,
+ `pk` int NULL
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`col_int_undef_signed`,
`col_int_undef_signed_not_null`, `col_date_undef_signed`)
+ DISTRIBUTED BY HASH(`pk`) BUCKETS 10
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1"
+ );
+ insert into
table_1_undef_partitions2_keys3_properties4_distributed_by5 values(3, 6,
'2023-12-17', '2023-12-17', 'ok', 'v', 'want', 'z', 0);
+ set enable_nereids_distribute_planner=true;
+ set parallel_pipeline_task_num = 1;
+ """
+
+ for (def i in 0..<100) {
+ test {
+ sql """
+ WITH cte1 AS(
+ SELECT t1.`pk`
+ FROM
table_1_undef_partitions2_keys3_properties4_distributed_by5 AS t1
+ ORDER BY t1.pk
+ )
+ SELECT cte1.`pk` AS pk1
+ FROM cte1
+ LEFT OUTER JOIN cte1 AS alias1
+ ON cte1 . `pk` = alias1 . `pk`
+ WHERE cte1.`pk` < 3
+ LIMIT 66666666
+ """
+ result([[0]])
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]