This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 3fc3071fe8356a700ce8bb7f510e87cc17a5fac2 Author: zhangstar333 <[email protected]> AuthorDate: Wed Sep 18 10:40:52 2024 +0800 [Bug](exchange) fix tablet sink shuffle without project not match the output tuple (#40299) ``` INSERT INTO tbl_4 SELECT k1, k2, k2 FROM tbl_1; tbl_1 has columns k1, k2; tbl_4 has columns k1, k2, v. Without a project expr, the block has only two columns, which does not match the output tuple. ``` The FE code was co-authored by @starocean999 --- be/src/pipeline/exec/exchange_sink_operator.cpp | 27 +++++++++++++++++++++- be/src/pipeline/exec/exchange_sink_operator.h | 5 +++- .../glue/translator/PhysicalPlanTranslator.java | 3 +-- .../plans/commands/insert/OlapInsertExecutor.java | 1 + .../org/apache/doris/planner/DataStreamSink.java | 10 ++++++++ gensrc/thrift/DataSinks.thrift | 1 + .../data/nereids_p0/insert_into_table/random.out | 3 +++ .../nereids_p0/insert_into_table/random.groovy | 11 +++++++++ 8 files changed, 57 insertions(+), 4 deletions(-) diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp index f1f6c2d0c5d..5c902a32562 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.cpp +++ b/be/src/pipeline/exec/exchange_sink_operator.cpp @@ -18,6 +18,7 @@ #include "exchange_sink_operator.h" #include <gen_cpp/DataSinks_types.h> +#include <gen_cpp/Partitions_types.h> #include <gen_cpp/Types_types.h> #include <gen_cpp/types.pb.h> @@ -190,6 +191,10 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) { std::make_unique<vectorized::OlapTabletFinder>(_vpartition.get(), find_tablet_mode); _tablet_sink_tuple_desc = _state->desc_tbl().get_tuple_descriptor(p._tablet_sink_tuple_id); _tablet_sink_row_desc = p._pool->add(new RowDescriptor(_tablet_sink_tuple_desc, false)); + _tablet_sink_expr_ctxs.resize(p._tablet_sink_expr_ctxs.size()); + for (size_t i = 0; i < _tablet_sink_expr_ctxs.size(); i++) { + RETURN_IF_ERROR(p._tablet_sink_expr_ctxs[i]->clone(state, _tablet_sink_expr_ctxs[i])); + } // 
if _part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED, we handle the processing of auto_increment column // on exchange node rather than on TabletWriter _block_convertor = @@ -206,7 +211,7 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) { .txn_id = _txn_id, .pool = p._pool.get(), .location = _location, - .vec_output_expr_ctxs = &_fake_expr_ctxs, + .vec_output_expr_ctxs = &_tablet_sink_expr_ctxs, .schema = _schema, .caller = (void*)this, .create_partition_callback = &ExchangeSinkLocalState::empty_callback_function}); @@ -297,6 +302,7 @@ ExchangeSinkOperatorX::ExchangeSinkOperatorX( _tablet_sink_location(sink.tablet_sink_location), _tablet_sink_tuple_id(sink.tablet_sink_tuple_id), _tablet_sink_txn_id(sink.tablet_sink_txn_id), + _t_tablet_sink_exprs(&sink.tablet_sink_exprs), _enable_local_merge_sort(state->enable_local_merge_sort()) { DCHECK_GT(destinations.size(), 0); DCHECK(sink.output_partition.type == TPartitionType::UNPARTITIONED || @@ -309,6 +315,9 @@ ExchangeSinkOperatorX::ExchangeSinkOperatorX( sink.output_partition.type == TPartitionType::TABLE_SINK_RANDOM_PARTITIONED); _name = "ExchangeSinkOperatorX"; _pool = std::make_shared<ObjectPool>(); + if (sink.__isset.output_tuple_id) { + _output_tuple_id = sink.output_tuple_id; + } } Status ExchangeSinkOperatorX::init(const TDataSink& tsink) { @@ -316,6 +325,10 @@ Status ExchangeSinkOperatorX::init(const TDataSink& tsink) { if (_part_type == TPartitionType::RANGE_PARTITIONED) { return Status::InternalError("TPartitionType::RANGE_PARTITIONED should not be used"); } + if (_part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED) { + RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(*_t_tablet_sink_exprs, + _tablet_sink_expr_ctxs)); + } return Status::OK(); } @@ -328,6 +341,18 @@ Status ExchangeSinkOperatorX::prepare(RuntimeState* state) { Status ExchangeSinkOperatorX::open(RuntimeState* state) { DCHECK(state != nullptr); _compression_type = 
state->fragement_transmission_compression_type(); + if (_part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED) { + if (_output_tuple_id == -1) { + RETURN_IF_ERROR( + vectorized::VExpr::prepare(_tablet_sink_expr_ctxs, state, _child->row_desc())); + } else { + auto* output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_output_tuple_id); + auto* output_row_desc = _pool->add(new RowDescriptor(output_tuple_desc, false)); + RETURN_IF_ERROR( + vectorized::VExpr::prepare(_tablet_sink_expr_ctxs, state, *output_row_desc)); + } + RETURN_IF_ERROR(vectorized::VExpr::open(_tablet_sink_expr_ctxs, state)); + } return Status::OK(); } diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h index 1c14f04679e..e253aeadc85 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.h +++ b/be/src/pipeline/exec/exchange_sink_operator.h @@ -183,7 +183,7 @@ private: // for shuffle data by partition and tablet int64_t _txn_id = -1; - vectorized::VExprContextSPtrs _fake_expr_ctxs; + vectorized::VExprContextSPtrs _tablet_sink_expr_ctxs; std::unique_ptr<VOlapTablePartitionParam> _vpartition = nullptr; std::unique_ptr<vectorized::OlapTabletFinder> _tablet_finder = nullptr; std::shared_ptr<OlapTableSchemaParam> _schema = nullptr; @@ -240,6 +240,7 @@ private: const std::vector<TExpr> _texprs; const RowDescriptor& _row_desc; + TTupleId _output_tuple_id = -1; TPartitionType::type _part_type; @@ -266,6 +267,8 @@ private: const TTupleId _tablet_sink_tuple_id; int64_t _tablet_sink_txn_id = -1; std::shared_ptr<ObjectPool> _pool; + vectorized::VExprContextSPtrs _tablet_sink_expr_ctxs; + const std::vector<TExpr>* _t_tablet_sink_exprs = nullptr; // for external table sink random partition // Control the number of channels according to the flow, thereby controlling the number of table sink writers. 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index 55d99e6b50f..960df63a62f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -358,8 +358,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla MultiCastDataSink multiCastDataSink = (MultiCastDataSink) inputFragment.getSink(); DataStreamSink dataStreamSink = multiCastDataSink.getDataStreamSinks().get( multiCastDataSink.getDataStreamSinks().size() - 1); - TupleDescriptor tupleDescriptor = generateTupleDesc(distribute.getOutput(), null, context); - exchangeNode.updateTupleIds(tupleDescriptor); + exchangeNode.updateTupleIds(dataStreamSink.getOutputTupleDesc()); dataStreamSink.setExchNodeId(exchangeNode.getId()); dataStreamSink.setOutputPartition(dataPartition); parentFragment.addChild(inputFragment); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java index b57ac383495..e38ee40bc9a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java @@ -173,6 +173,7 @@ public class OlapInsertExecutor extends AbstractInsertExecutor { .createLocation(database.getId(), olapTableSink.getDstTable()); dataStreamSink.setTabletSinkLocationParam(locationParams.get(0)); dataStreamSink.setTabletSinkTxnId(olapTableSink.getTxnId()); + dataStreamSink.setTabletSinkExprs(fragment.getOutputExprs()); } } catch (Exception e) { throw new AnalysisException(e.getMessage(), e); diff 
--git a/fe/fe-core/src/main/java/org/apache/doris/planner/DataStreamSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DataStreamSink.java index b9cf516bc3d..ef42190fa25 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/DataStreamSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DataStreamSink.java @@ -62,6 +62,7 @@ public class DataStreamSink extends DataSink { protected TOlapTableLocationParam tabletSinkLocationParam = null; protected TupleDescriptor tabletSinkTupleDesc = null; protected long tabletSinkTxnId = -1; + protected List<Expr> tabletSinkExprs = null; public DataStreamSink() { @@ -145,6 +146,10 @@ public class DataStreamSink extends DataSink { this.tabletSinkLocationParam = locationParam; } + public void setTabletSinkExprs(List<Expr> tabletSinkExprs) { + this.tabletSinkExprs = tabletSinkExprs; + } + public void setTabletSinkTxnId(long txnId) { this.tabletSinkTxnId = txnId; } @@ -224,6 +229,11 @@ public class DataStreamSink extends DataSink { if (tabletSinkLocationParam != null) { tStreamSink.setTabletSinkLocation(tabletSinkLocationParam); } + if (tabletSinkExprs != null) { + for (Expr expr : tabletSinkExprs) { + tStreamSink.addToTabletSinkExprs(expr.treeToThrift()); + } + } tStreamSink.setTabletSinkTxnId(tabletSinkTxnId); result.setStreamSink(tStreamSink); return result; diff --git a/gensrc/thrift/DataSinks.thrift b/gensrc/thrift/DataSinks.thrift index d509e9816e7..91e3c06ac4b 100644 --- a/gensrc/thrift/DataSinks.thrift +++ b/gensrc/thrift/DataSinks.thrift @@ -181,6 +181,7 @@ struct TDataStreamSink { 10: optional Descriptors.TOlapTableLocationParam tablet_sink_location 11: optional i64 tablet_sink_txn_id 12: optional Types.TTupleId tablet_sink_tuple_id + 13: optional list<Exprs.TExpr> tablet_sink_exprs } struct TMultiCastDataStreamSink { diff --git a/regression-test/data/nereids_p0/insert_into_table/random.out b/regression-test/data/nereids_p0/insert_into_table/random.out index d42426a991f..dd5bdc8e1d9 100644 --- 
a/regression-test/data/nereids_p0/insert_into_table/random.out +++ b/regression-test/data/nereids_p0/insert_into_table/random.out @@ -135,3 +135,6 @@ 13 12 20480.0 48640045.000000 10944010779 2012-03-12 2012-03-12T12:11:12 22.634 13 12 20480.0 48640045.000000 10944010779 2012-03-12 2012-03-12T12:11:12 22.634 +-- !sql_select -- +1 11 11 + diff --git a/regression-test/suites/nereids_p0/insert_into_table/random.groovy b/regression-test/suites/nereids_p0/insert_into_table/random.groovy index 6cc5cb2b991..f820ca89bd2 100644 --- a/regression-test/suites/nereids_p0/insert_into_table/random.groovy +++ b/regression-test/suites/nereids_p0/insert_into_table/random.groovy @@ -43,4 +43,15 @@ suite('nereids_insert_random') { sql 'set delete_without_partition=true' sql '''delete from dup_t_type_cast_rd where id is not null''' sql '''delete from dup_t_type_cast_rd where id is null''' + + sql 'set enable_strict_consistency_dml=true' + sql 'drop table if exists tbl_1' + sql 'drop table if exists tbl_4' + sql """CREATE TABLE tbl_1 (k1 INT, k2 INT) DISTRIBUTED BY HASH(k1) BUCKETS 10 PROPERTIES ( "light_schema_change" = "false", "replication_num" = "1");""" + sql """INSERT INTO tbl_1 VALUES (1, 11);""" + sql 'sync' + sql """CREATE TABLE tbl_4 (k1 INT, k2 INT, v INT SUM) AGGREGATE KEY (k1, k2) DISTRIBUTED BY HASH(k1) BUCKETS 10 PROPERTIES ( "replication_num" = "1"); """ + sql """INSERT INTO tbl_4 SELECT k1, k2, k2 FROM tbl_1;""" + sql 'sync' + qt_sql_select """ select * from tbl_4; """; } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
