This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 3fc3071fe8356a700ce8bb7f510e87cc17a5fac2
Author: zhangstar333 <[email protected]>
AuthorDate: Wed Sep 18 10:40:52 2024 +0800

    [Bug](exchange) fix tablet sink shuffle output not matching the output 
tuple when there is no projection (#40299)
    
    ```
    INSERT INTO tbl_4 SELECT k1, k2, k2 FROM tbl_1;
    
    tbl_1 has columns k1, k2
    tbl_4 has columns k1, k2, v
    without the project exprs, the shuffled block has only two columns, which 
does not match the three-column output tuple.
    ```
    The FE code was co-authored by @starocean999.
---
 be/src/pipeline/exec/exchange_sink_operator.cpp    | 27 +++++++++++++++++++++-
 be/src/pipeline/exec/exchange_sink_operator.h      |  5 +++-
 .../glue/translator/PhysicalPlanTranslator.java    |  3 +--
 .../plans/commands/insert/OlapInsertExecutor.java  |  1 +
 .../org/apache/doris/planner/DataStreamSink.java   | 10 ++++++++
 gensrc/thrift/DataSinks.thrift                     |  1 +
 .../data/nereids_p0/insert_into_table/random.out   |  3 +++
 .../nereids_p0/insert_into_table/random.groovy     | 11 +++++++++
 8 files changed, 57 insertions(+), 4 deletions(-)

diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp 
b/be/src/pipeline/exec/exchange_sink_operator.cpp
index f1f6c2d0c5d..5c902a32562 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -18,6 +18,7 @@
 #include "exchange_sink_operator.h"
 
 #include <gen_cpp/DataSinks_types.h>
+#include <gen_cpp/Partitions_types.h>
 #include <gen_cpp/Types_types.h>
 #include <gen_cpp/types.pb.h>
 
@@ -190,6 +191,10 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
                 
std::make_unique<vectorized::OlapTabletFinder>(_vpartition.get(), 
find_tablet_mode);
         _tablet_sink_tuple_desc = 
_state->desc_tbl().get_tuple_descriptor(p._tablet_sink_tuple_id);
         _tablet_sink_row_desc = p._pool->add(new 
RowDescriptor(_tablet_sink_tuple_desc, false));
+        _tablet_sink_expr_ctxs.resize(p._tablet_sink_expr_ctxs.size());
+        for (size_t i = 0; i < _tablet_sink_expr_ctxs.size(); i++) {
+            RETURN_IF_ERROR(p._tablet_sink_expr_ctxs[i]->clone(state, 
_tablet_sink_expr_ctxs[i]));
+        }
         // if _part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED, 
we handle the processing of auto_increment column
         // on exchange node rather than on TabletWriter
         _block_convertor =
@@ -206,7 +211,7 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
                  .txn_id = _txn_id,
                  .pool = p._pool.get(),
                  .location = _location,
-                 .vec_output_expr_ctxs = &_fake_expr_ctxs,
+                 .vec_output_expr_ctxs = &_tablet_sink_expr_ctxs,
                  .schema = _schema,
                  .caller = (void*)this,
                  .create_partition_callback = 
&ExchangeSinkLocalState::empty_callback_function});
@@ -297,6 +302,7 @@ ExchangeSinkOperatorX::ExchangeSinkOperatorX(
           _tablet_sink_location(sink.tablet_sink_location),
           _tablet_sink_tuple_id(sink.tablet_sink_tuple_id),
           _tablet_sink_txn_id(sink.tablet_sink_txn_id),
+          _t_tablet_sink_exprs(&sink.tablet_sink_exprs),
           _enable_local_merge_sort(state->enable_local_merge_sort()) {
     DCHECK_GT(destinations.size(), 0);
     DCHECK(sink.output_partition.type == TPartitionType::UNPARTITIONED ||
@@ -309,6 +315,9 @@ ExchangeSinkOperatorX::ExchangeSinkOperatorX(
            sink.output_partition.type == 
TPartitionType::TABLE_SINK_RANDOM_PARTITIONED);
     _name = "ExchangeSinkOperatorX";
     _pool = std::make_shared<ObjectPool>();
+    if (sink.__isset.output_tuple_id) {
+        _output_tuple_id = sink.output_tuple_id;
+    }
 }
 
 Status ExchangeSinkOperatorX::init(const TDataSink& tsink) {
@@ -316,6 +325,10 @@ Status ExchangeSinkOperatorX::init(const TDataSink& tsink) 
{
     if (_part_type == TPartitionType::RANGE_PARTITIONED) {
         return Status::InternalError("TPartitionType::RANGE_PARTITIONED should 
not be used");
     }
+    if (_part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED) {
+        
RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(*_t_tablet_sink_exprs,
+                                                             
_tablet_sink_expr_ctxs));
+    }
     return Status::OK();
 }
 
@@ -328,6 +341,18 @@ Status ExchangeSinkOperatorX::prepare(RuntimeState* state) 
{
 Status ExchangeSinkOperatorX::open(RuntimeState* state) {
     DCHECK(state != nullptr);
     _compression_type = state->fragement_transmission_compression_type();
+    if (_part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED) {
+        if (_output_tuple_id == -1) {
+            RETURN_IF_ERROR(
+                    vectorized::VExpr::prepare(_tablet_sink_expr_ctxs, state, 
_child->row_desc()));
+        } else {
+            auto* output_tuple_desc = 
state->desc_tbl().get_tuple_descriptor(_output_tuple_id);
+            auto* output_row_desc = _pool->add(new 
RowDescriptor(output_tuple_desc, false));
+            RETURN_IF_ERROR(
+                    vectorized::VExpr::prepare(_tablet_sink_expr_ctxs, state, 
*output_row_desc));
+        }
+        RETURN_IF_ERROR(vectorized::VExpr::open(_tablet_sink_expr_ctxs, 
state));
+    }
     return Status::OK();
 }
 
diff --git a/be/src/pipeline/exec/exchange_sink_operator.h 
b/be/src/pipeline/exec/exchange_sink_operator.h
index 1c14f04679e..e253aeadc85 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.h
+++ b/be/src/pipeline/exec/exchange_sink_operator.h
@@ -183,7 +183,7 @@ private:
 
     // for shuffle data by partition and tablet
     int64_t _txn_id = -1;
-    vectorized::VExprContextSPtrs _fake_expr_ctxs;
+    vectorized::VExprContextSPtrs _tablet_sink_expr_ctxs;
     std::unique_ptr<VOlapTablePartitionParam> _vpartition = nullptr;
     std::unique_ptr<vectorized::OlapTabletFinder> _tablet_finder = nullptr;
     std::shared_ptr<OlapTableSchemaParam> _schema = nullptr;
@@ -240,6 +240,7 @@ private:
     const std::vector<TExpr> _texprs;
 
     const RowDescriptor& _row_desc;
+    TTupleId _output_tuple_id = -1;
 
     TPartitionType::type _part_type;
 
@@ -266,6 +267,8 @@ private:
     const TTupleId _tablet_sink_tuple_id;
     int64_t _tablet_sink_txn_id = -1;
     std::shared_ptr<ObjectPool> _pool;
+    vectorized::VExprContextSPtrs _tablet_sink_expr_ctxs;
+    const std::vector<TExpr>* _t_tablet_sink_exprs = nullptr;
 
     // for external table sink random partition
     // Control the number of channels according to the flow, thereby 
controlling the number of table sink writers.
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index 55d99e6b50f..960df63a62f 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -358,8 +358,7 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
             MultiCastDataSink multiCastDataSink = (MultiCastDataSink) 
inputFragment.getSink();
             DataStreamSink dataStreamSink = 
multiCastDataSink.getDataStreamSinks().get(
                     multiCastDataSink.getDataStreamSinks().size() - 1);
-            TupleDescriptor tupleDescriptor = 
generateTupleDesc(distribute.getOutput(), null, context);
-            exchangeNode.updateTupleIds(tupleDescriptor);
+            exchangeNode.updateTupleIds(dataStreamSink.getOutputTupleDesc());
             dataStreamSink.setExchNodeId(exchangeNode.getId());
             dataStreamSink.setOutputPartition(dataPartition);
             parentFragment.addChild(inputFragment);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
index b57ac383495..e38ee40bc9a 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
@@ -173,6 +173,7 @@ public class OlapInsertExecutor extends 
AbstractInsertExecutor {
                         .createLocation(database.getId(), 
olapTableSink.getDstTable());
                 
dataStreamSink.setTabletSinkLocationParam(locationParams.get(0));
                 dataStreamSink.setTabletSinkTxnId(olapTableSink.getTxnId());
+                dataStreamSink.setTabletSinkExprs(fragment.getOutputExprs());
             }
         } catch (Exception e) {
             throw new AnalysisException(e.getMessage(), e);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/DataStreamSink.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/DataStreamSink.java
index b9cf516bc3d..ef42190fa25 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/DataStreamSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DataStreamSink.java
@@ -62,6 +62,7 @@ public class DataStreamSink extends DataSink {
     protected TOlapTableLocationParam tabletSinkLocationParam = null;
     protected TupleDescriptor tabletSinkTupleDesc = null;
     protected long tabletSinkTxnId = -1;
+    protected List<Expr> tabletSinkExprs = null;
 
     public DataStreamSink() {
 
@@ -145,6 +146,10 @@ public class DataStreamSink extends DataSink {
         this.tabletSinkLocationParam = locationParam;
     }
 
+    public void setTabletSinkExprs(List<Expr> tabletSinkExprs) {
+        this.tabletSinkExprs = tabletSinkExprs;
+    }
+
     public void setTabletSinkTxnId(long txnId) {
         this.tabletSinkTxnId = txnId;
     }
@@ -224,6 +229,11 @@ public class DataStreamSink extends DataSink {
         if (tabletSinkLocationParam != null) {
             tStreamSink.setTabletSinkLocation(tabletSinkLocationParam);
         }
+        if (tabletSinkExprs != null) {
+            for (Expr expr : tabletSinkExprs) {
+                tStreamSink.addToTabletSinkExprs(expr.treeToThrift());
+            }
+        }
         tStreamSink.setTabletSinkTxnId(tabletSinkTxnId);
         result.setStreamSink(tStreamSink);
         return result;
diff --git a/gensrc/thrift/DataSinks.thrift b/gensrc/thrift/DataSinks.thrift
index d509e9816e7..91e3c06ac4b 100644
--- a/gensrc/thrift/DataSinks.thrift
+++ b/gensrc/thrift/DataSinks.thrift
@@ -181,6 +181,7 @@ struct TDataStreamSink {
   10: optional Descriptors.TOlapTableLocationParam tablet_sink_location
   11: optional i64 tablet_sink_txn_id
   12: optional Types.TTupleId tablet_sink_tuple_id
+  13: optional list<Exprs.TExpr> tablet_sink_exprs
 }
 
 struct TMultiCastDataStreamSink {
diff --git a/regression-test/data/nereids_p0/insert_into_table/random.out 
b/regression-test/data/nereids_p0/insert_into_table/random.out
index d42426a991f..dd5bdc8e1d9 100644
--- a/regression-test/data/nereids_p0/insert_into_table/random.out
+++ b/regression-test/data/nereids_p0/insert_into_table/random.out
@@ -135,3 +135,6 @@
 13     12      20480.0 48640045.000000 10944010779     2012-03-12      
2012-03-12T12:11:12     22.634
 13     12      20480.0 48640045.000000 10944010779     2012-03-12      
2012-03-12T12:11:12     22.634
 
+-- !sql_select --
+1      11      11
+
diff --git a/regression-test/suites/nereids_p0/insert_into_table/random.groovy 
b/regression-test/suites/nereids_p0/insert_into_table/random.groovy
index 6cc5cb2b991..f820ca89bd2 100644
--- a/regression-test/suites/nereids_p0/insert_into_table/random.groovy
+++ b/regression-test/suites/nereids_p0/insert_into_table/random.groovy
@@ -43,4 +43,15 @@ suite('nereids_insert_random') {
     sql 'set delete_without_partition=true'
     sql '''delete from dup_t_type_cast_rd where id is not null'''
     sql '''delete from dup_t_type_cast_rd where id is null'''
+
+    sql 'set enable_strict_consistency_dml=true'
+    sql 'drop table if exists tbl_1'
+    sql 'drop table if exists tbl_4'
+    sql """CREATE TABLE tbl_1 (k1 INT, k2 INT) DISTRIBUTED BY HASH(k1) BUCKETS 
10 PROPERTIES ( "light_schema_change" = "false", "replication_num" = "1");"""
+    sql """INSERT INTO tbl_1 VALUES (1, 11);"""
+    sql 'sync'
+    sql """CREATE TABLE tbl_4 (k1 INT, k2 INT, v INT SUM) AGGREGATE KEY (k1, 
k2) DISTRIBUTED BY HASH(k1) BUCKETS 10  PROPERTIES ( "replication_num" = "1"); 
""" 
+    sql """INSERT INTO tbl_4 SELECT k1, k2, k2 FROM tbl_1;"""
+    sql 'sync'
+    qt_sql_select """ select * from tbl_4; """;
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to