This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 12075f9853 [pipelineX](projection) Support projection and blocking agg (#23256)
12075f9853 is described below
commit 12075f9853b538f188bba5da363e0176907b072b
Author: Gabriel <[email protected]>
AuthorDate: Mon Aug 21 22:23:02 2023 +0800
[pipelineX](projection) Support projection and blocking agg (#23256)
---
be/src/exec/data_sink.cpp | 30 ++---
be/src/pipeline/exec/aggregation_sink_operator.cpp | 130 ++++++++-------------
.../pipeline/exec/aggregation_source_operator.cpp | 9 +-
be/src/pipeline/exec/exchange_sink_operator.cpp | 12 +-
be/src/pipeline/exec/exchange_sink_operator.h | 7 +-
be/src/pipeline/exec/operator.cpp | 54 +++++++++
be/src/pipeline/exec/operator.h | 15 ++-
be/src/pipeline/pipeline_fragment_context.cpp | 2 +-
.../pipeline_x/pipeline_x_fragment_context.cpp | 38 +++---
be/src/pipeline/pipeline_x/pipeline_x_task.cpp | 2 +-
be/src/vec/sink/vdata_stream_sender.cpp | 15 +--
be/src/vec/sink/vdata_stream_sender.h | 13 +--
be/src/vec/sink/vresult_file_sink.cpp | 4 +-
be/src/vec/sink/vresult_file_sink.h | 5 +-
be/src/vec/sink/vresult_sink.h | 2 +
be/test/vec/runtime/vdata_stream_test.cpp | 4 +-
16 files changed, 176 insertions(+), 166 deletions(-)
diff --git a/be/src/exec/data_sink.cpp b/be/src/exec/data_sink.cpp
index bc914818e8..bd91c92a04 100644
--- a/be/src/exec/data_sink.cpp
+++ b/be/src/exec/data_sink.cpp
@@ -60,9 +60,9 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
? params.send_query_statistics_with_every_batch
: false;
// TODO: figure out good buffer size based on size of output row
-        sink->reset(new vectorized::VDataStreamSender(
-                state, pool, params.sender_id, row_desc, thrift_sink.stream_sink,
-                params.destinations, 16 * 1024, send_query_statistics_with_every_batch));
+        sink->reset(new vectorized::VDataStreamSender(state, pool, params.sender_id, row_desc,
+                                                      thrift_sink.stream_sink, params.destinations,
+                                                      send_query_statistics_with_every_batch));
        // RETURN_IF_ERROR(sender->prepare(state->obj_pool(), thrift_sink.stream_sink));
break;
}
@@ -73,7 +73,8 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
// TODO: figure out good buffer size based on size of output row
        sink->reset(new doris::vectorized::VResultSink(row_desc, output_exprs,
-                                                       thrift_sink.result_sink, 4096));
+                                                       thrift_sink.result_sink,
+                                                       vectorized::RESULT_SINK_BUFFER_SIZE));
break;
}
case TDataSinkType::RESULT_FILE_SINK: {
@@ -90,11 +91,11 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
if (params.__isset.destinations && params.destinations.size() > 0) {
            sink->reset(new doris::vectorized::VResultFileSink(
                    state, pool, params.sender_id, row_desc, thrift_sink.result_file_sink,
-                    params.destinations, 16 * 1024, send_query_statistics_with_every_batch,
-                    output_exprs, desc_tbl));
+                    params.destinations, send_query_statistics_with_every_batch, output_exprs,
+                    desc_tbl));
} else {
            sink->reset(new doris::vectorized::VResultFileSink(
-                    state, pool, row_desc, thrift_sink.result_file_sink, 16 * 1024,
+                    state, pool, row_desc, thrift_sink.result_file_sink,
                    send_query_statistics_with_every_batch, output_exprs));
}
break;
@@ -197,9 +198,9 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
? params.send_query_statistics_with_every_batch
: false;
// TODO: figure out good buffer size based on size of output row
-        sink->reset(new vectorized::VDataStreamSender(
-                state, pool, local_params.sender_id, row_desc, thrift_sink.stream_sink,
-                params.destinations, 16 * 1024, send_query_statistics_with_every_batch));
+        sink->reset(new vectorized::VDataStreamSender(state, pool, local_params.sender_id, row_desc,
+                                                      thrift_sink.stream_sink, params.destinations,
+                                                      send_query_statistics_with_every_batch));
        // RETURN_IF_ERROR(sender->prepare(state->obj_pool(), thrift_sink.stream_sink));
break;
}
@@ -210,7 +211,8 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
// TODO: figure out good buffer size based on size of output row
        sink->reset(new doris::vectorized::VResultSink(row_desc, output_exprs,
-                                                       thrift_sink.result_sink, 4096));
+                                                       thrift_sink.result_sink,
+                                                       vectorized::RESULT_SINK_BUFFER_SIZE));
break;
}
case TDataSinkType::RESULT_FILE_SINK: {
@@ -227,11 +229,11 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
if (params.__isset.destinations && params.destinations.size() > 0) {
            sink->reset(new doris::vectorized::VResultFileSink(
                    state, pool, local_params.sender_id, row_desc, thrift_sink.result_file_sink,
-                    params.destinations, 16 * 1024, send_query_statistics_with_every_batch,
-                    output_exprs, desc_tbl));
+                    params.destinations, send_query_statistics_with_every_batch, output_exprs,
+                    desc_tbl));
} else {
            sink->reset(new doris::vectorized::VResultFileSink(
-                    state, pool, row_desc, thrift_sink.result_file_sink, 16 * 1024,
+                    state, pool, row_desc, thrift_sink.result_file_sink,
                    send_query_statistics_with_every_batch, output_exprs));
}
break;
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp b/be/src/pipeline/exec/aggregation_sink_operator.cpp
index e0a5373dab..a856247315 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp
@@ -20,6 +20,7 @@
#include <string>
#include "pipeline/exec/operator.h"
+#include "runtime/primitive_type.h"
namespace doris::pipeline {
@@ -151,6 +152,13 @@ Status AggSinkLocalState::init(RuntimeState* state, Dependency* dependency) {
(!p._have_conjuncts) && // no having conjunct
p._needs_finalize; // agg's finalize step
}
+    // _create_agg_status is called here instead of during prepare, because prepare and
+    // open do not run on the same thread, which could make the JVM unavailable.
+    if (_shared_state->probe_expr_ctxs.empty()) {
+        // _create_agg_status may acquire a lot of memory and may fail to allocate when
+        // memory is very low.
+        RETURN_IF_CATCH_EXCEPTION(_dependency->create_agg_status(_agg_data->without_key));
+    }
return Status::OK();
}
@@ -501,6 +509,7 @@ void AggSinkLocalState::_emplace_into_hash_table(vectorized::AggregateDataPtr* p
places);
}
} else {
+ SCOPED_TIMER(_hash_table_emplace_timer);
for (size_t i = 0; i < num_rows; ++i) {
vectorized::AggregateDataPtr mapped = nullptr;
            if constexpr (vectorized::ColumnsHashing::IsSingleNullableColumnMethod<
@@ -575,88 +584,61 @@ void AggSinkLocalState::_find_in_hash_table(vectorized::AggregateDataPtr* places
void AggSinkLocalState::_init_hash_method(const vectorized::VExprContextSPtrs& probe_exprs) {
DCHECK(probe_exprs.size() >= 1);
+
+ using Type = vectorized::AggregatedDataVariants::Type;
+ Type t(Type::serialized);
+
if (probe_exprs.size() == 1) {
auto is_nullable = probe_exprs[0]->root()->is_nullable();
- switch (probe_exprs[0]->root()->result_type()) {
+ PrimitiveType type = probe_exprs[0]->root()->result_type();
+ switch (type) {
        case TYPE_TINYINT:
        case TYPE_BOOLEAN:
-            _agg_data->init(vectorized::AggregatedDataVariants::Type::int8_key, is_nullable);
-            return;
        case TYPE_SMALLINT:
-            _agg_data->init(vectorized::AggregatedDataVariants::Type::int16_key, is_nullable);
-            return;
        case TYPE_INT:
        case TYPE_FLOAT:
        case TYPE_DATEV2:
-            if (_parent->cast<AggSinkOperatorX>()._is_first_phase)
-                _agg_data->init(vectorized::AggregatedDataVariants::Type::int32_key, is_nullable);
-            else
-                _agg_data->init(vectorized::AggregatedDataVariants::Type::int32_key_phase2,
-                                is_nullable);
-            return;
        case TYPE_BIGINT:
        case TYPE_DOUBLE:
        case TYPE_DATE:
        case TYPE_DATETIME:
        case TYPE_DATETIMEV2:
-            if (_parent->cast<AggSinkOperatorX>()._is_first_phase)
-                _agg_data->init(vectorized::AggregatedDataVariants::Type::int64_key, is_nullable);
-            else
-                _agg_data->init(vectorized::AggregatedDataVariants::Type::int64_key_phase2,
-                                is_nullable);
-            return;
-        case TYPE_LARGEINT: {
-            if (_parent->cast<AggSinkOperatorX>()._is_first_phase)
-                _agg_data->init(vectorized::AggregatedDataVariants::Type::int128_key, is_nullable);
-            else
-                _agg_data->init(vectorized::AggregatedDataVariants::Type::int128_key_phase2,
-                                is_nullable);
-            return;
-        }
+        case TYPE_LARGEINT:
        case TYPE_DECIMALV2:
        case TYPE_DECIMAL32:
        case TYPE_DECIMAL64:
        case TYPE_DECIMAL128I: {
-            vectorized::DataTypePtr& type_ptr = probe_exprs[0]->root()->data_type();
-            vectorized::TypeIndex idx =
-                    is_nullable ? assert_cast<const vectorized::DataTypeNullable&>(*type_ptr)
-                                          .get_nested_type()
-                                          ->get_type_id()
-                                : type_ptr->get_type_id();
-            vectorized::WhichDataType which(idx);
-            if (which.is_decimal32()) {
-                if (_parent->cast<AggSinkOperatorX>()._is_first_phase)
-                    _agg_data->init(vectorized::AggregatedDataVariants::Type::int32_key,
-                                    is_nullable);
-                else
-                    _agg_data->init(vectorized::AggregatedDataVariants::Type::int32_key_phase2,
-                                    is_nullable);
-            } else if (which.is_decimal64()) {
-                if (_parent->cast<AggSinkOperatorX>()._is_first_phase)
-                    _agg_data->init(vectorized::AggregatedDataVariants::Type::int64_key,
-                                    is_nullable);
-                else
-                    _agg_data->init(vectorized::AggregatedDataVariants::Type::int64_key_phase2,
-                                    is_nullable);
+            size_t size = get_primitive_type_size(type);
+            if (size == 1) {
+                t = Type::int8_key;
+            } else if (size == 2) {
+                t = Type::int16_key;
+            } else if (size == 4) {
+                t = Type::int32_key;
+            } else if (size == 8) {
+                t = Type::int64_key;
+            } else if (size == 16) {
+                t = Type::int128_key;
            } else {
-                if (_parent->cast<AggSinkOperatorX>()._is_first_phase)
-                    _agg_data->init(vectorized::AggregatedDataVariants::Type::int128_key,
-                                    is_nullable);
-                else
-                    _agg_data->init(vectorized::AggregatedDataVariants::Type::int128_key_phase2,
-                                    is_nullable);
+                throw Exception(ErrorCode::INTERNAL_ERROR,
+                                "meet invalid type size, size={}, type={}", size,
+                                type_to_string(type));
            }
-            return;
+            break;
        }
        case TYPE_CHAR:
        case TYPE_VARCHAR:
        case TYPE_STRING: {
-            _agg_data->init(vectorized::AggregatedDataVariants::Type::string_key, is_nullable);
+            t = Type::string_key;
            break;
        }
        default:
-            _agg_data->init(vectorized::AggregatedDataVariants::Type::serialized);
+            t = Type::serialized;
        }
+
+        _agg_data->init(
+                get_hash_key_type_with_phase(t, !_parent->cast<AggSinkOperatorX>()._is_first_phase),
+                is_nullable);
} else {
bool use_fixed_key = true;
bool has_null = false;
@@ -690,40 +672,19 @@ void AggSinkLocalState::_init_hash_method(const vectorized::VExprContextSPtrs& p
        if (use_fixed_key) {
            if (bitmap_size + key_byte_size <= sizeof(vectorized::UInt64)) {
-                if (_parent->cast<AggSinkOperatorX>()._is_first_phase) {
-                    _agg_data->init(vectorized::AggregatedDataVariants::Type::int64_keys, has_null);
-                } else {
-                    _agg_data->init(vectorized::AggregatedDataVariants::Type::int64_keys_phase2,
-                                    has_null);
-                }
+                t = Type::int64_keys;
            } else if (bitmap_size + key_byte_size <= sizeof(vectorized::UInt128)) {
-                if (_parent->cast<AggSinkOperatorX>()._is_first_phase) {
-                    _agg_data->init(vectorized::AggregatedDataVariants::Type::int128_keys,
-                                    has_null);
-                } else {
-                    _agg_data->init(vectorized::AggregatedDataVariants::Type::int128_keys_phase2,
-                                    has_null);
-                }
+                t = Type::int128_keys;
            } else if (bitmap_size + key_byte_size <= sizeof(vectorized::UInt136)) {
-                if (_parent->cast<AggSinkOperatorX>()._is_first_phase) {
-                    _agg_data->init(vectorized::AggregatedDataVariants::Type::int136_keys,
-                                    has_null);
-                } else {
-                    _agg_data->init(vectorized::AggregatedDataVariants::Type::int136_keys_phase2,
-                                    has_null);
-                }
+                t = Type::int136_keys;
            } else {
-                if (_parent->cast<AggSinkOperatorX>()._is_first_phase) {
-                    _agg_data->init(vectorized::AggregatedDataVariants::Type::int256_keys,
-                                    has_null);
-                } else {
-                    _agg_data->init(vectorized::AggregatedDataVariants::Type::int256_keys_phase2,
-                                    has_null);
-                }
+                t = Type::int256_keys;
            }
-
+            _agg_data->init(get_hash_key_type_with_phase(
+                                    t, !_parent->cast<AggSinkOperatorX>()._is_first_phase),
+                            has_null);
        } else {
-            _agg_data->init(vectorized::AggregatedDataVariants::Type::serialized);
+            _agg_data->init(Type::serialized);
}
}
}
@@ -869,6 +830,7 @@ Status AggSinkOperatorX::open(doris::RuntimeState* state) {
for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
RETURN_IF_ERROR(_aggregate_evaluators[i]->open(state));
+ _aggregate_evaluators[i]->set_version(state->be_exec_version());
}
return Status::OK();
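Note: get_hash_key_type_with_phase() is called by the refactored _init_hash_method above but is not defined in this diff. A minimal sketch of the mapping it must perform, reconstructed only from the phase-1/phase-2 branches this commit removes (the stand-alone enum is illustrative, not the real vectorized::AggregatedDataVariants::Type):

    // Illustrative sketch only: maps a base hash-key variant to its phase-2
    // counterpart. Variants that had no *_phase2 form in the removed code
    // (int8, int16, string, serialized) are returned unchanged.
    enum class Type {
        serialized,
        int8_key, int16_key,
        int32_key, int32_key_phase2,
        int64_key, int64_key_phase2,
        int128_key, int128_key_phase2,
        int64_keys, int64_keys_phase2,
        int128_keys, int128_keys_phase2,
        int136_keys, int136_keys_phase2,
        int256_keys, int256_keys_phase2,
        string_key,
    };

    Type get_hash_key_type_with_phase(Type t, bool is_phase2) {
        if (!is_phase2) {
            return t; // the first phase always keeps the base variant
        }
        switch (t) {
        case Type::int32_key:   return Type::int32_key_phase2;
        case Type::int64_key:   return Type::int64_key_phase2;
        case Type::int128_key:  return Type::int128_key_phase2;
        case Type::int64_keys:  return Type::int64_keys_phase2;
        case Type::int128_keys: return Type::int128_keys_phase2;
        case Type::int136_keys: return Type::int136_keys_phase2;
        case Type::int256_keys: return Type::int256_keys_phase2;
        default:                return t;
        }
    }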
diff --git a/be/src/pipeline/exec/aggregation_source_operator.cpp b/be/src/pipeline/exec/aggregation_source_operator.cpp
index ec5e515fcc..b8436e8735 100644
--- a/be/src/pipeline/exec/aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_source_operator.cpp
@@ -71,14 +71,7 @@ Status AggLocalState::init(RuntimeState* state, LocalStateInfo& info) {
        _executor.close = std::bind<void>(&AggLocalState::_close_with_serialized_key, this);
}
-    // move _create_agg_status to open not in during prepare,
-    // because during prepare and open thread is not the same one,
-    // this could cause unable to get JVM
-    if (_shared_state->probe_expr_ctxs.empty()) {
-        // _create_agg_status may acquire a lot of memory, may allocate failed when memory is very few
-        RETURN_IF_CATCH_EXCEPTION(_dependency->create_agg_status(_agg_data->without_key));
-        _agg_data_created_without_key = true;
-    }
+    _agg_data_created_without_key = p._without_key;
return Status::OK();
}
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp
index dfa016b15a..ec52ed5422 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -113,8 +113,7 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, Dependency* dependency)
            fragment_id_to_channel_index.end()) {
            channel_shared_ptrs.emplace_back(new vectorized::PipChannel<ExchangeSinkLocalState>(
                    this, p._row_desc, p._dests[i].brpc_server, fragment_instance_id,
-                    p._dest_node_id, p._per_channel_buffer_size, false,
-                    p._send_query_statistics_with_every_batch));
+                    p._dest_node_id, false, p._send_query_statistics_with_every_batch));
}
        fragment_id_to_channel_index.emplace(fragment_instance_id.lo,
                                             channel_shared_ptrs.size() - 1);
@@ -190,15 +189,13 @@ segment_v2::CompressionTypePB& ExchangeSinkLocalState::compression_type() {
ExchangeSinkOperatorX::ExchangeSinkOperatorX(
        const int id, RuntimeState* state, ObjectPool* pool, const RowDescriptor& row_desc,
        const TDataStreamSink& sink, const std::vector<TPlanFragmentDestination>& destinations,
-        int per_channel_buffer_size, bool send_query_statistics_with_every_batch,
-        PipelineXFragmentContext* context)
+        bool send_query_statistics_with_every_batch, PipelineXFragmentContext* context)
: DataSinkOperatorX(id),
_context(context),
_pool(pool),
_row_desc(row_desc),
_part_type(sink.output_partition.type),
_dests(destinations),
- _per_channel_buffer_size(per_channel_buffer_size),
_send_query_statistics_with_every_batch(send_query_statistics_with_every_batch),
_dest_node_id(sink.dest_node_id),
_transfer_large_data_by_brpc(config::transfer_large_data_by_brpc) {
@@ -213,7 +210,7 @@ ExchangeSinkOperatorX::ExchangeSinkOperatorX(
ExchangeSinkOperatorX::ExchangeSinkOperatorX(
        const int id, ObjectPool* pool, const RowDescriptor& row_desc, PlanNodeId dest_node_id,
-        const std::vector<TPlanFragmentDestination>& destinations, int per_channel_buffer_size,
+        const std::vector<TPlanFragmentDestination>& destinations,
        bool send_query_statistics_with_every_batch, PipelineXFragmentContext* context)
: DataSinkOperatorX(id),
_context(context),
@@ -221,7 +218,6 @@ ExchangeSinkOperatorX::ExchangeSinkOperatorX(
_row_desc(row_desc),
_part_type(TPartitionType::UNPARTITIONED),
_dests(destinations),
- _per_channel_buffer_size(per_channel_buffer_size),
_send_query_statistics_with_every_batch(send_query_statistics_with_every_batch),
_dest_node_id(dest_node_id) {
_cur_pb_block = &_pb_block1;
@@ -230,14 +226,12 @@ ExchangeSinkOperatorX::ExchangeSinkOperatorX(
ExchangeSinkOperatorX::ExchangeSinkOperatorX(const int id, ObjectPool* pool,
                                             const RowDescriptor& row_desc,
-                                             int per_channel_buffer_size,
                                             bool send_query_statistics_with_every_batch,
                                             PipelineXFragmentContext* context)
: DataSinkOperatorX(id),
_context(context),
_pool(pool),
_row_desc(row_desc),
- _per_channel_buffer_size(per_channel_buffer_size),
_send_query_statistics_with_every_batch(send_query_statistics_with_every_batch),
_dest_node_id(0) {
_cur_pb_block = &_pb_block1;
diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h
index 45114d40d1..b6d92d5829 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.h
+++ b/be/src/pipeline/exec/exchange_sink_operator.h
@@ -158,15 +158,15 @@ public:
    ExchangeSinkOperatorX(const int id, RuntimeState* state, ObjectPool* pool,
                          const RowDescriptor& row_desc, const TDataStreamSink& sink,
                          const std::vector<TPlanFragmentDestination>& destinations,
-                          int per_channel_buffer_size, bool send_query_statistics_with_every_batch,
+                          bool send_query_statistics_with_every_batch,
                          PipelineXFragmentContext* context);
    ExchangeSinkOperatorX(const int id, ObjectPool* pool, const RowDescriptor& row_desc,
                          PlanNodeId dest_node_id,
                          const std::vector<TPlanFragmentDestination>& destinations,
-                          int per_channel_buffer_size, bool send_query_statistics_with_every_batch,
+                          bool send_query_statistics_with_every_batch,
                          PipelineXFragmentContext* context);
    ExchangeSinkOperatorX(const int id, ObjectPool* pool, const RowDescriptor& row_desc,
-                          int per_channel_buffer_size, bool send_query_statistics_with_every_batch,
+                          bool send_query_statistics_with_every_batch,
                          PipelineXFragmentContext* context);
Status init(const TDataSink& tsink) override;
@@ -223,7 +223,6 @@ private:
vectorized::VExprContextSPtrs _partition_expr_ctxs;
const std::vector<TPlanFragmentDestination> _dests;
- const int _per_channel_buffer_size;
const bool _send_query_statistics_with_every_batch;
std::unique_ptr<MemTracker> _mem_tracker;
diff --git a/be/src/pipeline/exec/operator.cpp b/be/src/pipeline/exec/operator.cpp
index 32a78f104b..0ea73dea54 100644
--- a/be/src/pipeline/exec/operator.cpp
+++ b/be/src/pipeline/exec/operator.cpp
@@ -19,6 +19,7 @@
#include "vec/exprs/vexpr.h"
#include "vec/exprs/vexpr_context.h"
+#include "vec/utils/util.hpp"
namespace doris {
class RowDescriptor;
@@ -153,9 +154,62 @@ Status PipelineXLocalState::init(RuntimeState* state, LocalStateInfo& /*info*/)
profile()->total_time_counter()),
"");
    _mem_tracker = std::make_unique<MemTracker>("PipelineXLocalState:" + _runtime_profile->name());
+ _memory_used_counter = ADD_LABEL_COUNTER(_runtime_profile, "MemoryUsage");
+ _peak_memory_usage_counter = _runtime_profile->AddHighWaterMarkCounter(
+ "PeakMemoryUsage", TUnit::BYTES, "MemoryUsage");
return Status::OK();
}
+void PipelineXLocalState::clear_origin_block() {
+    _origin_block.clear_column_data(_parent->_row_descriptor.num_materialized_slots());
+}
+
+Status OperatorXBase::do_projections(RuntimeState* state, vectorized::Block* origin_block,
+                                     vectorized::Block* output_block) {
+ auto local_state = state->get_local_state(id());
+ SCOPED_TIMER(local_state->_projection_timer);
+ using namespace vectorized;
+    vectorized::MutableBlock mutable_block =
+            vectorized::VectorizedUtils::build_mutable_mem_reuse_block(output_block,
+                                                                       *_output_row_descriptor);
+ auto rows = origin_block->rows();
+
+    if (rows != 0) {
+        auto& mutable_columns = mutable_block.mutable_columns();
+        DCHECK(mutable_columns.size() == _projections.size());
+        for (int i = 0; i < mutable_columns.size(); ++i) {
+            auto result_column_id = -1;
+            RETURN_IF_ERROR(_projections[i]->execute(origin_block, &result_column_id));
+            auto column_ptr = origin_block->get_by_position(result_column_id)
+                                      .column->convert_to_full_column_if_const();
+            //TODO: this is a quick fix, we need a new function like "change_to_nullable" to do it
+            if (mutable_columns[i]->is_nullable() xor column_ptr->is_nullable()) {
+                DCHECK(mutable_columns[i]->is_nullable() && !column_ptr->is_nullable());
+                reinterpret_cast<ColumnNullable*>(mutable_columns[i].get())
+                        ->insert_range_from_not_nullable(*column_ptr, 0, rows);
+            } else {
+                mutable_columns[i]->insert_range_from(*column_ptr, 0, rows);
+            }
+        }
+        DCHECK(mutable_block.rows() == rows);
+    }
+
+ return Status::OK();
+}
+
+Status OperatorXBase::get_next_after_projects(RuntimeState* state, vectorized::Block* block,
+                                              SourceState& source_state) {
+    auto local_state = state->get_local_state(id());
+    if (_output_row_descriptor) {
+        local_state->clear_origin_block();
+        auto status = get_block(state, &local_state->_origin_block, source_state);
+        if (UNLIKELY(!status.ok())) return status;
+        return do_projections(state, &local_state->_origin_block, block);
+    }
+    local_state->_peak_memory_usage_counter->set(local_state->_mem_tracker->peak_consumption());
+    return get_block(state, block, source_state);
+}
+
bool PipelineXLocalState::reached_limit() const {
return _parent->_limit != -1 && _num_rows_returned >= _parent->_limit;
}
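Note: the xor branch in do_projections() above exists because the reused output column may be nullable while a projection result is not. A self-contained illustration of that case with stand-in types (hypothetical simplified code, not the actual Doris column classes; insert_range_from_not_nullable here mimics the ColumnNullable method of the same name):

    #include <cassert>
    #include <cstddef>
    #include <cstdint>
    #include <optional>
    #include <vector>

    using PlainColumn = std::vector<int64_t>;                    // stand-in for a non-nullable column
    using NullableColumn = std::vector<std::optional<int64_t>>;  // stand-in for ColumnNullable

    // Wrap each value from the non-nullable source before appending it to the
    // nullable destination, so the destination's null map stays consistent.
    void insert_range_from_not_nullable(NullableColumn& dst, const PlainColumn& src,
                                        size_t start, size_t length) {
        for (size_t i = start; i < start + length; ++i) {
            dst.emplace_back(src[i]); // every appended value is non-null by construction
        }
    }

    int main() {
        PlainColumn projected = {1, 2, 3}; // the projection produced a non-nullable result
        NullableColumn output;             // but the reused output slot is nullable
        insert_range_from_not_nullable(output, projected, 0, projected.size());
        assert(output.size() == 3 && output[1].has_value());
        return 0;
    }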
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index 913d277638..844fd02ad4 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -492,6 +492,7 @@ public:
_rows_returned_counter(nullptr),
_rows_returned_rate(nullptr),
_memory_used_counter(nullptr),
+ _peak_memory_usage_counter(nullptr),
_parent(parent),
_state(state) {}
virtual ~PipelineXLocalState() {}
@@ -506,6 +507,9 @@ public:
return reinterpret_cast<const TARGET&>(*this);
}
+    // If projections are used, `_origin_block` should be cleared before it is reused.
+    void clear_origin_block();
+
bool reached_limit() const;
void reached_limit(vectorized::Block* block, bool* eos);
RuntimeProfile* profile() { return _runtime_profile.get(); }
@@ -541,6 +545,8 @@ protected:
// Account for peak memory used by this node
RuntimeProfile::Counter* _memory_used_counter;
RuntimeProfile::Counter* _projection_timer;
+ // Account for peak memory used by this node
+ RuntimeProfile::Counter* _peak_memory_usage_counter;
OpentelemetrySpan _span;
OperatorXBase* _parent;
@@ -548,6 +554,7 @@ protected:
vectorized::VExprContextSPtrs _conjuncts;
vectorized::VExprContextSPtrs _projections;
bool _init = false;
+ vectorized::Block _origin_block;
};
class OperatorXBase : public OperatorBase {
@@ -645,6 +652,13 @@ public:
virtual bool is_source() const override { return false; }
+    Status get_next_after_projects(RuntimeState* state, vectorized::Block* block,
+                                   SourceState& source_state);
+
+    /// Only used in the vectorized exec engine: do projections to transform _row_desc -> _output_row_desc
+    Status do_projections(RuntimeState* state, vectorized::Block* origin_block,
+                          vectorized::Block* output_block);
+
protected:
friend class PipelineXLocalState;
int _id; // unique w/in single plan tree
@@ -656,7 +670,6 @@ protected:
OperatorXBase* _children;
RowDescriptor _row_descriptor;
- vectorized::Block _origin_block;
std::unique_ptr<RowDescriptor> _output_row_descriptor;
vectorized::VExprContextSPtrs _projections;
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index d37410edb1..871c554781 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -794,7 +794,7 @@ Status PipelineFragmentContext::_create_sink(int sender_id, const TDataSink& thr
        _multi_cast_stream_sink_senders[i].reset(new vectorized::VDataStreamSender(
                _runtime_state.get(), _runtime_state->obj_pool(), sender_id, row_desc,
                thrift_sink.multi_cast_stream_sink.sinks[i],
-                thrift_sink.multi_cast_stream_sink.destinations[i], 16 * 1024, false));
+                thrift_sink.multi_cast_stream_sink.destinations[i], false));
        // 2. create and set the source operator of multi_cast_data_stream_source for new pipeline
OperatorBuilderPtr source_op =
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index 1e9d3d3f95..a182934bd3 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -113,11 +113,10 @@ void PipelineXFragmentContext::cancel(const PPlanFragmentCancelReason& reason,
_exec_status = Status::Cancelled(msg);
}
-    for (auto& rs : _runtime_states) {
-        rs->set_is_cancelled(true, msg);
-        rs->set_process_status(_exec_status);
-        _exec_env->vstream_mgr()->cancel(rs->fragment_instance_id());
-    }
+    FOR_EACH_RUNTIME_STATE(
+            runtime_state->set_is_cancelled(true, msg);
+            runtime_state->set_process_status(_exec_status);
+            _exec_env->vstream_mgr()->cancel(runtime_state->fragment_instance_id());)
LOG(WARNING) << "PipelineFragmentContext Canceled. reason=" << msg;
@@ -236,9 +235,9 @@ Status PipelineXFragmentContext::_create_data_sink(ObjectPool* pool, const TData
params.__isset.send_query_statistics_with_every_batch
? params.send_query_statistics_with_every_batch
: false;
-        _sink.reset(new ExchangeSinkOperatorX(
-                _sink_idx++, state, pool, row_desc, thrift_sink.stream_sink, params.destinations,
-                16 * 1024, send_query_statistics_with_every_batch, this));
+        _sink.reset(new ExchangeSinkOperatorX(_sink_idx++, state, pool, row_desc,
+                                              thrift_sink.stream_sink, params.destinations,
+                                              send_query_statistics_with_every_batch, this));
break;
}
case TDataSinkType::RESULT_SINK: {
@@ -248,7 +247,8 @@ Status PipelineXFragmentContext::_create_data_sink(ObjectPool* pool, const TData
// TODO: figure out good buffer size based on size of output row
        _sink.reset(new ResultSinkOperatorX(_sink_idx++, row_desc, output_exprs,
-                                            thrift_sink.result_sink, 4096));
+                                            thrift_sink.result_sink,
+                                            vectorized::RESULT_SINK_BUFFER_SIZE));
break;
}
default:
@@ -379,8 +379,7 @@ Status PipelineXFragmentContext::_build_pipelines(ObjectPool* pool,
                                                  const DescriptorTbl& descs, OperatorXPtr* root,
                                                  PipelinePtr cur_pipe) {
if (request.fragment.plan.nodes.size() == 0) {
-        *root = nullptr;
-        return Status::OK();
+        throw Exception(ErrorCode::INTERNAL_ERROR, "Invalid plan which has no plan node!");
}
int node_idx = 0;
@@ -405,7 +404,9 @@ Status PipelineXFragmentContext::_create_tree_helper(ObjectPool* pool,
// propagate error case
if (*node_idx >= tnodes.size()) {
// TODO: print thrift msg
-        return Status::InternalError("Failed to reconstruct plan tree from thrift.");
+        return Status::InternalError(
+                "Failed to reconstruct plan tree from thrift. Node id: {}, number of nodes: {}",
+                *node_idx, tnodes.size());
}
const TPlanNode& tnode = tnodes[*node_idx];
@@ -429,7 +430,9 @@ Status PipelineXFragmentContext::_create_tree_helper(ObjectPool* pool,
// this means we have been given a bad tree and must fail
if (*node_idx >= tnodes.size()) {
// TODO: print thrift msg
-        return Status::InternalError("Failed to reconstruct plan tree from thrift.");
+        return Status::InternalError(
+                "Failed to reconstruct plan tree from thrift. Node id: {}, number of nodes: {}",
+                *node_idx, tnodes.size());
}
}
@@ -518,12 +521,9 @@ Status PipelineXFragmentContext::submit() {
}
void PipelineXFragmentContext::close_sink() {
-    if (_prepared) {
-        FOR_EACH_RUNTIME_STATE(
-                _sink->close(runtime_state.get(), Status::RuntimeError("prepare failed"));)
-    } else {
-        FOR_EACH_RUNTIME_STATE(_sink->close(runtime_state.get(), Status::OK());)
-    }
+    FOR_EACH_RUNTIME_STATE(
+            _sink->close(runtime_state.get(),
+                         _prepared ? Status::RuntimeError("prepare failed") : Status::OK()););
}
void PipelineXFragmentContext::close_if_prepare_failed() {
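Note: FOR_EACH_RUNTIME_STATE, used in cancel() and close_sink() above, is not defined in this diff. Judging only from the loop it replaces, a plausible definition is (an assumption, not taken verbatim from the Doris sources):

    // Hypothetical sketch: expands the statement once per runtime state, binding
    // the loop variable name `runtime_state` that the call sites rely on.
    #define FOR_EACH_RUNTIME_STATE(stmt)              \
        for (auto& runtime_state : _runtime_states) { \
            stmt                                      \
        }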
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
index c3da4e3e47..250bc610a9 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
@@ -170,7 +170,7 @@ Status PipelineXTask::execute(bool* eos) {
{
SCOPED_TIMER(_get_block_timer);
_get_block_counter->update(1);
-        RETURN_IF_ERROR(_root->get_block(_state, block, _data_state));
+        RETURN_IF_ERROR(_root->get_next_after_projects(_state, block, _data_state));
}
*eos = _data_state == SourceState::FINISHED;
if (_block->rows() != 0 || *eos) {
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp
index e19fcd3fa3..bc749f7f21 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -292,7 +292,6 @@ void Channel<Parent>::ch_roll_pb_block() {
VDataStreamSender::VDataStreamSender(RuntimeState* state, ObjectPool* pool, int sender_id,
                                     const RowDescriptor& row_desc, const TDataStreamSink& sink,
                                     const std::vector<TPlanFragmentDestination>& destinations,
-                                     int per_channel_buffer_size,
                                     bool send_query_statistics_with_every_batch)
: _sender_id(sender_id),
_state(state),
@@ -330,13 +329,13 @@ VDataStreamSender::VDataStreamSender(RuntimeState* state, ObjectPool* pool, int
if (_enable_pipeline_exec) {
            _channel_shared_ptrs.emplace_back(new PipChannel<VDataStreamSender>(
                    this, row_desc, destinations[i].brpc_server, fragment_instance_id,
-                    sink.dest_node_id, per_channel_buffer_size, is_transfer_chain,
+                    sink.dest_node_id, is_transfer_chain,
                    send_query_statistics_with_every_batch));
} else {
-            _channel_shared_ptrs.emplace_back(new Channel(
-                    this, row_desc, destinations[i].brpc_server, fragment_instance_id,
-                    sink.dest_node_id, per_channel_buffer_size, is_transfer_chain,
-                    send_query_statistics_with_every_batch));
+            _channel_shared_ptrs.emplace_back(
+                    new Channel(this, row_desc, destinations[i].brpc_server,
+                                fragment_instance_id, sink.dest_node_id, is_transfer_chain,
+                                send_query_statistics_with_every_batch));
}
            fragment_id_to_channel_index.emplace(fragment_instance_id.lo,
                                                 _channel_shared_ptrs.size() - 1);
@@ -358,7 +357,6 @@ VDataStreamSender::VDataStreamSender(RuntimeState* state, ObjectPool* pool, int
VDataStreamSender::VDataStreamSender(RuntimeState* state, ObjectPool* pool, int sender_id,
                                     const RowDescriptor& row_desc, PlanNodeId dest_node_id,
                                     const std::vector<TPlanFragmentDestination>& destinations,
-                                     int per_channel_buffer_size,
                                     bool send_query_statistics_with_every_batch)
: _sender_id(sender_id),
_state(state),
@@ -388,8 +386,7 @@ VDataStreamSender::VDataStreamSender(RuntimeState* state, ObjectPool* pool, int
fragment_id_to_channel_index.end()) {
            _channel_shared_ptrs.emplace_back(
                    new Channel(this, row_desc, destinations[i].brpc_server, fragment_instance_id,
-                                _dest_node_id, per_channel_buffer_size, false,
-                                send_query_statistics_with_every_batch));
+                                _dest_node_id, false, send_query_statistics_with_every_batch));
}
            fragment_id_to_channel_index.emplace(fragment_instance_id.lo,
                                                 _channel_shared_ptrs.size() - 1);
diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h
index 0015839efb..5ec30b7cd4 100644
--- a/be/src/vec/sink/vdata_stream_sender.h
+++ b/be/src/vec/sink/vdata_stream_sender.h
@@ -100,12 +100,12 @@ public:
    VDataStreamSender(RuntimeState* state, ObjectPool* pool, int sender_id,
                      const RowDescriptor& row_desc, const TDataStreamSink& sink,
                      const std::vector<TPlanFragmentDestination>& destinations,
-                      int per_channel_buffer_size, bool send_query_statistics_with_every_batch);
+                      bool send_query_statistics_with_every_batch);
    VDataStreamSender(RuntimeState* state, ObjectPool* pool, int sender_id,
                      const RowDescriptor& row_desc, PlanNodeId dest_node_id,
                      const std::vector<TPlanFragmentDestination>& destinations,
-                      int per_channel_buffer_size, bool send_query_statistics_with_every_batch);
+                      bool send_query_statistics_with_every_batch);
~VDataStreamSender() override;
@@ -241,8 +241,8 @@ public:
    // how much tuple data is getting accumulated before being sent; it only applies
// when data is added via add_row() and not sent directly via send_batch().
    Channel(Parent* parent, const RowDescriptor& row_desc, const TNetworkAddress& brpc_dest,
-            const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, int buffer_size,
-            bool is_transfer_chain, bool send_query_statistics_with_every_batch)
+            const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, bool is_transfer_chain,
+            bool send_query_statistics_with_every_batch)
: _parent(parent),
_row_desc(row_desc),
_fragment_instance_id(fragment_instance_id),
@@ -450,11 +450,10 @@ template <typename Parent = VDataStreamSender>
class PipChannel final : public Channel<Parent> {
public:
    PipChannel(Parent* parent, const RowDescriptor& row_desc, const TNetworkAddress& brpc_dest,
-               const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, int buffer_size,
+               const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id,
               bool is_transfer_chain, bool send_query_statistics_with_every_batch)
            : Channel<Parent>(parent, row_desc, brpc_dest, fragment_instance_id, dest_node_id,
-                              buffer_size, is_transfer_chain,
-                              send_query_statistics_with_every_batch) {
+                              is_transfer_chain, send_query_statistics_with_every_batch) {
ch_roll_pb_block();
}
diff --git a/be/src/vec/sink/vresult_file_sink.cpp b/be/src/vec/sink/vresult_file_sink.cpp
index 5b2003d8c2..b388f450ce 100644
--- a/be/src/vec/sink/vresult_file_sink.cpp
+++ b/be/src/vec/sink/vresult_file_sink.cpp
@@ -47,7 +47,6 @@ namespace doris::vectorized {
VResultFileSink::VResultFileSink(RuntimeState* state, ObjectPool* pool,
                                 const RowDescriptor& row_desc, const TResultFileSink& sink,
-                                 int per_channel_buffer_size,
                                 bool send_query_statistics_with_every_batch,
                                 const std::vector<TExpr>& t_output_expr)
: _t_output_expr(t_output_expr), _row_desc(row_desc) {
@@ -66,7 +65,6 @@ VResultFileSink::VResultFileSink(RuntimeState* state, ObjectPool* pool,
VResultFileSink::VResultFileSink(RuntimeState* state, ObjectPool* pool, int sender_id,
                                 const RowDescriptor& row_desc, const TResultFileSink& sink,
                                 const std::vector<TPlanFragmentDestination>& destinations,
-                                 int per_channel_buffer_size,
                                 bool send_query_statistics_with_every_batch,
                                 const std::vector<TExpr>& t_output_expr, DescriptorTbl& descs)
: _t_output_expr(t_output_expr),
@@ -79,7 +77,7 @@ VResultFileSink::VResultFileSink(RuntimeState* state, ObjectPool* pool, int send
_is_top_sink = false;
CHECK_EQ(destinations.size(), 1);
    _stream_sender.reset(new VDataStreamSender(state, pool, sender_id, row_desc, sink.dest_node_id,
-                                               destinations, per_channel_buffer_size,
+                                               destinations,
                                               send_query_statistics_with_every_batch));
_name = "VResultFileSink";
diff --git a/be/src/vec/sink/vresult_file_sink.h b/be/src/vec/sink/vresult_file_sink.h
index c8d2f3be18..6a9d89f773 100644
--- a/be/src/vec/sink/vresult_file_sink.h
+++ b/be/src/vec/sink/vresult_file_sink.h
@@ -47,13 +47,12 @@ class VExprContext;
class VResultFileSink : public DataSink {
public:
    VResultFileSink(RuntimeState* state, ObjectPool* pool, const RowDescriptor& row_desc,
-                    const TResultFileSink& sink, int per_channel_buffer_size,
-                    bool send_query_statistics_with_every_batch,
+                    const TResultFileSink& sink, bool send_query_statistics_with_every_batch,
                    const std::vector<TExpr>& t_output_expr);
VResultFileSink(RuntimeState* state, ObjectPool* pool, int sender_id,
const RowDescriptor& row_desc, const TResultFileSink& sink,
const std::vector<TPlanFragmentDestination>& destinations,
-                    int per_channel_buffer_size, bool send_query_statistics_with_every_batch,
+                    bool send_query_statistics_with_every_batch,
                    const std::vector<TExpr>& t_output_expr, DescriptorTbl& descs);
~VResultFileSink() override = default;
Status init(const TDataSink& thrift_sink) override;
diff --git a/be/src/vec/sink/vresult_sink.h b/be/src/vec/sink/vresult_sink.h
index de1126b2e1..3da07fe804 100644
--- a/be/src/vec/sink/vresult_sink.h
+++ b/be/src/vec/sink/vresult_sink.h
@@ -118,6 +118,8 @@ struct ResultFileOptions {
}
};
+constexpr int RESULT_SINK_BUFFER_SIZE = 4096;
+
class VResultSink : public DataSink {
public:
friend class pipeline::ResultSinkOperator;
diff --git a/be/test/vec/runtime/vdata_stream_test.cpp b/be/test/vec/runtime/vdata_stream_test.cpp
index 6c10b89aa7..05c50023b2 100644
--- a/be/test/vec/runtime/vdata_stream_test.cpp
+++ b/be/test/vec/runtime/vdata_stream_test.cpp
@@ -188,11 +188,9 @@ TEST_F(VDataStreamTest, BasicTest) {
dest.__set_server(addr);
dests.push_back(dest);
}
- int per_channel_buffer_size = 1024 * 1024;
bool send_query_statistics_with_every_batch = false;
    VDataStreamSender sender(&runtime_stat, &_object_pool, sender_id, row_desc, tsink.stream_sink,
-                             dests, per_channel_buffer_size,
-                             send_query_statistics_with_every_batch);
+                             dests, send_query_statistics_with_every_batch);
sender.set_query_statistics(std::make_shared<QueryStatistics>());
sender.init(tsink);
sender.prepare(&runtime_stat);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]