github-actions[bot] commented on code in PR #17542:
URL: https://github.com/apache/doris/pull/17542#discussion_r1127907692
##########
be/src/olap/rowset/segment_v2/segment_writer.cpp:
##########
@@ -278,8 +282,208 @@ void SegmentWriter::_maybe_invalid_row_cache(const
std::string& key) {
}
}
+// For a partial update, the content of the block is filled in these steps:
+// 1. set block data on the data convertor, and get each key column's converted
slice
+// 2. get the primary key (pk) of each input row, and read the missing columns
+// 2.1 first find the key location {rowset_id, segment_id, row_id}
+// 2.2 build a read plan so the old rows are read in batches
+// 2.3 fill the block
+// 3. set the missing columns on the data convertor and write them as well
+Status SegmentWriter::append_block_with_partial_content(const
vectorized::Block* block,
+ size_t row_pos, size_t
num_rows) {
+ CHECK(block->columns() > _tablet_schema->num_key_columns() &&
+ block->columns() < _tablet_schema->num_columns()); // more than the keys, fewer than the full schema
+ CHECK(_tablet_schema->keys_type() == UNIQUE_KEYS &&
_opts.enable_unique_key_merge_on_write);
+
+ // split the column ids into missing ones and ones included in the input
+ std::vector<uint32_t> missing_cids;
+ std::vector<uint32_t> including_cids;
+ for (uint32_t i = 0; i < _tablet_schema->num_columns(); ++i) {
+ if (_tablet_schema->is_column_missing(i)) {
+ missing_cids.push_back(i);
+ } else {
+ including_cids.push_back(i);
+ }
+ }
+ // create the full block and place the input columns at their column ids
+ auto full_block = _tablet_schema->create_block();
+ size_t input_id = 0;
+ for (auto i : including_cids) {
+ full_block.replace_by_position(i,
block->get_by_position(input_id++).column);
+ }
+
_olap_data_convertor->set_source_content_with_specifid_columns(&full_block,
row_pos, num_rows,
+
including_cids);
+
+ // convert and write the columns that are included in the input
+ std::vector<vectorized::IOlapColumnDataAccessor*> key_columns;
+ vectorized::IOlapColumnDataAccessor* seq_column = nullptr;
+ for (auto cid : including_cids) {
+ // olap data convertor always starts from id = 0
+ auto converted_result = _olap_data_convertor->convert_column_data(cid);
+ if (converted_result.first != Status::OK()) {
+ return converted_result.first;
+ }
+ if (cid < _num_key_columns) {
+ key_columns.push_back(converted_result.second); // kept to encode the row key below
+ } else if (cid == _tablet_schema->sequence_col_idx()) {
+ seq_column = converted_result.second; // kept to encode the sequence value into the key below
+ }
+
RETURN_IF_ERROR(_column_writers[cid]->append(converted_result.second->get_nullmap(),
+
converted_result.second->get_data(),
+ num_rows));
+ }
+
+ bool has_default = false; // set once any row has to fall back to default values
+ std::vector<bool> use_default_flag; // one entry per input row, checked after the loop
+ use_default_flag.reserve(num_rows);
+ for (size_t pos = 0; pos < num_rows; pos++) {
+ std::string key = _full_encode_keys(key_columns, pos);
+ if (_tablet_schema->has_sequence_col()) {
+ _encode_seq_column(seq_column, pos, &key);
+ }
+ RETURN_IF_ERROR(_primary_key_index_builder->add_item(key));
+ _maybe_invalid_row_cache(key);
+
+ RowLocation loc;
+ // keep the rowset shared ptr so that this rowset is not deleted
+ RowsetSharedPtr rowset;
+ auto st = _tablet->lookup_row_key(key, &_mow_context->rowset_ids, &loc,
+ _mow_context->max_version, &rowset);
+ if (st.is<NOT_FOUND>()) { // the lookup found no existing row for this key
+ if (!_tablet_schema->allow_key_not_exist_in_partial_update()) {
+ return Status::InternalError("partial update key not exist
before");
+ }
+ has_default = true;
+ use_default_flag.emplace_back(true);
+ continue; // no old row to read; this row is filled from defaults
+ }
+ if (!st.ok()) {
+ LOG(INFO) << "failed to lookup row key";
+ return st;
+ }
+ use_default_flag.emplace_back(false);
+ _rsid_to_rowset.emplace(rowset->rowset_id(), rowset);
+ prepare_to_read(loc, pos); // group the old row location for the batched read
+ _mow_context->delete_bitmap->add({loc.rowset_id, loc.segment_id, 0},
loc.row_id);
+ }
+ CHECK(use_default_flag.size() == num_rows);
+
+ // read old values (or defaults) and fill the missing columns of the block
+ auto mutable_full_columns = full_block.mutate_columns();
+ RETURN_IF_ERROR(fill_missing_columns(mutable_full_columns,
use_default_flag, has_default));
+
+ // convert the missing columns and send them to the column writers
+ auto cids_missing = _tablet_schema->get_missing_column_ids();
+
_olap_data_convertor->set_source_content_with_specifid_columns(&full_block,
row_pos, num_rows,
+
cids_missing);
+ for (auto cid : cids_missing) {
+ auto converted_result = _olap_data_convertor->convert_column_data(cid);
+ if (converted_result.first != Status::OK()) {
+ return converted_result.first;
+ }
+
RETURN_IF_ERROR(_column_writers[cid]->append(converted_result.second->get_nullmap(),
+
converted_result.second->get_data(),
+ num_rows));
+ }
+
+ _num_rows_written += num_rows;
+ _olap_data_convertor->clear_source_content();
+ return Status::OK();
+}
+
+Status SegmentWriter::fill_missing_columns(vectorized::MutableColumns&
mutable_full_columns, // output: each missing column gets one value appended per row
+ const std::vector<bool>&
use_default_flag, // per input row: true means take the column default values
+ bool has_default) { // true when the default value columns have to be built
+ // create the columns that receive the old values fetched from existing rowsets
+ auto old_value_block = _tablet_schema->create_missing_columns_block();
+ std::vector<uint32_t> cids_missing =
_tablet_schema->get_missing_column_ids();
+ CHECK(cids_missing.size() == old_value_block.columns());
+ auto mutable_old_columns = old_value_block.mutate_columns();
+ // maps an input row position to its row position in the old value columns
+ std::map<uint32_t, uint32_t> read_index;
+ size_t read_idx = 0;
+ for (auto rs_it : _rssid_to_rid) { // locations grouped by rowset, then by segment (see prepare_to_read)
+ for (auto seg_it : rs_it.second) {
+ auto rowset = _rsid_to_rowset[rs_it.first];
+ CHECK(rowset);
+ std::vector<uint32_t> rids;
+ for (auto id_and_pos : seg_it.second) {
+ rids.emplace_back(id_and_pos.rid);
+ read_index[id_and_pos.pos] = read_idx++;
+ }
+ for (size_t cid = 0; cid < mutable_old_columns.size(); ++cid) {
+ auto st = _tablet->fetch_value_by_rowids(rowset, seg_it.first,
rids,
+
old_value_block.get_names()[cid],
+
mutable_old_columns[cid]);
+ // stop at the first failed read and return its status
+ if (!st.ok()) {
+ return st;
+ }
+ }
+ }
+ }
+ // build default value columns (filled only when has_default is set)
+ auto default_value_block = old_value_block.clone_empty();
+ auto mutable_default_value_columns = default_value_block.mutate_columns();
+ if (has_default) {
+ for (auto i = 0; i < cids_missing.size(); ++i) {
+ auto default_value =
_tablet_schema->column(cids_missing[i]).default_value();
+ vectorized::ReadBuffer rb(const_cast<char*>(default_value.c_str()),
+ default_value.size());
+ old_value_block.get_by_position(i).type->from_string(
+ rb, mutable_default_value_columns[i].get()); // presumably parses the default string into this column - confirm
+ }
+ }
+
+ // fill all missing values from mutable_old_columns, also considering the default
value
+ for (auto idx = 0; idx < use_default_flag.size(); idx++) {
+ if (use_default_flag[idx]) {
+ // row flagged for defaults: copy row 0 of each default value column
+ for (auto i = 0; i < cids_missing.size(); ++i) {
+
CHECK(_tablet_schema->column(cids_missing[i]).has_default_value());
+ mutable_full_columns[cids_missing[i]]->insert_from(
+ *mutable_default_value_columns[i].get(), 0);
+ }
+ continue;
+ }
+ auto pos_in_old_block = read_index[idx];
+ for (auto i = 0; i < cids_missing.size(); ++i) {
+ mutable_full_columns[cids_missing[i]]->insert_from(
+
*old_value_block.get_columns_with_type_and_name()[i].column.get(),
+ pos_in_old_block); // NOTE(review): values were fetched into mutable_old_columns; confirm old_value_block still exposes them after mutate_columns()
+ }
+ }
+ return Status::OK();
+}
+
+// Group row locations by rowset id and then by segment id, recording {row_id, pos} for each, to accelerate reading
+void SegmentWriter::prepare_to_read(const RowLocation& row_location, size_t
pos) {
+ auto rs_it = _rssid_to_rid.find(row_location.rowset_id);
+ if (rs_it == _rssid_to_rid.end()) { // first location seen for this rowset
+ std::map<uint32_t, std::vector<RidAndPos> > segid_to_rid;
+ std::vector<RidAndPos> rid_pos;
+ rid_pos.emplace_back(RidAndPos {row_location.row_id, pos});
+ segid_to_rid.emplace(row_location.segment_id, rid_pos);
+ _rssid_to_rid.emplace(row_location.rowset_id, segid_to_rid);
+ return;
+ }
+ auto seg_it = rs_it->second.find(row_location.segment_id);
+ if (seg_it == rs_it->second.end()) { // known rowset, first location seen for this segment
+ std::vector<RidAndPos> rid_pos;
+ rid_pos.emplace_back(RidAndPos {row_location.row_id, pos});
+ rs_it->second.emplace(row_location.segment_id, rid_pos);
+ return;
+ }
+ seg_it->second.emplace_back(RidAndPos {row_location.row_id, pos}); // known rowset and segment: append to the group
+ return;
+}
Review Comment:
warning: redundant return statement at the end of a function with a void
return type [readability-redundant-control-flow]
```suggestion
}
```
##########
be/src/olap/tablet_schema.h:
##########
@@ -265,6 +265,14 @@ class TabletSchema {
str += "]";
return str;
}
+ vectorized::Block create_missing_columns_block();
+ void set_partial_update_info(bool is_partial_update,
+ const std::set<string>&
partial_update_input_columns);
+ bool is_partial_update() const { return _is_partial_update; }
+ size_t partial_input_column_size() const { return
_partial_update_input_columns.size(); }
+ bool is_column_missing(size_t cid) const;
+ bool allow_key_not_exist_in_partial_update() { return
_allow_key_not_exist_in_partial_update; }
Review Comment:
warning: method 'allow_key_not_exist_in_partial_update' can be made const
[readability-make-member-function-const]
```suggestion
bool allow_key_not_exist_in_partial_update() const { return
_allow_key_not_exist_in_partial_update; }
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]