This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 12075f9853 [pipelineX](projection) Support projection and blocking agg (#23256)
12075f9853 is described below

commit 12075f9853b538f188bba5da363e0176907b072b
Author: Gabriel <[email protected]>
AuthorDate: Mon Aug 21 22:23:02 2023 +0800

    [pipelineX](projection) Support projection and blocking agg (#23256)
---
 be/src/exec/data_sink.cpp                          |  30 ++---
 be/src/pipeline/exec/aggregation_sink_operator.cpp | 130 ++++++++-------------
 .../pipeline/exec/aggregation_source_operator.cpp  |   9 +-
 be/src/pipeline/exec/exchange_sink_operator.cpp    |  12 +-
 be/src/pipeline/exec/exchange_sink_operator.h      |   7 +-
 be/src/pipeline/exec/operator.cpp                  |  54 +++++++++
 be/src/pipeline/exec/operator.h                    |  15 ++-
 be/src/pipeline/pipeline_fragment_context.cpp      |   2 +-
 .../pipeline_x/pipeline_x_fragment_context.cpp     |  38 +++---
 be/src/pipeline/pipeline_x/pipeline_x_task.cpp     |   2 +-
 be/src/vec/sink/vdata_stream_sender.cpp            |  15 +--
 be/src/vec/sink/vdata_stream_sender.h              |  13 +--
 be/src/vec/sink/vresult_file_sink.cpp              |   4 +-
 be/src/vec/sink/vresult_file_sink.h                |   5 +-
 be/src/vec/sink/vresult_sink.h                     |   2 +
 be/test/vec/runtime/vdata_stream_test.cpp          |   4 +-
 16 files changed, 176 insertions(+), 166 deletions(-)

diff --git a/be/src/exec/data_sink.cpp b/be/src/exec/data_sink.cpp
index bc914818e8..bd91c92a04 100644
--- a/be/src/exec/data_sink.cpp
+++ b/be/src/exec/data_sink.cpp
@@ -60,9 +60,9 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
                         ? params.send_query_statistics_with_every_batch
                         : false;
         // TODO: figure out good buffer size based on size of output row
-        sink->reset(new vectorized::VDataStreamSender(
-                state, pool, params.sender_id, row_desc, thrift_sink.stream_sink,
-                params.destinations, 16 * 1024, send_query_statistics_with_every_batch));
+        sink->reset(new vectorized::VDataStreamSender(state, pool, params.sender_id, row_desc,
+                                                      thrift_sink.stream_sink, params.destinations,
+                                                      send_query_statistics_with_every_batch));
         // RETURN_IF_ERROR(sender->prepare(state->obj_pool(), thrift_sink.stream_sink));
         break;
     }
@@ -73,7 +73,8 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
 
         // TODO: figure out good buffer size based on size of output row
         sink->reset(new doris::vectorized::VResultSink(row_desc, output_exprs,
-                                                       thrift_sink.result_sink, 4096));
+                                                       thrift_sink.result_sink,
+                                                       vectorized::RESULT_SINK_BUFFER_SIZE));
         break;
     }
     case TDataSinkType::RESULT_FILE_SINK: {
@@ -90,11 +91,11 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
         if (params.__isset.destinations && params.destinations.size() > 0) {
             sink->reset(new doris::vectorized::VResultFileSink(
                    state, pool, params.sender_id, row_desc, thrift_sink.result_file_sink,
-                    params.destinations, 16 * 1024, send_query_statistics_with_every_batch,
-                    output_exprs, desc_tbl));
+                    params.destinations, send_query_statistics_with_every_batch, output_exprs,
+                    desc_tbl));
         } else {
             sink->reset(new doris::vectorized::VResultFileSink(
-                    state, pool, row_desc, thrift_sink.result_file_sink, 16 * 1024,
+                    state, pool, row_desc, thrift_sink.result_file_sink,
                     send_query_statistics_with_every_batch, output_exprs));
         }
         break;
@@ -197,9 +198,9 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
                         ? params.send_query_statistics_with_every_batch
                         : false;
         // TODO: figure out good buffer size based on size of output row
-        sink->reset(new vectorized::VDataStreamSender(
-                state, pool, local_params.sender_id, row_desc, thrift_sink.stream_sink,
-                params.destinations, 16 * 1024, send_query_statistics_with_every_batch));
+        sink->reset(new vectorized::VDataStreamSender(state, pool, local_params.sender_id, row_desc,
+                                                      thrift_sink.stream_sink, params.destinations,
+                                                      send_query_statistics_with_every_batch));
         // RETURN_IF_ERROR(sender->prepare(state->obj_pool(), thrift_sink.stream_sink));
         break;
     }
@@ -210,7 +211,8 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
 
         // TODO: figure out good buffer size based on size of output row
         sink->reset(new doris::vectorized::VResultSink(row_desc, output_exprs,
-                                                       thrift_sink.result_sink, 4096));
+                                                       thrift_sink.result_sink,
+                                                       vectorized::RESULT_SINK_BUFFER_SIZE));
         break;
     }
     case TDataSinkType::RESULT_FILE_SINK: {
@@ -227,11 +229,11 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
         if (params.__isset.destinations && params.destinations.size() > 0) {
             sink->reset(new doris::vectorized::VResultFileSink(
                    state, pool, local_params.sender_id, row_desc, thrift_sink.result_file_sink,
-                    params.destinations, 16 * 1024, send_query_statistics_with_every_batch,
-                    output_exprs, desc_tbl));
+                    params.destinations, send_query_statistics_with_every_batch, output_exprs,
+                    desc_tbl));
         } else {
             sink->reset(new doris::vectorized::VResultFileSink(
-                    state, pool, row_desc, thrift_sink.result_file_sink, 16 * 1024,
+                    state, pool, row_desc, thrift_sink.result_file_sink,
                     send_query_statistics_with_every_batch, output_exprs));
         }
         break;
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp b/be/src/pipeline/exec/aggregation_sink_operator.cpp
index e0a5373dab..a856247315 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp
@@ -20,6 +20,7 @@
 #include <string>
 
 #include "pipeline/exec/operator.h"
+#include "runtime/primitive_type.h"
 
 namespace doris::pipeline {
 
@@ -151,6 +152,13 @@ Status AggSinkLocalState::init(RuntimeState* state, Dependency* dependency) {
                                (!p._have_conjuncts) && // no having conjunct
                                p._needs_finalize;      // agg's finalize step
     }
+    // Move _create_agg_status into open() rather than prepare(): prepare() and
+    // open() may run on different threads, and creating the agg status on the
+    // wrong thread can fail to attach to the JVM.
+    if (_shared_state->probe_expr_ctxs.empty()) {
+        // _create_agg_status may allocate a lot of memory and can fail when memory is scarce.
+        RETURN_IF_CATCH_EXCEPTION(_dependency->create_agg_status(_agg_data->without_key));
+    }
     return Status::OK();
 }
 
@@ -501,6 +509,7 @@ void AggSinkLocalState::_emplace_into_hash_table(vectorized::AggregateDataPtr* p
                                                 places);
                     }
                 } else {
+                    SCOPED_TIMER(_hash_table_emplace_timer);
                     for (size_t i = 0; i < num_rows; ++i) {
                         vectorized::AggregateDataPtr mapped = nullptr;
                        if constexpr (vectorized::ColumnsHashing::IsSingleNullableColumnMethod<
@@ -575,88 +584,61 @@ void AggSinkLocalState::_find_in_hash_table(vectorized::AggregateDataPtr* places
 
 void AggSinkLocalState::_init_hash_method(const vectorized::VExprContextSPtrs& probe_exprs) {
     DCHECK(probe_exprs.size() >= 1);
+
+    using Type = vectorized::AggregatedDataVariants::Type;
+    Type t(Type::serialized);
+
     if (probe_exprs.size() == 1) {
         auto is_nullable = probe_exprs[0]->root()->is_nullable();
-        switch (probe_exprs[0]->root()->result_type()) {
+        PrimitiveType type = probe_exprs[0]->root()->result_type();
+        switch (type) {
         case TYPE_TINYINT:
         case TYPE_BOOLEAN:
-            _agg_data->init(vectorized::AggregatedDataVariants::Type::int8_key, is_nullable);
-            return;
         case TYPE_SMALLINT:
-            _agg_data->init(vectorized::AggregatedDataVariants::Type::int16_key, is_nullable);
-            return;
         case TYPE_INT:
         case TYPE_FLOAT:
         case TYPE_DATEV2:
-            if (_parent->cast<AggSinkOperatorX>()._is_first_phase)
-                _agg_data->init(vectorized::AggregatedDataVariants::Type::int32_key, is_nullable);
-            else
-                _agg_data->init(vectorized::AggregatedDataVariants::Type::int32_key_phase2,
-                                is_nullable);
-            return;
         case TYPE_BIGINT:
         case TYPE_DOUBLE:
         case TYPE_DATE:
         case TYPE_DATETIME:
         case TYPE_DATETIMEV2:
-            if (_parent->cast<AggSinkOperatorX>()._is_first_phase)
-                _agg_data->init(vectorized::AggregatedDataVariants::Type::int64_key, is_nullable);
-            else
-                _agg_data->init(vectorized::AggregatedDataVariants::Type::int64_key_phase2,
-                                is_nullable);
-            return;
-        case TYPE_LARGEINT: {
-            if (_parent->cast<AggSinkOperatorX>()._is_first_phase)
-                _agg_data->init(vectorized::AggregatedDataVariants::Type::int128_key, is_nullable);
-            else
-                _agg_data->init(vectorized::AggregatedDataVariants::Type::int128_key_phase2,
-                                is_nullable);
-            return;
-        }
+        case TYPE_LARGEINT:
         case TYPE_DECIMALV2:
         case TYPE_DECIMAL32:
         case TYPE_DECIMAL64:
         case TYPE_DECIMAL128I: {
-            vectorized::DataTypePtr& type_ptr = probe_exprs[0]->root()->data_type();
-            vectorized::TypeIndex idx =
-                    is_nullable ? assert_cast<const vectorized::DataTypeNullable&>(*type_ptr)
-                                          .get_nested_type()
-                                          ->get_type_id()
-                                : type_ptr->get_type_id();
-            vectorized::WhichDataType which(idx);
-            if (which.is_decimal32()) {
-                if (_parent->cast<AggSinkOperatorX>()._is_first_phase)
-                    _agg_data->init(vectorized::AggregatedDataVariants::Type::int32_key,
-                                    is_nullable);
-                else
-                    _agg_data->init(vectorized::AggregatedDataVariants::Type::int32_key_phase2,
-                                    is_nullable);
-            } else if (which.is_decimal64()) {
-                if (_parent->cast<AggSinkOperatorX>()._is_first_phase)
-                    _agg_data->init(vectorized::AggregatedDataVariants::Type::int64_key,
-                                    is_nullable);
-                else
-                    _agg_data->init(vectorized::AggregatedDataVariants::Type::int64_key_phase2,
-                                    is_nullable);
+            size_t size = get_primitive_type_size(type);
+            if (size == 1) {
+                t = Type::int8_key;
+            } else if (size == 2) {
+                t = Type::int16_key;
+            } else if (size == 4) {
+                t = Type::int32_key;
+            } else if (size == 8) {
+                t = Type::int64_key;
+            } else if (size == 16) {
+                t = Type::int128_key;
             } else {
-                if (_parent->cast<AggSinkOperatorX>()._is_first_phase)
-                    _agg_data->init(vectorized::AggregatedDataVariants::Type::int128_key,
-                                    is_nullable);
-                else
-                    _agg_data->init(vectorized::AggregatedDataVariants::Type::int128_key_phase2,
-                                    is_nullable);
+                throw Exception(ErrorCode::INTERNAL_ERROR,
+                                "meet invalid type size, size={}, type={}", size,
+                                type_to_string(type));
             }
-            return;
+            break;
         }
         case TYPE_CHAR:
         case TYPE_VARCHAR:
         case TYPE_STRING: {
-            _agg_data->init(vectorized::AggregatedDataVariants::Type::string_key, is_nullable);
+            t = Type::string_key;
             break;
         }
         default:
-            _agg_data->init(vectorized::AggregatedDataVariants::Type::serialized);
+            t = Type::serialized;
         }
+
+        _agg_data->init(
+                get_hash_key_type_with_phase(t, !_parent->cast<AggSinkOperatorX>()._is_first_phase),
+                is_nullable);
     } else {
         bool use_fixed_key = true;
         bool has_null = false;
@@ -690,40 +672,19 @@ void AggSinkLocalState::_init_hash_method(const vectorized::VExprContextSPtrs& p
 
         if (use_fixed_key) {
             if (bitmap_size + key_byte_size <= sizeof(vectorized::UInt64)) {
-                if (_parent->cast<AggSinkOperatorX>()._is_first_phase) {
-                    _agg_data->init(vectorized::AggregatedDataVariants::Type::int64_keys, has_null);
-                } else {
-                    _agg_data->init(vectorized::AggregatedDataVariants::Type::int64_keys_phase2,
-                                    has_null);
-                }
+                t = Type::int64_keys;
             } else if (bitmap_size + key_byte_size <= sizeof(vectorized::UInt128)) {
-                if (_parent->cast<AggSinkOperatorX>()._is_first_phase) {
-                    _agg_data->init(vectorized::AggregatedDataVariants::Type::int128_keys,
-                                    has_null);
-                } else {
-                    _agg_data->init(vectorized::AggregatedDataVariants::Type::int128_keys_phase2,
-                                    has_null);
-                }
+                t = Type::int128_keys;
             } else if (bitmap_size + key_byte_size <= sizeof(vectorized::UInt136)) {
-                if (_parent->cast<AggSinkOperatorX>()._is_first_phase) {
-                    _agg_data->init(vectorized::AggregatedDataVariants::Type::int136_keys,
-                                    has_null);
-                } else {
-                    _agg_data->init(vectorized::AggregatedDataVariants::Type::int136_keys_phase2,
-                                    has_null);
-                }
+                t = Type::int136_keys;
             } else {
-                if (_parent->cast<AggSinkOperatorX>()._is_first_phase) {
-                    _agg_data->init(vectorized::AggregatedDataVariants::Type::int256_keys,
-                                    has_null);
-                } else {
-                    _agg_data->init(vectorized::AggregatedDataVariants::Type::int256_keys_phase2,
-                                    has_null);
-                }
+                t = Type::int256_keys;
             }
-
+            _agg_data->init(get_hash_key_type_with_phase(
+                                    t, !_parent->cast<AggSinkOperatorX>()._is_first_phase),
+                            has_null);
         } else {
-            _agg_data->init(vectorized::AggregatedDataVariants::Type::serialized);
+            _agg_data->init(Type::serialized);
         }
     }
 }
@@ -869,6 +830,7 @@ Status AggSinkOperatorX::open(doris::RuntimeState* state) {
 
     for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
         RETURN_IF_ERROR(_aggregate_evaluators[i]->open(state));
+        _aggregate_evaluators[i]->set_version(state->be_exec_version());
     }
 
     return Status::OK();
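
Note on get_hash_key_type_with_phase(): the repeated first-phase/second-phase
branching deleted above is collapsed into this helper plus a size-based key
dispatch. The helper's definition is not part of this diff; the following is
a minimal, self-contained sketch of the mapping it presumably performs (the
enum is a stand-in for vectorized::AggregatedDataVariants::Type, and the
function body is a hypothetical reconstruction, not the real implementation):

    #include <cassert>

    // Stand-in for vectorized::AggregatedDataVariants::Type; only the
    // variants needed for the sketch are listed.
    enum class Type {
        serialized,
        int32_key,
        int32_key_phase2,
        int64_key,
        int64_key_phase2,
        int128_key,
        int128_key_phase2,
    };

    // Hypothetical reconstruction: return the phase-2 variant of a hash key
    // type when the aggregation is not in its first phase.
    Type get_hash_key_type_with_phase(Type t, bool is_phase2) {
        if (!is_phase2) {
            return t;
        }
        switch (t) {
        case Type::int32_key:
            return Type::int32_key_phase2;
        case Type::int64_key:
            return Type::int64_key_phase2;
        case Type::int128_key:
            return Type::int128_key_phase2;
        default:
            return t; // e.g. serialized keys have no phase-2 variant
        }
    }

    int main() {
        assert(get_hash_key_type_with_phase(Type::int64_key, true) == Type::int64_key_phase2);
        assert(get_hash_key_type_with_phase(Type::int64_key, false) == Type::int64_key);
        return 0;
    }
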
diff --git a/be/src/pipeline/exec/aggregation_source_operator.cpp b/be/src/pipeline/exec/aggregation_source_operator.cpp
index ec5e515fcc..b8436e8735 100644
--- a/be/src/pipeline/exec/aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_source_operator.cpp
@@ -71,14 +71,7 @@ Status AggLocalState::init(RuntimeState* state, LocalStateInfo& info) {
         _executor.close = std::bind<void>(&AggLocalState::_close_with_serialized_key, this);
     }
 
-    // move _create_agg_status to open not in during prepare,
-    // because during prepare and open thread is not the same one,
-    // this could cause unable to get JVM
-    if (_shared_state->probe_expr_ctxs.empty()) {
-        // _create_agg_status may acquire a lot of memory, may allocate failed when memory is very few
-        RETURN_IF_CATCH_EXCEPTION(_dependency->create_agg_status(_agg_data->without_key));
-        _agg_data_created_without_key = true;
-    }
+    _agg_data_created_without_key = p._without_key;
     return Status::OK();
 }
 
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp
index dfa016b15a..ec52ed5422 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -113,8 +113,7 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, Dependency* dependency)
             fragment_id_to_channel_index.end()) {
             channel_shared_ptrs.emplace_back(new vectorized::PipChannel<ExchangeSinkLocalState>(
                     this, p._row_desc, p._dests[i].brpc_server, fragment_instance_id,
-                    p._dest_node_id, p._per_channel_buffer_size, false,
-                    p._send_query_statistics_with_every_batch));
+                    p._dest_node_id, false, p._send_query_statistics_with_every_batch));
         }
         fragment_id_to_channel_index.emplace(fragment_instance_id.lo,
                                              channel_shared_ptrs.size() - 1);
@@ -190,15 +189,13 @@ segment_v2::CompressionTypePB& ExchangeSinkLocalState::compression_type() {
 ExchangeSinkOperatorX::ExchangeSinkOperatorX(
         const int id, RuntimeState* state, ObjectPool* pool, const RowDescriptor& row_desc,
         const TDataStreamSink& sink, const std::vector<TPlanFragmentDestination>& destinations,
-        int per_channel_buffer_size, bool send_query_statistics_with_every_batch,
-        PipelineXFragmentContext* context)
+        bool send_query_statistics_with_every_batch, PipelineXFragmentContext* context)
         : DataSinkOperatorX(id),
           _context(context),
           _pool(pool),
           _row_desc(row_desc),
           _part_type(sink.output_partition.type),
           _dests(destinations),
-          _per_channel_buffer_size(per_channel_buffer_size),
           
_send_query_statistics_with_every_batch(send_query_statistics_with_every_batch),
           _dest_node_id(sink.dest_node_id),
           _transfer_large_data_by_brpc(config::transfer_large_data_by_brpc) {
@@ -213,7 +210,7 @@ ExchangeSinkOperatorX::ExchangeSinkOperatorX(
 
 ExchangeSinkOperatorX::ExchangeSinkOperatorX(
         const int id, ObjectPool* pool, const RowDescriptor& row_desc, PlanNodeId dest_node_id,
-        const std::vector<TPlanFragmentDestination>& destinations, int per_channel_buffer_size,
+        const std::vector<TPlanFragmentDestination>& destinations,
         bool send_query_statistics_with_every_batch, PipelineXFragmentContext* context)
         : DataSinkOperatorX(id),
           _context(context),
@@ -221,7 +218,6 @@ ExchangeSinkOperatorX::ExchangeSinkOperatorX(
           _row_desc(row_desc),
           _part_type(TPartitionType::UNPARTITIONED),
           _dests(destinations),
-          _per_channel_buffer_size(per_channel_buffer_size),
           
_send_query_statistics_with_every_batch(send_query_statistics_with_every_batch),
           _dest_node_id(dest_node_id) {
     _cur_pb_block = &_pb_block1;
@@ -230,14 +226,12 @@ ExchangeSinkOperatorX::ExchangeSinkOperatorX(
 
 ExchangeSinkOperatorX::ExchangeSinkOperatorX(const int id, ObjectPool* pool,
                                              const RowDescriptor& row_desc,
-                                             int per_channel_buffer_size,
                                              bool send_query_statistics_with_every_batch,
                                              PipelineXFragmentContext* context)
         : DataSinkOperatorX(id),
           _context(context),
           _pool(pool),
           _row_desc(row_desc),
-          _per_channel_buffer_size(per_channel_buffer_size),
           
_send_query_statistics_with_every_batch(send_query_statistics_with_every_batch),
           _dest_node_id(0) {
     _cur_pb_block = &_pb_block1;
diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h
index 45114d40d1..b6d92d5829 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.h
+++ b/be/src/pipeline/exec/exchange_sink_operator.h
@@ -158,15 +158,15 @@ public:
     ExchangeSinkOperatorX(const int id, RuntimeState* state, ObjectPool* pool,
                          const RowDescriptor& row_desc, const TDataStreamSink& sink,
                          const std::vector<TPlanFragmentDestination>& destinations,
-                          int per_channel_buffer_size, bool send_query_statistics_with_every_batch,
+                          bool send_query_statistics_with_every_batch,
                          PipelineXFragmentContext* context);
     ExchangeSinkOperatorX(const int id, ObjectPool* pool, const RowDescriptor& row_desc,
                          PlanNodeId dest_node_id,
                          const std::vector<TPlanFragmentDestination>& destinations,
-                          int per_channel_buffer_size, bool send_query_statistics_with_every_batch,
+                          bool send_query_statistics_with_every_batch,
                          PipelineXFragmentContext* context);
     ExchangeSinkOperatorX(const int id, ObjectPool* pool, const RowDescriptor& row_desc,
-                          int per_channel_buffer_size, bool send_query_statistics_with_every_batch,
+                          bool send_query_statistics_with_every_batch,
                           PipelineXFragmentContext* context);
     Status init(const TDataSink& tsink) override;
 
@@ -223,7 +223,6 @@ private:
     vectorized::VExprContextSPtrs _partition_expr_ctxs;
 
     const std::vector<TPlanFragmentDestination> _dests;
-    const int _per_channel_buffer_size;
     const bool _send_query_statistics_with_every_batch;
 
     std::unique_ptr<MemTracker> _mem_tracker;
diff --git a/be/src/pipeline/exec/operator.cpp b/be/src/pipeline/exec/operator.cpp
index 32a78f104b..0ea73dea54 100644
--- a/be/src/pipeline/exec/operator.cpp
+++ b/be/src/pipeline/exec/operator.cpp
@@ -19,6 +19,7 @@
 
 #include "vec/exprs/vexpr.h"
 #include "vec/exprs/vexpr_context.h"
+#include "vec/utils/util.hpp"
 
 namespace doris {
 class RowDescriptor;
@@ -153,9 +154,62 @@ Status PipelineXLocalState::init(RuntimeState* state, LocalStateInfo& /*info*/)
                                profile()->total_time_counter()),
             "");
     _mem_tracker = std::make_unique<MemTracker>("PipelineXLocalState:" + _runtime_profile->name());
+    _memory_used_counter = ADD_LABEL_COUNTER(_runtime_profile, "MemoryUsage");
+    _peak_memory_usage_counter = _runtime_profile->AddHighWaterMarkCounter(
+            "PeakMemoryUsage", TUnit::BYTES, "MemoryUsage");
     return Status::OK();
 }
 
+void PipelineXLocalState::clear_origin_block() {
+    _origin_block.clear_column_data(_parent->_row_descriptor.num_materialized_slots());
+}
+
+Status OperatorXBase::do_projections(RuntimeState* state, vectorized::Block* origin_block,
+                                     vectorized::Block* output_block) {
+    auto local_state = state->get_local_state(id());
+    SCOPED_TIMER(local_state->_projection_timer);
+    using namespace vectorized;
+    vectorized::MutableBlock mutable_block =
+            vectorized::VectorizedUtils::build_mutable_mem_reuse_block(output_block,
+                                                                       *_output_row_descriptor);
+    auto rows = origin_block->rows();
+
+    if (rows != 0) {
+        auto& mutable_columns = mutable_block.mutable_columns();
+        DCHECK(mutable_columns.size() == _projections.size());
+        for (int i = 0; i < mutable_columns.size(); ++i) {
+            auto result_column_id = -1;
+            RETURN_IF_ERROR(_projections[i]->execute(origin_block, &result_column_id));
+            auto column_ptr = origin_block->get_by_position(result_column_id)
+                                      .column->convert_to_full_column_if_const();
+            //TODO: this is a quick fix, we need a new function like "change_to_nullable" to do it
+            if (mutable_columns[i]->is_nullable() xor column_ptr->is_nullable()) {
+                DCHECK(mutable_columns[i]->is_nullable() && !column_ptr->is_nullable());
+                reinterpret_cast<ColumnNullable*>(mutable_columns[i].get())
+                        ->insert_range_from_not_nullable(*column_ptr, 0, rows);
+            } else {
+                mutable_columns[i]->insert_range_from(*column_ptr, 0, rows);
+            }
+        }
+        DCHECK(mutable_block.rows() == rows);
+    }
+
+    return Status::OK();
+}
+
+Status OperatorXBase::get_next_after_projects(RuntimeState* state, vectorized::Block* block,
+                                              SourceState& source_state) {
+    auto local_state = state->get_local_state(id());
+    if (_output_row_descriptor) {
+        local_state->clear_origin_block();
+        auto status = get_block(state, &local_state->_origin_block, source_state);
+        if (UNLIKELY(!status.ok())) return status;
+        return do_projections(state, &local_state->_origin_block, block);
+    }
+    local_state->_peak_memory_usage_counter->set(local_state->_mem_tracker->peak_consumption());
+    return get_block(state, block, source_state);
+}
+
 bool PipelineXLocalState::reached_limit() const {
     return _parent->_limit != -1 && _num_rows_returned >= _parent->_limit;
 }
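
The projection path added above follows a two-block pattern: when an operator
carries an _output_row_descriptor, get_next_after_projects() first
materializes the child's output into the reusable _origin_block, then
do_projections() evaluates each projection expression into the caller's
output block. A condensed, self-contained sketch of that control flow (Block
and Projection below are simplified stand-ins, not the real vectorized
types):

    #include <functional>
    #include <vector>

    // Simplified stand-ins for vectorized::Block and a compiled projection.
    using Column = std::vector<int>;
    struct Block {
        std::vector<Column> columns;
        void clear() { columns.clear(); }
    };
    using Projection = std::function<Column(const Block&)>;

    // Sketch of the get_next_after_projects() flow: with projections present,
    // fetch into the origin block first, then project into the output block.
    void get_next_after_projects(const std::function<void(Block*)>& get_block,
                                 const std::vector<Projection>& projections,
                                 Block* origin, Block* output) {
        if (!projections.empty()) {
            origin->clear();   // mirrors clear_origin_block()
            get_block(origin); // child fills the _row_descriptor layout
            output->clear();
            for (const auto& project : projections) { // mirrors do_projections()
                output->columns.push_back(project(*origin));
            }
        } else {
            get_block(output); // no projections: fill the output block directly
        }
    }

    int main() {
        Block origin, output;
        std::vector<Projection> projections = {
                [](const Block& b) { return b.columns.at(0); } // identity projection
        };
        get_next_after_projects([](Block* b) { b->columns = {{1, 2, 3}}; },
                                projections, &origin, &output);
        return output.columns.size() == 1 ? 0 : 1;
    }
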
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index 913d277638..844fd02ad4 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -492,6 +492,7 @@ public:
               _rows_returned_counter(nullptr),
               _rows_returned_rate(nullptr),
               _memory_used_counter(nullptr),
+              _peak_memory_usage_counter(nullptr),
               _parent(parent),
               _state(state) {}
     virtual ~PipelineXLocalState() {}
@@ -506,6 +507,9 @@ public:
         return reinterpret_cast<const TARGET&>(*this);
     }
 
+    // If projections are used, we should clear `_origin_block`.
+    void clear_origin_block();
+
     bool reached_limit() const;
     void reached_limit(vectorized::Block* block, bool* eos);
     RuntimeProfile* profile() { return _runtime_profile.get(); }
@@ -541,6 +545,8 @@ protected:
     // Account for peak memory used by this node
     RuntimeProfile::Counter* _memory_used_counter;
     RuntimeProfile::Counter* _projection_timer;
+    // Account for peak memory used by this node
+    RuntimeProfile::Counter* _peak_memory_usage_counter;
 
     OpentelemetrySpan _span;
     OperatorXBase* _parent;
@@ -548,6 +554,7 @@ protected:
     vectorized::VExprContextSPtrs _conjuncts;
     vectorized::VExprContextSPtrs _projections;
     bool _init = false;
+    vectorized::Block _origin_block;
 };
 
 class OperatorXBase : public OperatorBase {
@@ -645,6 +652,13 @@ public:
 
     virtual bool is_source() const override { return false; }
 
+    Status get_next_after_projects(RuntimeState* state, vectorized::Block* block,
+                                   SourceState& source_state);
+
+    /// Only used in the vectorized exec engine: apply projections to transform _row_desc into _output_row_desc.
+    Status do_projections(RuntimeState* state, vectorized::Block* origin_block,
+                          vectorized::Block* output_block);
+
 protected:
     friend class PipelineXLocalState;
     int _id; // unique w/in single plan tree
@@ -656,7 +670,6 @@ protected:
 
     OperatorXBase* _children;
     RowDescriptor _row_descriptor;
-    vectorized::Block _origin_block;
 
     std::unique_ptr<RowDescriptor> _output_row_descriptor;
     vectorized::VExprContextSPtrs _projections;
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index d37410edb1..871c554781 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -794,7 +794,7 @@ Status PipelineFragmentContext::_create_sink(int sender_id, const TDataSink& thr
             _multi_cast_stream_sink_senders[i].reset(new vectorized::VDataStreamSender(
                     _runtime_state.get(), _runtime_state->obj_pool(), sender_id, row_desc,
                     thrift_sink.multi_cast_stream_sink.sinks[i],
-                    thrift_sink.multi_cast_stream_sink.destinations[i], 16 * 1024, false));
+                    thrift_sink.multi_cast_stream_sink.destinations[i], false));
 
            // 2. create and set the source operator of multi_cast_data_stream_source for new pipeline
             OperatorBuilderPtr source_op =
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index 1e9d3d3f95..a182934bd3 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -113,11 +113,10 @@ void PipelineXFragmentContext::cancel(const PPlanFragmentCancelReason& reason,
             _exec_status = Status::Cancelled(msg);
         }
 
-        for (auto& rs : _runtime_states) {
-            rs->set_is_cancelled(true, msg);
-            rs->set_process_status(_exec_status);
-            _exec_env->vstream_mgr()->cancel(rs->fragment_instance_id());
-        }
+        FOR_EACH_RUNTIME_STATE(
+                runtime_state->set_is_cancelled(true, msg);
+                runtime_state->set_process_status(_exec_status);
+                _exec_env->vstream_mgr()->cancel(runtime_state->fragment_instance_id());)
 
         LOG(WARNING) << "PipelineFragmentContext Canceled. reason=" << msg;
 
@@ -236,9 +235,9 @@ Status PipelineXFragmentContext::_create_data_sink(ObjectPool* pool, const TData
                 params.__isset.send_query_statistics_with_every_batch
                         ? params.send_query_statistics_with_every_batch
                         : false;
-        _sink.reset(new ExchangeSinkOperatorX(
-                _sink_idx++, state, pool, row_desc, thrift_sink.stream_sink, params.destinations,
-                16 * 1024, send_query_statistics_with_every_batch, this));
+        _sink.reset(new ExchangeSinkOperatorX(_sink_idx++, state, pool, row_desc,
+                                              thrift_sink.stream_sink, params.destinations,
+                                              send_query_statistics_with_every_batch, this));
         break;
     }
     case TDataSinkType::RESULT_SINK: {
@@ -248,7 +247,8 @@ Status PipelineXFragmentContext::_create_data_sink(ObjectPool* pool, const TData
 
         // TODO: figure out good buffer size based on size of output row
         _sink.reset(new ResultSinkOperatorX(_sink_idx++, row_desc, output_exprs,
-                                            thrift_sink.result_sink, 4096));
+                                            thrift_sink.result_sink,
+                                            vectorized::RESULT_SINK_BUFFER_SIZE));
         break;
     }
     default:
@@ -379,8 +379,7 @@ Status PipelineXFragmentContext::_build_pipelines(ObjectPool* pool,
                                                  const DescriptorTbl& descs, OperatorXPtr* root,
                                                   PipelinePtr cur_pipe) {
     if (request.fragment.plan.nodes.size() == 0) {
-        *root = nullptr;
-        return Status::OK();
+        throw Exception(ErrorCode::INTERNAL_ERROR, "Invalid plan which has no plan node!");
     }
 
     int node_idx = 0;
@@ -405,7 +404,9 @@ Status PipelineXFragmentContext::_create_tree_helper(ObjectPool* pool,
     // propagate error case
     if (*node_idx >= tnodes.size()) {
         // TODO: print thrift msg
-        return Status::InternalError("Failed to reconstruct plan tree from thrift.");
+        return Status::InternalError(
+                "Failed to reconstruct plan tree from thrift. Node id: {}, 
number of nodes: {}",
+                *node_idx, tnodes.size());
     }
     const TPlanNode& tnode = tnodes[*node_idx];
 
@@ -429,7 +430,9 @@ Status PipelineXFragmentContext::_create_tree_helper(ObjectPool* pool,
         // this means we have been given a bad tree and must fail
         if (*node_idx >= tnodes.size()) {
             // TODO: print thrift msg
-            return Status::InternalError("Failed to reconstruct plan tree from thrift.");
+            return Status::InternalError(
+                    "Failed to reconstruct plan tree from thrift. Node id: {}, number of nodes: {}",
+                    *node_idx, tnodes.size());
         }
     }
 
@@ -518,12 +521,9 @@ Status PipelineXFragmentContext::submit() {
 }
 
 void PipelineXFragmentContext::close_sink() {
-    if (_prepared) {
-        FOR_EACH_RUNTIME_STATE(
-                _sink->close(runtime_state.get(), Status::RuntimeError("prepare failed"));)
-    } else {
-        FOR_EACH_RUNTIME_STATE(_sink->close(runtime_state.get(), Status::OK());)
-    }
+    FOR_EACH_RUNTIME_STATE(
+            _sink->close(runtime_state.get(),
+                         _prepared ? Status::RuntimeError("prepare failed") : Status::OK()););
 }
 
 void PipelineXFragmentContext::close_if_prepare_failed() {
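
FOR_EACH_RUNTIME_STATE, used in both hunks above, is defined elsewhere in the
Doris tree; judging from its call sites it runs the given statements once per
entry of _runtime_states, with runtime_state bound to the current element. A
hypothetical reconstruction of the pattern (the real macro may differ):

    #include <memory>
    #include <vector>

    struct RuntimeState {}; // stand-in for doris::RuntimeState

    // Hypothetical shape of the macro, inferred from its call sites above;
    // the real definition lives elsewhere in the Doris source.
    #define FOR_EACH_RUNTIME_STATE(stmt)                  \
        for (auto& runtime_state : _runtime_states) {     \
            stmt                                          \
        }

    struct FragmentContextSketch {
        std::vector<std::unique_ptr<RuntimeState>> _runtime_states;
        int closed = 0;
        void close_all() {
            // runtime_state names each element of _runtime_states in turn.
            FOR_EACH_RUNTIME_STATE(if (runtime_state) { ++closed; })
        }
    };

    int main() {
        FragmentContextSketch ctx;
        ctx._runtime_states.push_back(std::make_unique<RuntimeState>());
        ctx.close_all();
        return ctx.closed == 1 ? 0 : 1;
    }
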
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
index c3da4e3e47..250bc610a9 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
@@ -170,7 +170,7 @@ Status PipelineXTask::execute(bool* eos) {
         {
             SCOPED_TIMER(_get_block_timer);
             _get_block_counter->update(1);
-            RETURN_IF_ERROR(_root->get_block(_state, block, _data_state));
+            RETURN_IF_ERROR(_root->get_next_after_projects(_state, block, _data_state));
         }
         *eos = _data_state == SourceState::FINISHED;
         if (_block->rows() != 0 || *eos) {
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp
index e19fcd3fa3..bc749f7f21 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -292,7 +292,6 @@ void Channel<Parent>::ch_roll_pb_block() {
 VDataStreamSender::VDataStreamSender(RuntimeState* state, ObjectPool* pool, int sender_id,
                                      const RowDescriptor& row_desc, const TDataStreamSink& sink,
                                      const std::vector<TPlanFragmentDestination>& destinations,
-                                     int per_channel_buffer_size,
                                      bool send_query_statistics_with_every_batch)
         : _sender_id(sender_id),
           _state(state),
@@ -330,13 +329,13 @@ VDataStreamSender::VDataStreamSender(RuntimeState* state, ObjectPool* pool, int
             if (_enable_pipeline_exec) {
                _channel_shared_ptrs.emplace_back(new PipChannel<VDataStreamSender>(
                        this, row_desc, destinations[i].brpc_server, fragment_instance_id,
-                        sink.dest_node_id, per_channel_buffer_size, is_transfer_chain,
+                        sink.dest_node_id, is_transfer_chain,
                        send_query_statistics_with_every_batch));
            } else {
-                _channel_shared_ptrs.emplace_back(new Channel(
-                        this, row_desc, destinations[i].brpc_server, fragment_instance_id,
-                        sink.dest_node_id, per_channel_buffer_size, is_transfer_chain,
-                        send_query_statistics_with_every_batch));
+                _channel_shared_ptrs.emplace_back(
+                        new Channel(this, row_desc, destinations[i].brpc_server,
+                                    fragment_instance_id, sink.dest_node_id, is_transfer_chain,
+                                    send_query_statistics_with_every_batch));
             }
             fragment_id_to_channel_index.emplace(fragment_instance_id.lo,
                                                 _channel_shared_ptrs.size() - 1);
@@ -358,7 +357,6 @@ VDataStreamSender::VDataStreamSender(RuntimeState* state, ObjectPool* pool, int
 VDataStreamSender::VDataStreamSender(RuntimeState* state, ObjectPool* pool, int sender_id,
                                      const RowDescriptor& row_desc, PlanNodeId dest_node_id,
                                      const std::vector<TPlanFragmentDestination>& destinations,
-                                     int per_channel_buffer_size,
                                      bool send_query_statistics_with_every_batch)
         : _sender_id(sender_id),
           _state(state),
@@ -388,8 +386,7 @@ VDataStreamSender::VDataStreamSender(RuntimeState* state, ObjectPool* pool, int
             fragment_id_to_channel_index.end()) {
             _channel_shared_ptrs.emplace_back(
                    new Channel(this, row_desc, destinations[i].brpc_server, fragment_instance_id,
-                                _dest_node_id, per_channel_buffer_size, false,
-                                send_query_statistics_with_every_batch));
+                                _dest_node_id, false, send_query_statistics_with_every_batch));
         }
         fragment_id_to_channel_index.emplace(fragment_instance_id.lo,
                                              _channel_shared_ptrs.size() - 1);
diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h
index 0015839efb..5ec30b7cd4 100644
--- a/be/src/vec/sink/vdata_stream_sender.h
+++ b/be/src/vec/sink/vdata_stream_sender.h
@@ -100,12 +100,12 @@ public:
     VDataStreamSender(RuntimeState* state, ObjectPool* pool, int sender_id,
                      const RowDescriptor& row_desc, const TDataStreamSink& sink,
                      const std::vector<TPlanFragmentDestination>& destinations,
-                      int per_channel_buffer_size, bool send_query_statistics_with_every_batch);
+                      bool send_query_statistics_with_every_batch);
 
     VDataStreamSender(RuntimeState* state, ObjectPool* pool, int sender_id,
                      const RowDescriptor& row_desc, PlanNodeId dest_node_id,
                      const std::vector<TPlanFragmentDestination>& destinations,
-                      int per_channel_buffer_size, bool send_query_statistics_with_every_batch);
+                      bool send_query_statistics_with_every_batch);
 
     ~VDataStreamSender() override;
 
@@ -241,8 +241,8 @@ public:
    // how much tuple data is getting accumulated before being sent; it only applies
    // when data is added via add_row() and not sent directly via send_batch().
    Channel(Parent* parent, const RowDescriptor& row_desc, const TNetworkAddress& brpc_dest,
-            const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, int buffer_size,
-            bool is_transfer_chain, bool send_query_statistics_with_every_batch)
+            const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, bool is_transfer_chain,
+            bool send_query_statistics_with_every_batch)
             : _parent(parent),
               _row_desc(row_desc),
               _fragment_instance_id(fragment_instance_id),
@@ -450,11 +450,10 @@ template <typename Parent = VDataStreamSender>
 class PipChannel final : public Channel<Parent> {
 public:
    PipChannel(Parent* parent, const RowDescriptor& row_desc, const TNetworkAddress& brpc_dest,
-               const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, int buffer_size,
+               const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id,
                bool is_transfer_chain, bool send_query_statistics_with_every_batch)
            : Channel<Parent>(parent, row_desc, brpc_dest, fragment_instance_id, dest_node_id,
-                              buffer_size, is_transfer_chain,
-                              send_query_statistics_with_every_batch) {
+                              is_transfer_chain, send_query_statistics_with_every_batch) {
         ch_roll_pb_block();
     }
 
diff --git a/be/src/vec/sink/vresult_file_sink.cpp b/be/src/vec/sink/vresult_file_sink.cpp
index 5b2003d8c2..b388f450ce 100644
--- a/be/src/vec/sink/vresult_file_sink.cpp
+++ b/be/src/vec/sink/vresult_file_sink.cpp
@@ -47,7 +47,6 @@ namespace doris::vectorized {
 
 VResultFileSink::VResultFileSink(RuntimeState* state, ObjectPool* pool,
                                 const RowDescriptor& row_desc, const TResultFileSink& sink,
-                                 int per_channel_buffer_size,
                                  bool send_query_statistics_with_every_batch,
                                  const std::vector<TExpr>& t_output_expr)
         : _t_output_expr(t_output_expr), _row_desc(row_desc) {
@@ -66,7 +65,6 @@ VResultFileSink::VResultFileSink(RuntimeState* state, ObjectPool* pool,
 VResultFileSink::VResultFileSink(RuntimeState* state, ObjectPool* pool, int sender_id,
                                 const RowDescriptor& row_desc, const TResultFileSink& sink,
                                 const std::vector<TPlanFragmentDestination>& destinations,
-                                 int per_channel_buffer_size,
                                 bool send_query_statistics_with_every_batch,
                                 const std::vector<TExpr>& t_output_expr, DescriptorTbl& descs)
         : _t_output_expr(t_output_expr),
@@ -79,7 +77,7 @@ VResultFileSink::VResultFileSink(RuntimeState* state, ObjectPool* pool, int send
     _is_top_sink = false;
     CHECK_EQ(destinations.size(), 1);
    _stream_sender.reset(new VDataStreamSender(state, pool, sender_id, row_desc, sink.dest_node_id,
-                                               destinations, per_channel_buffer_size,
+                                               destinations,
                                               send_query_statistics_with_every_batch));
 
     _name = "VResultFileSink";
diff --git a/be/src/vec/sink/vresult_file_sink.h b/be/src/vec/sink/vresult_file_sink.h
index c8d2f3be18..6a9d89f773 100644
--- a/be/src/vec/sink/vresult_file_sink.h
+++ b/be/src/vec/sink/vresult_file_sink.h
@@ -47,13 +47,12 @@ class VExprContext;
 class VResultFileSink : public DataSink {
 public:
    VResultFileSink(RuntimeState* state, ObjectPool* pool, const RowDescriptor& row_desc,
-                    const TResultFileSink& sink, int per_channel_buffer_size,
-                    bool send_query_statistics_with_every_batch,
+                    const TResultFileSink& sink, bool send_query_statistics_with_every_batch,
                    const std::vector<TExpr>& t_output_expr);
    VResultFileSink(RuntimeState* state, ObjectPool* pool, int sender_id,
                    const RowDescriptor& row_desc, const TResultFileSink& sink,
                    const std::vector<TPlanFragmentDestination>& destinations,
-                    int per_channel_buffer_size, bool send_query_statistics_with_every_batch,
+                    bool send_query_statistics_with_every_batch,
                    const std::vector<TExpr>& t_output_expr, DescriptorTbl& descs);
     ~VResultFileSink() override = default;
     Status init(const TDataSink& thrift_sink) override;
diff --git a/be/src/vec/sink/vresult_sink.h b/be/src/vec/sink/vresult_sink.h
index de1126b2e1..3da07fe804 100644
--- a/be/src/vec/sink/vresult_sink.h
+++ b/be/src/vec/sink/vresult_sink.h
@@ -118,6 +118,8 @@ struct ResultFileOptions {
     }
 };
 
+constexpr int RESULT_SINK_BUFFER_SIZE = 4096;
+
 class VResultSink : public DataSink {
 public:
     friend class pipeline::ResultSinkOperator;
diff --git a/be/test/vec/runtime/vdata_stream_test.cpp b/be/test/vec/runtime/vdata_stream_test.cpp
index 6c10b89aa7..05c50023b2 100644
--- a/be/test/vec/runtime/vdata_stream_test.cpp
+++ b/be/test/vec/runtime/vdata_stream_test.cpp
@@ -188,11 +188,9 @@ TEST_F(VDataStreamTest, BasicTest) {
         dest.__set_server(addr);
         dests.push_back(dest);
     }
-    int per_channel_buffer_size = 1024 * 1024;
     bool send_query_statistics_with_every_batch = false;
    VDataStreamSender sender(&runtime_stat, &_object_pool, sender_id, row_desc, tsink.stream_sink,
-                             dests, per_channel_buffer_size,
-                             send_query_statistics_with_every_batch);
+                             dests, send_query_statistics_with_every_batch);
     sender.set_query_statistics(std::make_shared<QueryStatistics>());
     sender.init(tsink);
     sender.prepare(&runtime_stat);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

