This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 331d2d36b71 [branch-2.0-pick] "[refactor](partial-update) Split partial update infos from tablet schema #25147" (#25754)
331d2d36b71 is described below
commit 331d2d36b71842c8a55ab3fdee06d40baebace0a
Author: bobhan1 <[email protected]>
AuthorDate: Mon Oct 23 18:00:10 2023 +0800
[branch-2.0-pick] "[refactor](partial-update) Split partial update infos from tablet schema #25147" (#25754)
---
be/src/olap/compaction.cpp | 2 +-
be/src/olap/delta_writer.cpp | 18 ++++---
be/src/olap/delta_writer.h | 7 +++
be/src/olap/memtable.cpp | 21 ++++----
be/src/olap/memtable.h | 3 ++
be/src/olap/partial_update_info.h | 54 +++++++++++++++++++
be/src/olap/rowset/beta_rowset_writer.h | 8 +++
be/src/olap/rowset/rowset_writer.h | 4 ++
be/src/olap/rowset/rowset_writer_context.h | 5 ++
be/src/olap/rowset/segment_v2/segment_writer.cpp | 23 ++++----
be/src/olap/tablet.cpp | 24 +++++----
be/src/olap/tablet.h | 7 ++-
be/src/olap/tablet_schema.cpp | 68 +-----------------------
be/src/olap/tablet_schema.h | 24 +--------
be/src/olap/txn_manager.cpp | 18 ++++---
be/src/olap/txn_manager.h | 5 +-
be/src/service/backend_service.cpp | 2 +-
gensrc/proto/olap_file.proto | 4 +-
18 files changed, 154 insertions(+), 143 deletions(-)
diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index c7cdf887b15..a03b0e3fd17 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -684,7 +684,7 @@ Status Compaction::modify_rowsets(const Merger::Statistics*
stats) {
StorageEngine::instance()->txn_manager()->set_txn_related_delete_bitmap(
it.partition_id, it.transaction_id,
_tablet->tablet_id(),
_tablet->schema_hash(), _tablet->tablet_uid(),
true, it.delete_bitmap,
- it.rowset_ids);
+ it.rowset_ids, nullptr);
}
}
diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index 5d21d1b7344..87316c3cfba 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -218,6 +218,7 @@ Status DeltaWriter::init() {
context.write_type = DataWriteType::TYPE_DIRECT;
context.mow_context = std::make_shared<MowContext>(_cur_max_version,
_req.txn_id, _rowset_ids,
_delete_bitmap);
+ context.partial_update_info = _partial_update_info;
RETURN_IF_ERROR(_tablet->create_rowset_writer(context, &_rowset_writer));
_schema.reset(new Schema(_tablet_schema));
@@ -365,7 +366,8 @@ void DeltaWriter::_reset_mem_table() {
_delete_bitmap);
_mem_table.reset(new MemTable(_tablet, _schema.get(),
_tablet_schema.get(), _req.slots,
_req.tuple_desc, _rowset_writer.get(),
mow_context,
- mem_table_insert_tracker,
mem_table_flush_tracker));
+ _partial_update_info.get(),
mem_table_insert_tracker,
+ mem_table_flush_tracker));
COUNTER_UPDATE(_segment_num, 1);
_mem_table->set_callback([this](MemTableStat& stat) {
@@ -481,7 +483,7 @@ Status DeltaWriter::submit_calc_delete_bitmap_task() {
// For partial update, we need to fill in the entire row of data, during
the calculation
// of the delete bitmap. This operation is resource-intensive, and we need
to minimize
// the number of times it occurs. Therefore, we skip this operation here.
- if (_cur_rowset->tablet_schema()->is_partial_update()) {
+ if (_partial_update_info->is_partial_update) {
return Status::OK();
}
@@ -493,8 +495,7 @@ Status DeltaWriter::submit_calc_delete_bitmap_task() {
}
Status DeltaWriter::wait_calc_delete_bitmap() {
- if (!_tablet->enable_unique_key_merge_on_write() ||
- _cur_rowset->tablet_schema()->is_partial_update()) {
+ if (!_tablet->enable_unique_key_merge_on_write() ||
_partial_update_info->is_partial_update) {
return Status::OK();
}
std::lock_guard<std::mutex> l(_lock);
@@ -536,7 +537,7 @@ Status DeltaWriter::commit_txn(const PSlaveTabletNodes&
slave_tablet_nodes,
if (_tablet->enable_unique_key_merge_on_write()) {
_storage_engine->txn_manager()->set_txn_related_delete_bitmap(
_req.partition_id, _req.txn_id, _tablet->tablet_id(),
_tablet->schema_hash(),
- _tablet->tablet_uid(), true, _delete_bitmap, _rowset_ids);
+ _tablet->tablet_uid(), true, _delete_bitmap, _rowset_ids,
_partial_update_info);
}
_delta_written_success = true;
@@ -654,9 +655,10 @@ void DeltaWriter::_build_current_tablet_schema(int64_t
index_id,
_tablet_schema->set_table_id(table_schema_param->table_id());
// set partial update columns info
-
_tablet_schema->set_partial_update_info(table_schema_param->is_partial_update(),
-
table_schema_param->partial_update_input_columns());
- _tablet_schema->set_is_strict_mode(table_schema_param->is_strict_mode());
+ _partial_update_info = std::make_shared<PartialUpdateInfo>();
+ _partial_update_info->init(*_tablet_schema,
table_schema_param->is_partial_update(),
+
table_schema_param->partial_update_input_columns(),
+ table_schema_param->is_strict_mode());
}
void DeltaWriter::_request_slave_tablet_pull_rowset(PNodeInfo node_info) {
diff --git a/be/src/olap/delta_writer.h b/be/src/olap/delta_writer.h
index aeef1c68661..ebdd76ec471 100644
--- a/be/src/olap/delta_writer.h
+++ b/be/src/olap/delta_writer.h
@@ -32,6 +32,7 @@
#include "common/status.h"
#include "olap/memtable.h"
#include "olap/olap_common.h"
+#include "olap/partial_update_info.h"
#include "olap/rowset/rowset.h"
#include "olap/tablet.h"
#include "olap/tablet_meta.h"
@@ -137,6 +138,10 @@ public:
// For UT
DeleteBitmapPtr get_delete_bitmap() { return _delete_bitmap; }
+ std::shared_ptr<PartialUpdateInfo> get_partial_update_info() const {
+ return _partial_update_info;
+ }
+
private:
DeltaWriter(WriteRequest* req, StorageEngine* storage_engine,
RuntimeProfile* profile,
const UniqueId& load_id);
@@ -198,6 +203,8 @@ private:
// total rows num written by DeltaWriter
int64_t _total_received_rows = 0;
+ std::shared_ptr<PartialUpdateInfo> _partial_update_info;
+
RuntimeProfile* _profile = nullptr;
RuntimeProfile::Counter* _lock_timer = nullptr;
RuntimeProfile::Counter* _sort_timer = nullptr;
diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp
index 2457c026224..880609b053f 100644
--- a/be/src/olap/memtable.cpp
+++ b/be/src/olap/memtable.cpp
@@ -67,6 +67,7 @@ using namespace ErrorCode;
MemTable::MemTable(TabletSharedPtr tablet, Schema* schema, const TabletSchema*
tablet_schema,
const std::vector<SlotDescriptor*>* slot_descs,
TupleDescriptor* tuple_desc,
RowsetWriter* rowset_writer, std::shared_ptr<MowContext>
mow_context,
+ PartialUpdateInfo* partial_update_info,
const std::shared_ptr<MemTracker>& insert_mem_tracker,
const std::shared_ptr<MemTracker>& flush_mem_tracker)
: _tablet(std::move(tablet)),
@@ -96,8 +97,11 @@ MemTable::MemTable(TabletSharedPtr tablet, Schema* schema,
const TabletSchema* t
// TODO: Support ZOrderComparator in the future
_init_columns_offset_by_slot_descs(slot_descs, tuple_desc);
_num_columns = _tablet_schema->num_columns();
- if (_tablet_schema->is_partial_update()) {
- _num_columns = _tablet_schema->partial_input_column_size();
+ if (partial_update_info != nullptr) {
+ _is_partial_update = partial_update_info->is_partial_update;
+ if (_is_partial_update) {
+ _num_columns =
partial_update_info->partial_update_input_columns.size();
+ }
}
}
void MemTable::_init_columns_offset_by_slot_descs(const
std::vector<SlotDescriptor*>* slot_descs,
@@ -201,7 +205,7 @@ void MemTable::insert(const vectorized::Block* input_block,
const std::vector<in
_init_agg_functions(&target_block);
}
if (_tablet_schema->has_sequence_col()) {
- if (_tablet_schema->is_partial_update()) {
+ if (_is_partial_update) {
// for unique key partial update, sequence column index in
block
// may be different with the index in `_tablet_schema`
for (size_t i = 0; i < cloneBlock.columns(); i++) {
@@ -440,8 +444,8 @@ void MemTable::shrink_memtable_by_agg() {
bool MemTable::need_flush() const {
auto max_size = config::write_buffer_size;
- if (_tablet_schema->is_partial_update()) {
- auto update_columns_size = _tablet_schema->partial_input_column_size();
+ if (_is_partial_update) {
+ auto update_columns_size = _num_columns;
max_size = max_size * update_columns_size /
_tablet_schema->num_columns();
max_size = max_size > 1048576 ? max_size : 1048576;
}
@@ -451,11 +455,6 @@ bool MemTable::need_flush() const {
bool MemTable::need_agg() const {
if (_keys_type == KeysType::AGG_KEYS) {
auto max_size = config::write_buffer_size_for_agg;
- if (_tablet_schema->is_partial_update()) {
- auto update_columns_size =
_tablet_schema->partial_input_column_size();
- max_size = max_size * update_columns_size /
_tablet_schema->num_columns();
- max_size = max_size > 1048576 ? max_size : 1048576;
- }
return memory_usage() >= max_size;
}
return false;
@@ -530,7 +529,7 @@ Status MemTable::_do_flush() {
// Unfold variant column
RETURN_IF_ERROR(unfold_variant_column(block, &ctx));
}
- if (!_tablet_schema->is_partial_update()) {
+ if (!_is_partial_update) {
ctx.generate_delete_bitmap = [this](size_t segment_id) {
return _generate_delete_bitmap(segment_id);
};
diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h
index caabee46e77..9e3103364a5 100644
--- a/be/src/olap/memtable.h
+++ b/be/src/olap/memtable.h
@@ -31,6 +31,7 @@
#include "common/status.h"
#include "gutil/integral_types.h"
#include "olap/olap_common.h"
+#include "olap/partial_update_info.h"
#include "olap/tablet.h"
#include "olap/tablet_meta.h"
#include "runtime/memory/mem_tracker.h"
@@ -176,6 +177,7 @@ public:
MemTable(TabletSharedPtr tablet, Schema* schema, const TabletSchema*
tablet_schema,
const std::vector<SlotDescriptor*>* slot_descs, TupleDescriptor*
tuple_desc,
RowsetWriter* rowset_writer, std::shared_ptr<MowContext>
mow_context,
+ PartialUpdateInfo* partial_update_info,
const std::shared_ptr<MemTracker>& insert_mem_tracker,
const std::shared_ptr<MemTracker>& flush_mem_tracker);
~MemTable();
@@ -232,6 +234,7 @@ private:
private:
TabletSharedPtr _tablet;
const KeysType _keys_type;
+ bool _is_partial_update = false;
Schema* _schema;
const TabletSchema* _tablet_schema;
diff --git a/be/src/olap/partial_update_info.h
b/be/src/olap/partial_update_info.h
new file mode 100644
index 00000000000..cdea698b20d
--- /dev/null
+++ b/be/src/olap/partial_update_info.h
@@ -0,0 +1,54 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "olap/tablet_schema.h"
+
+namespace doris {
+
+struct PartialUpdateInfo {
+ void init(const TabletSchema& tablet_schema, bool partial_update,
+ const std::set<string>& partial_update_cols, bool
is_strict_mode) {
+ is_partial_update = partial_update;
+ partial_update_input_columns = partial_update_cols;
+ missing_cids.clear();
+ update_cids.clear();
+ for (auto i = 0; i < tablet_schema.num_columns(); ++i) {
+ auto tablet_column = tablet_schema.column(i);
+ if (!partial_update_input_columns.contains(tablet_column.name())) {
+ missing_cids.emplace_back(i);
+ if (!tablet_column.has_default_value() &&
!tablet_column.is_nullable()) {
+ can_insert_new_rows_in_partial_update = false;
+ }
+ } else {
+ update_cids.emplace_back(i);
+ }
+ }
+ this->is_strict_mode = is_strict_mode;
+ }
+
+ bool is_partial_update {false};
+ std::set<std::string> partial_update_input_columns;
+ std::vector<uint32_t> missing_cids;
+ std::vector<uint32_t> update_cids;
+ // if key not exist in old rowset, use default value or null value for the
unmentioned cols
+ // to generate a new row, only available in non-strict mode
+ bool can_insert_new_rows_in_partial_update {true};
+ bool is_strict_mode {false};
+};
+} // namespace doris
diff --git a/be/src/olap/rowset/beta_rowset_writer.h
b/be/src/olap/rowset/beta_rowset_writer.h
index 5c9b075c4e6..03b5e7d85ba 100644
--- a/be/src/olap/rowset/beta_rowset_writer.h
+++ b/be/src/olap/rowset/beta_rowset_writer.h
@@ -126,6 +126,14 @@ public:
Status wait_flying_segcompaction() override;
+ std::shared_ptr<PartialUpdateInfo> get_partial_update_info() override {
+ return _context.partial_update_info;
+ }
+
+ bool is_partial_update() override {
+ return _context.partial_update_info &&
_context.partial_update_info->is_partial_update;
+ }
+
private:
Status _do_add_block(const vectorized::Block* block,
std::unique_ptr<segment_v2::SegmentWriter>*
segment_writer,
diff --git a/be/src/olap/rowset/rowset_writer.h
b/be/src/olap/rowset/rowset_writer.h
index 2ee14abd6ab..2e38069bb89 100644
--- a/be/src/olap/rowset/rowset_writer.h
+++ b/be/src/olap/rowset/rowset_writer.h
@@ -122,6 +122,10 @@ public:
virtual vectorized::schema_util::LocalSchemaChangeRecorder*
mutable_schema_change_recorder() = 0;
+ virtual std::shared_ptr<PartialUpdateInfo> get_partial_update_info() = 0;
+
+ virtual bool is_partial_update() = 0;
+
private:
DISALLOW_COPY_AND_ASSIGN(RowsetWriter);
};
diff --git a/be/src/olap/rowset/rowset_writer_context.h
b/be/src/olap/rowset/rowset_writer_context.h
index 32925e617cd..f77108ee413 100644
--- a/be/src/olap/rowset/rowset_writer_context.h
+++ b/be/src/olap/rowset/rowset_writer_context.h
@@ -21,6 +21,7 @@
#include "io/fs/file_system.h"
#include "olap/olap_define.h"
+#include "olap/partial_update_info.h"
#include "olap/tablet.h"
#include "olap/tablet_schema.h"
@@ -92,6 +93,10 @@ struct RowsetWriterContext {
std::shared_ptr<MowContext> mow_context;
// segcompaction for this RowsetWriter, disable it for some transient
writers
bool enable_segcompaction = true;
+
+ std::shared_ptr<PartialUpdateInfo> partial_update_info;
+
+ bool is_transient_rowset_writer = false;
};
} // namespace doris
diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp
b/be/src/olap/rowset/segment_v2/segment_writer.cpp
index 2d344fdaf8e..47387eceec2 100644
--- a/be/src/olap/rowset/segment_v2/segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp
@@ -344,9 +344,10 @@ Status
SegmentWriter::append_block_with_partial_content(const vectorized::Block*
}
DCHECK(_tablet_schema->keys_type() == UNIQUE_KEYS &&
_opts.enable_unique_key_merge_on_write);
+ DCHECK(_opts.rowset_ctx->partial_update_info);
// find missing column cids
- std::vector<uint32_t> missing_cids = _tablet_schema->get_missing_cids();
- std::vector<uint32_t> including_cids = _tablet_schema->get_update_cids();
+ const auto& missing_cids =
_opts.rowset_ctx->partial_update_info->missing_cids;
+ const auto& including_cids =
_opts.rowset_ctx->partial_update_info->update_cids;
// create full block and fill with input columns
auto full_block = _tablet_schema->create_block();
@@ -432,7 +433,7 @@ Status
SegmentWriter::append_block_with_partial_content(const vectorized::Block*
auto st = _tablet->lookup_row_key(key, have_input_seq_column,
specified_rowsets, &loc,
_mow_context->max_version,
segment_caches, &rowset);
if (st.is<KEY_NOT_FOUND>()) {
- if (_tablet_schema->is_strict_mode()) {
+ if (_opts.rowset_ctx->partial_update_info->is_strict_mode) {
++num_rows_filtered;
// delete the invalid newly inserted row
_mow_context->delete_bitmap->add({_opts.rowset_ctx->rowset_id,
_segment_id,
@@ -440,7 +441,7 @@ Status
SegmentWriter::append_block_with_partial_content(const vectorized::Block*
segment_pos);
}
- if (!_tablet_schema->can_insert_new_rows_in_partial_update()) {
+ if
(!_opts.rowset_ctx->partial_update_info->can_insert_new_rows_in_partial_update)
{
return Status::InternalError(
"the unmentioned columns should have default value or
be nullable for "
"newly inserted rows in non-strict mode partial
update");
@@ -503,10 +504,9 @@ Status
SegmentWriter::append_block_with_partial_content(const vectorized::Block*
}
// convert missing columns and send to column writer
- auto cids_missing = _tablet_schema->get_missing_cids();
_olap_data_convertor->set_source_content_with_specifid_columns(&full_block,
row_pos, num_rows,
-
cids_missing);
- for (auto cid : cids_missing) {
+
missing_cids);
+ for (auto cid : missing_cids) {
auto converted_result = _olap_data_convertor->convert_column_data(cid);
if (converted_result.first != Status::OK()) {
return converted_result.first;
@@ -556,8 +556,8 @@ Status
SegmentWriter::fill_missing_columns(vectorized::MutableColumns& mutable_f
bool has_default_or_nullable,
const size_t& segment_start_pos) {
// create old value columns
- auto old_value_block = _tablet_schema->create_missing_columns_block();
- std::vector<uint32_t> cids_missing = _tablet_schema->get_missing_cids();
+ const auto& cids_missing =
_opts.rowset_ctx->partial_update_info->missing_cids;
+ auto old_value_block = _tablet_schema->create_block_by_cids(cids_missing);
CHECK(cids_missing.size() == old_value_block.columns());
auto mutable_old_columns = old_value_block.mutate_columns();
bool has_row_column = _tablet_schema->store_row_column();
@@ -663,7 +663,10 @@ Status
SegmentWriter::fill_missing_columns(vectorized::MutableColumns& mutable_f
Status SegmentWriter::append_block(const vectorized::Block* block, size_t
row_pos,
size_t num_rows) {
- if (_tablet_schema->is_partial_update() && _opts.write_type ==
DataWriteType::TYPE_DIRECT) {
+ if (_opts.rowset_ctx->partial_update_info &&
+ _opts.rowset_ctx->partial_update_info->is_partial_update &&
+ _opts.write_type == DataWriteType::TYPE_DIRECT &&
+ !_opts.rowset_ctx->is_transient_rowset_writer) {
RETURN_IF_ERROR(append_block_with_partial_content(block, row_pos,
num_rows));
return Status::OK();
}
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 76552ff28b0..37bfedc3293 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -2004,14 +2004,14 @@ Status
Tablet::create_rowset_writer(RowsetWriterContext& context,
// create a rowset writer with rowset_id and seg_id
// after writer, merge this transient rowset with original rowset
-Status Tablet::create_transient_rowset_writer(RowsetSharedPtr rowset_ptr,
- std::unique_ptr<RowsetWriter>*
rowset_writer) {
+Status Tablet::create_transient_rowset_writer(
+ RowsetSharedPtr rowset_ptr, std::unique_ptr<RowsetWriter>*
rowset_writer,
+ std::shared_ptr<PartialUpdateInfo> partial_update_info) {
RowsetWriterContext context;
context.rowset_state = PREPARED;
context.segments_overlap = OVERLAPPING;
context.tablet_schema = std::make_shared<TabletSchema>();
context.tablet_schema->copy_from(*(rowset_ptr->tablet_schema()));
- context.tablet_schema->set_partial_update_info(false,
std::set<std::string>());
context.newest_write_timestamp = UnixSeconds();
context.tablet_id = table_id();
context.enable_segcompaction = false;
@@ -2019,6 +2019,8 @@ Status
Tablet::create_transient_rowset_writer(RowsetSharedPtr rowset_ptr,
// get the shared_ptr from tablet_manager.
context.tablet =
StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id());
context.write_type = DataWriteType::TYPE_DIRECT;
+ context.partial_update_info = partial_update_info;
+ context.is_transient_rowset_writer = true;
RETURN_IF_ERROR(
create_transient_rowset_writer(context, rowset_ptr->rowset_id(),
rowset_writer));
(*rowset_writer)->set_segment_start_id(rowset_ptr->num_segments());
@@ -2962,7 +2964,7 @@ Status Tablet::calc_segment_delete_bitmap(RowsetSharedPtr
rowset,
auto rowset_id = rowset->rowset_id();
Version dummy_version(end_version + 1, end_version + 1);
auto rowset_schema = rowset->tablet_schema();
- bool is_partial_update = rowset_schema->is_partial_update();
+ bool is_partial_update = rowset_writer &&
rowset_writer->is_partial_update();
// use for partial update
PartialUpdateReadPlan read_plan_ori;
PartialUpdateReadPlan read_plan_update;
@@ -3083,8 +3085,11 @@ Status
Tablet::calc_segment_delete_bitmap(RowsetSharedPtr rowset,
}
if (pos > 0) {
+ auto partial_update_info = rowset_writer->get_partial_update_info();
+ DCHECK(partial_update_info);
RETURN_IF_ERROR(generate_new_block_for_partial_update(
- rowset_schema, read_plan_ori, read_plan_update,
rsid_to_rowset, &block));
+ rowset_schema, partial_update_info->missing_cids,
partial_update_info->update_cids,
+ read_plan_ori, read_plan_update, rsid_to_rowset, &block));
sort_block(block, ordered_block);
int64_t size;
RETURN_IF_ERROR(rowset_writer->flush_single_memtable(&ordered_block,
&size));
@@ -3159,7 +3164,8 @@ std::vector<RowsetSharedPtr> Tablet::get_rowset_by_ids(
}
Status Tablet::generate_new_block_for_partial_update(
- TabletSchemaSPtr rowset_schema, const PartialUpdateReadPlan&
read_plan_ori,
+ TabletSchemaSPtr rowset_schema, const std::vector<uint32>&
missing_cids,
+ const std::vector<uint32>& update_cids, const PartialUpdateReadPlan&
read_plan_ori,
const PartialUpdateReadPlan& read_plan_update,
const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset,
vectorized::Block* output_block) {
@@ -3170,10 +3176,8 @@ Status Tablet::generate_new_block_for_partial_update(
// 4. mark current keys deleted
CHECK(output_block);
auto full_mutable_columns = output_block->mutate_columns();
- auto old_block = rowset_schema->create_missing_columns_block();
- auto missing_cids = rowset_schema->get_missing_cids();
- auto update_block = rowset_schema->create_update_columns_block();
- auto update_cids = rowset_schema->get_update_cids();
+ auto old_block = rowset_schema->create_block_by_cids(missing_cids);
+ auto update_block = rowset_schema->create_block_by_cids(update_cids);
std::map<uint32_t, uint32_t> read_index_old;
RETURN_IF_ERROR(read_columns_by_plan(rowset_schema, missing_cids,
read_plan_ori, rsid_to_rowset,
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index d57b043857d..9e665927d31 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -44,6 +44,7 @@
#include "olap/binlog_config.h"
#include "olap/data_dir.h"
#include "olap/olap_common.h"
+#include "olap/partial_update_info.h"
#include "olap/rowset/rowset.h"
#include "olap/rowset/rowset_meta.h"
#include "olap/rowset/rowset_reader.h"
@@ -351,7 +352,8 @@ public:
std::unique_ptr<RowsetWriter>* rowset_writer);
Status create_transient_rowset_writer(RowsetSharedPtr rowset_ptr,
- std::unique_ptr<RowsetWriter>*
rowset_writer);
+ std::unique_ptr<RowsetWriter>*
rowset_writer,
+ std::shared_ptr<PartialUpdateInfo>
partial_update_info);
Status create_transient_rowset_writer(RowsetWriterContext& context, const
RowsetId& rowset_id,
std::unique_ptr<RowsetWriter>*
rowset_writer);
@@ -477,7 +479,8 @@ public:
void prepare_to_read(const RowLocation& row_location, size_t pos,
PartialUpdateReadPlan* read_plan);
Status generate_new_block_for_partial_update(
- TabletSchemaSPtr rowset_schema, const PartialUpdateReadPlan&
read_plan_ori,
+ TabletSchemaSPtr rowset_schema, const std::vector<uint32>&
missing_cids,
+ const std::vector<uint32>& update_cids, const
PartialUpdateReadPlan& read_plan_ori,
const PartialUpdateReadPlan& read_plan_update,
const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset,
vectorized::Block* output_block);
diff --git a/be/src/olap/tablet_schema.cpp b/be/src/olap/tablet_schema.cpp
index d12405efaaf..9dc1bc20b9f 100644
--- a/be/src/olap/tablet_schema.cpp
+++ b/be/src/olap/tablet_schema.cpp
@@ -672,9 +672,6 @@ void TabletSchema::init_from_pb(const TabletSchemaPB&
schema) {
_indexes.clear();
_field_name_to_index.clear();
_field_id_to_index.clear();
- _partial_update_input_columns.clear();
- _missing_cids.clear();
- _update_cids.clear();
for (auto& column_pb : schema.column()) {
TabletColumn column;
column.init_from_pb(column_pb);
@@ -718,23 +715,6 @@ void TabletSchema::init_from_pb(const TabletSchemaPB&
schema) {
_sort_col_num = schema.sort_col_num();
_compression_type = schema.compression_type();
_schema_version = schema.schema_version();
- _is_partial_update = schema.is_partial_update();
- for (auto& col_name : schema.partial_update_input_columns()) {
- _partial_update_input_columns.emplace(col_name);
- }
- if (_is_partial_update) {
- for (auto i = 0; i < _cols.size(); ++i) {
- if (_partial_update_input_columns.count(_cols[i].name()) == 0) {
- _missing_cids.emplace_back(i);
- auto tablet_column = column(i);
- if (!tablet_column.has_default_value() &&
!tablet_column.is_nullable()) {
- _can_insert_new_rows_in_partial_update = false;
- }
- } else {
- _update_cids.emplace_back(i);
- }
- }
- }
}
void TabletSchema::copy_from(const TabletSchema& tablet_schema) {
@@ -873,10 +853,6 @@ void TabletSchema::to_schema_pb(TabletSchemaPB*
tablet_schema_pb) const {
tablet_schema_pb->set_compression_type(_compression_type);
tablet_schema_pb->set_is_dynamic_schema(_is_dynamic_schema);
tablet_schema_pb->set_version_col_idx(_version_col_idx);
- tablet_schema_pb->set_is_partial_update(_is_partial_update);
- for (auto& col : _partial_update_input_columns) {
- *tablet_schema_pb->add_partial_update_input_columns() = col;
- }
}
size_t TabletSchema::row_size() const {
@@ -1051,19 +1027,9 @@ vectorized::Block TabletSchema::create_block(bool
ignore_dropped_col) const {
return block;
}
-vectorized::Block TabletSchema::create_missing_columns_block() {
- vectorized::Block block;
- for (const auto& cid : _missing_cids) {
- auto col = _cols[cid];
- auto data_type =
vectorized::DataTypeFactory::instance().create_data_type(col);
- block.insert({data_type->create_column(), data_type, col.name()});
- }
- return block;
-}
-
-vectorized::Block TabletSchema::create_update_columns_block() {
+vectorized::Block TabletSchema::create_block_by_cids(const
std::vector<uint32_t>& cids) {
vectorized::Block block;
- for (const auto& cid : _update_cids) {
+ for (const auto& cid : cids) {
auto col = _cols[cid];
auto data_type =
vectorized::DataTypeFactory::instance().create_data_type(col);
block.insert({data_type->create_column(), data_type, col.name()});
@@ -1071,36 +1037,6 @@ vectorized::Block
TabletSchema::create_update_columns_block() {
return block;
}
-void TabletSchema::set_partial_update_info(bool is_partial_update,
- const std::set<string>&
partial_update_input_columns) {
- _is_partial_update = is_partial_update;
- _partial_update_input_columns = partial_update_input_columns;
- _missing_cids.clear();
- _update_cids.clear();
- for (auto i = 0; i < _cols.size(); ++i) {
- if (_partial_update_input_columns.count(_cols[i].name()) == 0) {
- _missing_cids.emplace_back(i);
- auto tablet_column = column(i);
- if (!tablet_column.has_default_value() &&
!tablet_column.is_nullable()) {
- _can_insert_new_rows_in_partial_update = false;
- }
- } else {
- _update_cids.emplace_back(i);
- }
- }
-}
-
-bool TabletSchema::is_column_missing(size_t cid) const {
- DCHECK(cid < _cols.size());
- if (!_is_partial_update) {
- return false;
- }
- if (_partial_update_input_columns.count(_cols[cid].name()) == 0) {
- return true;
- }
- return false;
-}
-
bool operator==(const TabletColumn& a, const TabletColumn& b) {
if (a._unique_id != b._unique_id) return false;
if (a._col_name != b._col_name) return false;
diff --git a/be/src/olap/tablet_schema.h b/be/src/olap/tablet_schema.h
index d04b009cee6..af5d860d19a 100644
--- a/be/src/olap/tablet_schema.h
+++ b/be/src/olap/tablet_schema.h
@@ -306,20 +306,7 @@ public:
str += "]";
return str;
}
- vectorized::Block create_missing_columns_block();
- vectorized::Block create_update_columns_block();
- void set_partial_update_info(bool is_partial_update,
- const std::set<string>&
partial_update_input_columns);
- bool is_partial_update() const { return _is_partial_update; }
- size_t partial_input_column_size() const { return
_partial_update_input_columns.size(); }
- bool is_column_missing(size_t cid) const;
- bool can_insert_new_rows_in_partial_update() const {
- return _can_insert_new_rows_in_partial_update;
- }
- void set_is_strict_mode(bool is_strict_mode) { _is_strict_mode =
is_strict_mode; }
- bool is_strict_mode() const { return _is_strict_mode; }
- std::vector<uint32_t> get_missing_cids() { return _missing_cids; }
- std::vector<uint32_t> get_update_cids() { return _update_cids; }
+ vectorized::Block create_block_by_cids(const std::vector<uint32_t>& cids);
private:
friend bool operator==(const TabletSchema& a, const TabletSchema& b);
@@ -355,15 +342,6 @@ private:
int64_t _mem_size = 0;
bool _store_row_column = false;
bool _skip_write_index_on_load = false;
-
- bool _is_partial_update;
- std::set<std::string> _partial_update_input_columns;
- std::vector<uint32_t> _missing_cids;
- std::vector<uint32_t> _update_cids;
- // if key not exist in old rowset, use default value or null value for the
unmentioned cols
- // to generate a new row, only available in non-strict mode
- bool _can_insert_new_rows_in_partial_update = true;
- bool _is_strict_mode = false;
};
bool operator==(const TabletSchema& a, const TabletSchema& b);
diff --git a/be/src/olap/txn_manager.cpp b/be/src/olap/txn_manager.cpp
index 5d71d4e59cd..ce62e0d65ed 100644
--- a/be/src/olap/txn_manager.cpp
+++ b/be/src/olap/txn_manager.cpp
@@ -179,12 +179,11 @@ Status TxnManager::delete_txn(TPartitionId partition_id,
const TabletSharedPtr&
tablet->tablet_id(), tablet->schema_hash(),
tablet->tablet_uid());
}
-void TxnManager::set_txn_related_delete_bitmap(TPartitionId partition_id,
- TTransactionId transaction_id,
TTabletId tablet_id,
- SchemaHash schema_hash,
TabletUid tablet_uid,
- bool unique_key_merge_on_write,
- DeleteBitmapPtr delete_bitmap,
- const RowsetIdUnorderedSet&
rowset_ids) {
+void TxnManager::set_txn_related_delete_bitmap(
+ TPartitionId partition_id, TTransactionId transaction_id, TTabletId
tablet_id,
+ SchemaHash schema_hash, TabletUid tablet_uid, bool
unique_key_merge_on_write,
+ DeleteBitmapPtr delete_bitmap, const RowsetIdUnorderedSet& rowset_ids,
+ std::shared_ptr<PartialUpdateInfo> partial_update_info) {
pair<int64_t, int64_t> key(partition_id, transaction_id);
TabletInfo tablet_info(tablet_id, schema_hash, tablet_uid);
@@ -210,6 +209,7 @@ void TxnManager::set_txn_related_delete_bitmap(TPartitionId
partition_id,
load_info.unique_key_merge_on_write = unique_key_merge_on_write;
load_info.delete_bitmap = delete_bitmap;
load_info.rowset_ids = rowset_ids;
+ load_info.partial_update_info = partial_update_info;
}
}
@@ -363,7 +363,8 @@ Status TxnManager::publish_txn(OlapMeta* meta, TPartitionId
partition_id,
// update delete_bitmap
if (tablet_txn_info.unique_key_merge_on_write) {
std::unique_ptr<RowsetWriter> rowset_writer;
- tablet->create_transient_rowset_writer(rowset, &rowset_writer);
+ tablet->create_transient_rowset_writer(rowset, &rowset_writer,
+
tablet_txn_info.partial_update_info);
int64_t t2 = MonotonicMicros();
RETURN_IF_ERROR(tablet->update_delete_bitmap(rowset,
tablet_txn_info.rowset_ids,
@@ -371,7 +372,8 @@ Status TxnManager::publish_txn(OlapMeta* meta, TPartitionId
partition_id,
rowset_writer.get()));
int64_t t3 = MonotonicMicros();
stats->calc_delete_bitmap_time_us = t3 - t2;
- if (rowset->tablet_schema()->is_partial_update()) {
+ if (tablet_txn_info.partial_update_info &&
+ tablet_txn_info.partial_update_info->is_partial_update) {
// build rowset writer and merge transient rowset
RETURN_IF_ERROR(rowset_writer->flush());
RowsetSharedPtr transient_rowset;
diff --git a/be/src/olap/txn_manager.h b/be/src/olap/txn_manager.h
index c7f781a58a7..2cbb9509383 100644
--- a/be/src/olap/txn_manager.h
+++ b/be/src/olap/txn_manager.h
@@ -36,6 +36,7 @@
#include "common/status.h"
#include "olap/olap_common.h"
+#include "olap/partial_update_info.h"
#include "olap/rowset/rowset.h"
#include "olap/rowset/rowset_meta.h"
#include "olap/rowset/segment_v2/segment.h"
@@ -59,6 +60,7 @@ struct TabletTxnInfo {
RowsetIdUnorderedSet rowset_ids;
int64_t creation_time;
bool ingest {false};
+ std::shared_ptr<PartialUpdateInfo> partial_update_info;
TabletTxnInfo(PUniqueId load_id, RowsetSharedPtr rowset)
: load_id(load_id), rowset(rowset), creation_time(UnixSeconds()) {}
@@ -186,7 +188,8 @@ public:
TTabletId tablet_id, SchemaHash
schema_hash,
TabletUid tablet_uid, bool
unique_key_merge_on_write,
DeleteBitmapPtr delete_bitmap,
- const RowsetIdUnorderedSet& rowset_ids);
+ const RowsetIdUnorderedSet& rowset_ids,
+ std::shared_ptr<PartialUpdateInfo>
partial_update_info);
void get_all_commit_tablet_txn_info_by_tablet(
const TabletSharedPtr& tablet, CommitTabletTxnInfoVec*
commit_tablet_txn_info_vec);
diff --git a/be/src/service/backend_service.cpp
b/be/src/service/backend_service.cpp
index 6e09323dbdf..9fc43559980 100644
--- a/be/src/service/backend_service.cpp
+++ b/be/src/service/backend_service.cpp
@@ -714,7 +714,7 @@ void BackendService::ingest_binlog(TIngestBinlogResult&
result,
if (local_tablet->enable_unique_key_merge_on_write()) {
StorageEngine::instance()->txn_manager()->set_txn_related_delete_bitmap(
partition_id, txn_id, local_tablet_id,
local_tablet->schema_hash(),
- local_tablet->tablet_uid(), true, delete_bitmap,
pre_rowset_ids);
+ local_tablet->tablet_uid(), true, delete_bitmap,
pre_rowset_ids, nullptr);
}
tstatus.__set_status_code(TStatusCode::OK);
diff --git a/gensrc/proto/olap_file.proto b/gensrc/proto/olap_file.proto
index dc20228ffdb..566af72d515 100644
--- a/gensrc/proto/olap_file.proto
+++ b/gensrc/proto/olap_file.proto
@@ -232,8 +232,8 @@ message TabletSchemaPB {
optional int32 version_col_idx = 17 [default = -1];
optional bool store_row_column = 18 [default=false]; // store tuplerow
oriented column
optional bool is_dynamic_schema = 19 [default=false];
- optional bool is_partial_update = 20 [default=false];
- repeated string partial_update_input_columns = 21;
+ optional bool is_partial_update = 20 [default=false]; // deprecated
+ repeated string partial_update_input_columns = 21; // deprecated
optional bool enable_single_replica_compaction = 22 [default=false];
optional bool skip_write_index_on_load = 23 [default=false];
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org