This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new de62c00f4e4 [fix](move-memtable) init auto partition context in VRowDistribution::open (#26911)
de62c00f4e4 is described below
commit de62c00f4e4414c1f83ecd2db9b216f1bfc52100
Author: Kaijie Chen <[email protected]>
AuthorDate: Tue Nov 14 08:16:14 2023 +0800
[fix](move-memtable) init auto partition context in VRowDistribution::open (#26911)
---
be/src/vec/sink/vrow_distribution.h | 16 ++++++++++++++++
be/src/vec/sink/vtablet_sink_v2.cpp | 10 ++++++++--
be/src/vec/sink/vtablet_sink_v2.h | 3 ++-
be/src/vec/sink/writer/vtablet_writer.cpp | 18 +++++-------------
be/src/vec/sink/writer/vtablet_writer.h | 4 +---
5 files changed, 32 insertions(+), 19 deletions(-)
diff --git a/be/src/vec/sink/vrow_distribution.h b/be/src/vec/sink/vrow_distribution.h
index 5da964d44fc..3376eb5ab6f 100644
--- a/be/src/vec/sink/vrow_distribution.h
+++ b/be/src/vec/sink/vrow_distribution.h
@@ -88,6 +88,22 @@ public:
_schema = ctx->schema;
}
+ Status open(RowDescriptor* output_row_desc) {
+ if (_vpartition->is_auto_partition()) {
+ auto [part_ctx, part_func] = _get_partition_function();
+ RETURN_IF_ERROR(part_ctx->prepare(_state, *output_row_desc));
+ RETURN_IF_ERROR(part_ctx->open(_state));
+ }
+ for (auto& index : _schema->indexes()) {
+ auto& where_clause = index->where_clause;
+ if (where_clause != nullptr) {
+ RETURN_IF_ERROR(where_clause->prepare(_state,
*output_row_desc));
+ RETURN_IF_ERROR(where_clause->open(_state));
+ }
+ }
+ return Status::OK();
+ }
+
// auto partition
// mv where clause
// v1 needs index->node->row_ids - tabletids
diff --git a/be/src/vec/sink/vtablet_sink_v2.cpp b/be/src/vec/sink/vtablet_sink_v2.cpp
index 08ddfb03811..3aa23e10595 100644
--- a/be/src/vec/sink/vtablet_sink_v2.cpp
+++ b/be/src/vec/sink/vtablet_sink_v2.cpp
@@ -91,7 +91,7 @@ static Status on_partitions_created(void* writer, TCreatePartitionResult* result
return
static_cast<VOlapTableSinkV2*>(writer)->on_partitions_created(result);
}
-void VOlapTableSinkV2::_init_row_distribution() {
+Status VOlapTableSinkV2::_init_row_distribution() {
VRowDistributionContext ctx;
ctx.state = _state;
@@ -108,6 +108,10 @@ void VOlapTableSinkV2::_init_row_distribution() {
ctx.schema = _schema;
_row_distribution.init(&ctx);
+
+ RETURN_IF_ERROR(_row_distribution.open(_output_row_desc));
+
+ return Status::OK();
}
Status VOlapTableSinkV2::init(const TDataSink& t_sink) {
@@ -174,6 +178,8 @@ Status VOlapTableSinkV2::prepare(RuntimeState* state) {
_block_convertor =
std::make_unique<OlapTableBlockConvertor>(_output_tuple_desc);
_block_convertor->init_autoinc_info(_schema->db_id(), _schema->table_id(),
_state->batch_size());
+ _output_row_desc = _pool->add(new RowDescriptor(_output_tuple_desc,
false));
+
// add all counter
_input_rows_counter = ADD_COUNTER(_profile, "RowsRead", TUnit::UNIT);
_output_rows_counter = ADD_COUNTER(_profile, "RowsReturned", TUnit::UNIT);
@@ -209,7 +215,7 @@ Status VOlapTableSinkV2::open(RuntimeState* state) {
_build_tablet_node_mapping();
RETURN_IF_ERROR(_open_streams(state->backend_id()));
- _init_row_distribution();
+ RETURN_IF_ERROR(_init_row_distribution());
return Status::OK();
}
diff --git a/be/src/vec/sink/vtablet_sink_v2.h b/be/src/vec/sink/vtablet_sink_v2.h
index 1a67ea581ec..6f369286677 100644
--- a/be/src/vec/sink/vtablet_sink_v2.h
+++ b/be/src/vec/sink/vtablet_sink_v2.h
@@ -119,7 +119,7 @@ public:
Status on_partitions_created(TCreatePartitionResult* result);
private:
- void _init_row_distribution();
+ Status _init_row_distribution();
Status _open_streams(int64_t src_id);
@@ -149,6 +149,7 @@ private:
// this is tuple descriptor of destination OLAP table
TupleDescriptor* _output_tuple_desc = nullptr;
+ RowDescriptor* _output_row_desc = nullptr;
// number of senders used to insert into OlapTable, if we only support
single node insert,
// all data from select should collectted and then send to OlapTable.
diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp b/be/src/vec/sink/writer/vtablet_writer.cpp
index f3270c4b9ff..22b72f28cb4 100644
--- a/be/src/vec/sink/writer/vtablet_writer.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer.cpp
@@ -1063,7 +1063,7 @@ static Status on_partitions_created(void* writer, TCreatePartitionResult* result
return static_cast<VTabletWriter*>(writer)->on_partitions_created(result);
}
-void VTabletWriter::_init_row_distribution() {
+Status VTabletWriter::_init_row_distribution() {
VRowDistributionContext ctx;
ctx.state = _state;
@@ -1080,6 +1080,9 @@ void VTabletWriter::_init_row_distribution() {
ctx.schema = _schema;
_row_distribution.init(&ctx);
+
+ RETURN_IF_ERROR(_row_distribution.open(_output_row_desc));
+ return Status::OK();
}
Status VTabletWriter::_init(RuntimeState* state, RuntimeProfile* profile) {
@@ -1218,19 +1221,13 @@ Status VTabletWriter::_init(RuntimeState* state, RuntimeProfile* profile) {
RETURN_IF_ERROR(_channels.back()->init(state, tablets));
}
- // prepare for auto partition functions
- if (_vpartition->is_auto_partition()) {
- auto [part_ctx, part_func] = _get_partition_function();
- RETURN_IF_ERROR(part_ctx->prepare(_state, *_output_row_desc));
- RETURN_IF_ERROR(part_ctx->open(_state));
- }
if (_group_commit) {
RETURN_IF_ERROR(_state->exec_env()->wal_mgr()->add_wal_path(_db_id,
_tb_id, _wal_id,
_state->import_label()));
RETURN_IF_ERROR(_state->exec_env()->wal_mgr()->create_wal_writer(_wal_id,
_wal_writer));
}
- _init_row_distribution();
+ RETURN_IF_ERROR(_init_row_distribution());
_inited = true;
return Status::OK();
@@ -1287,11 +1284,6 @@ Status VTabletWriter::_incremental_open_node_channel(
return Status::OK();
}
-std::pair<vectorized::VExprContextSPtr, vectorized::VExprSPtr>
-VTabletWriter::_get_partition_function() {
- return {_vpartition->get_part_func_ctx(),
_vpartition->get_partition_function()};
-}
-
Status VTabletWriter::_cancel_channel_and_check_intolerable_failure(
Status status, const std::string& err_msg, const
std::shared_ptr<IndexChannel> ich,
const std::shared_ptr<VNodeChannel> nch) {
diff --git a/be/src/vec/sink/writer/vtablet_writer.h
b/be/src/vec/sink/writer/vtablet_writer.h
index ea998a0f0b4..3b8df40beee 100644
--- a/be/src/vec/sink/writer/vtablet_writer.h
+++ b/be/src/vec/sink/writer/vtablet_writer.h
@@ -549,7 +549,7 @@ private:
using ChannelDistributionPayload = std::unordered_map<VNodeChannel*,
Payload>;
using ChannelDistributionPayloadVec =
std::vector<std::unordered_map<VNodeChannel*, Payload>>;
- void _init_row_distribution();
+ Status _init_row_distribution();
Status _init(RuntimeState* state, RuntimeProfile* profile);
@@ -564,8 +564,6 @@ private:
const
std::shared_ptr<IndexChannel> ich,
const
std::shared_ptr<VNodeChannel> nch);
- std::pair<vectorized::VExprContextSPtr, vectorized::VExprSPtr>
_get_partition_function();
-
void _cancel_all_channel(Status status);
void _save_missing_values(vectorized::ColumnPtr col,
vectorized::DataTypePtr value_type,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]