w41ter commented on code in PR #54094:
URL: https://github.com/apache/doris/pull/54094#discussion_r2261810587
##########
cloud/src/meta-service/meta_service_job.cpp:
##########
@@ -984,8 +1034,14 @@ void process_compaction_job(MetaServiceCode& code,
std::string& msg, std::string
recycle_rowset.set_creation_time(now);
recycle_rowset.mutable_rowset_meta()->CopyFrom(rs);
recycle_rowset.set_type(RecycleRowsetPB::COMPACT);
- auto recycle_val = recycle_rowset.SerializeAsString();
- txn->put(recycle_key, recycle_val);
+
+ if (is_versioned_read) {
Review Comment:
```suggestion
if (is_versioned_write) {
```
##########
cloud/src/meta-service/meta_service_job.cpp:
##########
@@ -1577,15 +1654,16 @@ void
MetaServiceImpl::finish_tablet_job(::google::protobuf::RpcController* contr
delete_bitmap_lock_white_list_->get_delete_bitmap_lock_version(instance_id);
LOG(INFO) << "finish_tablet_job instance_id=" << instance_id
<< " use_version=" << use_version;
+ bool is_version_write = is_version_write_enabled(instance_id);
if (!request->job().compaction().empty()) {
// Process compaction commit
process_compaction_job(code, msg, ss, txn, request, response,
recorded_job, instance_id,
- job_key, need_commit, use_version,
is_versioned_read);
Review Comment:
`is_versioned_read` and `is_versioned_write` are different.
The migration proceeds in four phases:
1. Write and read a single version
2. Write single & multi versions but read the single version
3. Write both versions but read multiple versions
4. Write & read multiple versions
##########
cloud/src/meta-service/meta_service_job.cpp:
##########
@@ -806,56 +869,40 @@ void process_compaction_job(MetaServiceCode& code,
std::string& msg, std::string
// with `config::split_tablet_stats = true` can meet the condition.
internal_get_tablet_stats(code, msg, txn.get(), instance_id,
request->job().idx(), *stats,
detached_stats,
config::snapshot_get_tablet_stats);
- if (compaction.type() == TabletCompactionJobPB::EMPTY_CUMULATIVE) {
-
stats->set_cumulative_compaction_cnt(stats->cumulative_compaction_cnt() + 1);
- stats->set_cumulative_point(compaction.output_cumulative_point());
- stats->set_last_cumu_compaction_time_ms(now * 1000);
- } else if (compaction.type() == TabletCompactionJobPB::CUMULATIVE) {
- // clang-format off
-
stats->set_cumulative_compaction_cnt(stats->cumulative_compaction_cnt() + 1);
- if (compaction.output_cumulative_point() > stats->cumulative_point()) {
- // After supporting parallel cumu compaction, compaction with
older cumu point may be committed after
- // new cumu point has been set, MUST NOT set cumu point back to
old value
- stats->set_cumulative_point(compaction.output_cumulative_point());
+
+ if (is_versioned_read) {
+ // read old TabletCompactStatsKey -> TabletStatsPB
+ std::string tablet_compact_stats_version_key =
+ versioned::tablet_compact_stats_key({instance_id, tablet_id});
+ std::string tablet_compact_stats_version_value;
+ Versionstamp* versionstamp = nullptr;
+ TxnErrorCode err = versioned_get(txn.get(),
tablet_compact_stats_version_key, versionstamp,
+ &tablet_compact_stats_version_value);
+ if (err != TxnErrorCode::TXN_OK) {
+ code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ?
MetaServiceCode::TXN_ID_NOT_FOUND
+ :
cast_as<ErrCategory::READ>(err);
+ msg = fmt::format(
+ "failed to get tablet compact stats version, tablet_id={},
key={} err={}",
+ tablet_id, hex(tablet_compact_stats_version_key), err);
+ return;
}
- stats->set_num_rows(stats->num_rows() + (compaction.num_output_rows()
- compaction.num_input_rows()));
- stats->set_data_size(stats->data_size() +
(compaction.size_output_rowsets() - compaction.size_input_rowsets()));
- stats->set_num_rowsets(stats->num_rowsets() +
(compaction.num_output_rowsets() - compaction.num_input_rowsets()));
- stats->set_num_segments(stats->num_segments() +
(compaction.num_output_segments() - compaction.num_input_segments()));
- stats->set_index_size(stats->index_size() +
(compaction.index_size_output_rowsets() -
compaction.index_size_input_rowsets()));
- stats->set_segment_size(stats->segment_size() +
(compaction.segment_size_output_rowsets() -
compaction.segment_size_input_rowsets()));
- stats->set_last_cumu_compaction_time_ms(now * 1000);
- // clang-format on
- } else if (compaction.type() == TabletCompactionJobPB::BASE) {
- // clang-format off
- stats->set_base_compaction_cnt(stats->base_compaction_cnt() + 1);
- stats->set_num_rows(stats->num_rows() + (compaction.num_output_rows()
- compaction.num_input_rows()));
- stats->set_data_size(stats->data_size() +
(compaction.size_output_rowsets() - compaction.size_input_rowsets()));
- stats->set_num_rowsets(stats->num_rowsets() +
(compaction.num_output_rowsets() - compaction.num_input_rowsets()));
- stats->set_num_segments(stats->num_segments() +
(compaction.num_output_segments() - compaction.num_input_segments()));
- stats->set_index_size(stats->index_size() +
(compaction.index_size_output_rowsets() -
compaction.index_size_input_rowsets()));
- stats->set_segment_size(stats->segment_size() +
(compaction.segment_size_output_rowsets() -
compaction.segment_size_input_rowsets()));
- stats->set_last_base_compaction_time_ms(now * 1000);
- // clang-format on
- } else if (compaction.type() == TabletCompactionJobPB::FULL) {
- // clang-format off
- stats->set_base_compaction_cnt(stats->base_compaction_cnt() + 1);
- if (compaction.output_cumulative_point() > stats->cumulative_point()) {
- // After supporting parallel cumu compaction, compaction with
older cumu point may be committed after
- // new cumu point has been set, MUST NOT set cumu point back to
old value
- stats->set_cumulative_point(compaction.output_cumulative_point());
+ TabletStatsPB tablet_compact_stats;
+
tablet_compact_stats.ParseFromString(tablet_compact_stats_version_value);
+
+ // update TabletStatsPB
Review Comment:
The `MetaReader` provides helper functions to accomplish this.
##########
cloud/src/meta-service/meta_service_job.cpp:
##########
@@ -1088,6 +1149,22 @@ void process_compaction_job(MetaServiceCode& code,
std::string& msg, std::string
?
recorded_job.schema_change().alter_version()
: -1);
need_commit = true;
+
+ if (!compaction_log.recycle_rowsets().empty() && is_versioned_read) {
+ std::string operation_log_key = versioned::log_key({instance_id});
+ std::string operation_log_value;
+ OperationLogPB operation_log;
+ operation_log.mutable_compaction()->Swap(&compaction_log);
+ if (!operation_log.SerializeToString(&operation_log_value)) {
+ code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
+ msg = fmt::format("failed to serialize OperationLogPB: {}",
hex(operation_log_key));
+ LOG_WARNING(msg)
+ .tag("instance_id", instance_id)
+ .tag("table_id", request->job().idx().table_id());
+ return;
+ }
+ versioned_put(txn.get(), operation_log_key, operation_log_value);
Review Comment:
Would you consider adding logging for all the versioned keys?
##########
cloud/src/recycler/recycler.cpp:
##########
@@ -2861,6 +2894,11 @@ int InstanceRecycler::recycle_rowsets() {
LOG(WARNING) << "failed to delete rowset data, instance_id="
<< instance_id_;
return;
}
+ if (txn_remove_versioned_keys(txn_kv_.get(),
rowset_compact_keys_to_delete)) {
Review Comment:
The `meta_rowset_compact_key` should be removed by `recycle_compaction_log`.
##########
cloud/src/meta-service/meta_service_job.cpp:
##########
@@ -1071,6 +1127,11 @@ void process_compaction_job(MetaServiceCode& code,
std::string& msg, std::string
int64_t version = compaction.output_versions(0);
auto rowset_key = meta_rowset_key({instance_id, tablet_id, version});
txn->put(rowset_key, tmp_rowset_val);
+ if (is_versioned_read) {
Review Comment:
```suggestion
if (is_versioned_write) {
```
##########
cloud/src/meta-service/meta_service_job.cpp:
##########
@@ -1071,6 +1127,11 @@ void process_compaction_job(MetaServiceCode& code,
std::string& msg, std::string
int64_t version = compaction.output_versions(0);
auto rowset_key = meta_rowset_key({instance_id, tablet_id, version});
txn->put(rowset_key, tmp_rowset_val);
+ if (is_versioned_read) {
+ std::string meta_rowset_compact_key =
+ versioned::meta_rowset_compact_key({instance_id, tablet_id,
version});
+ versioned_put(txn.get(), meta_rowset_compact_key, tmp_rowset_val);
Review Comment:
Some fields of `RowsetMetaCloudPB` will be split into separate keys, so
here we must use `versioned::document_put`.
##########
cloud/src/meta-service/meta_service_job.cpp:
##########
@@ -984,8 +1034,14 @@ void process_compaction_job(MetaServiceCode& code,
std::string& msg, std::string
recycle_rowset.set_creation_time(now);
recycle_rowset.mutable_rowset_meta()->CopyFrom(rs);
recycle_rowset.set_type(RecycleRowsetPB::COMPACT);
- auto recycle_val = recycle_rowset.SerializeAsString();
- txn->put(recycle_key, recycle_val);
+
+ if (is_versioned_read) {
+ compaction_log.add_recycle_rowsets()->CopyFrom(recycle_rowset);
Review Comment:
`Swap` might be better than `CopyFrom` here, since it avoids copying the message.
##########
cloud/src/recycler/recycler_operation_log.cpp:
##########
@@ -238,6 +240,22 @@ int OperationLogRecycler::recycle_update_tablet_log(const
UpdateTabletLogPB& upd
return 0;
}
+int OperationLogRecycler::recycle_compaction_log(const CompactionLogPB&
compaction_log) {
+ for (const RecycleRowsetPB& recycle_rowset_pb :
compaction_log.recycle_rowsets()) {
+ std::string recycle_rowset_value;
+ if (!recycle_rowset_pb.SerializeToString(&recycle_rowset_value)) {
+ LOG_WARNING("failed to serialize RecycleRowsetPB")
+ .tag("recycle rowset pb",
recycle_rowset_pb.ShortDebugString());
+ return -1;
+ }
+ std::string recycle_key =
+ recycle_rowset_key({instance_id_, compaction_log.tablet_id(),
+
recycle_rowset_pb.rowset_meta().rowset_id_v2()});
+ kvs_.emplace_back(recycle_key, recycle_rowset_value);
Review Comment:
The input rowset keys, `meta_rowset_load_key` and `meta_rowset_compact_key`,
should be removed here.
##########
cloud/src/meta-service/meta_service_job.cpp:
##########
@@ -806,56 +869,40 @@ void process_compaction_job(MetaServiceCode& code,
std::string& msg, std::string
// with `config::split_tablet_stats = true` can meet the condition.
internal_get_tablet_stats(code, msg, txn.get(), instance_id,
request->job().idx(), *stats,
detached_stats,
config::snapshot_get_tablet_stats);
- if (compaction.type() == TabletCompactionJobPB::EMPTY_CUMULATIVE) {
-
stats->set_cumulative_compaction_cnt(stats->cumulative_compaction_cnt() + 1);
- stats->set_cumulative_point(compaction.output_cumulative_point());
- stats->set_last_cumu_compaction_time_ms(now * 1000);
- } else if (compaction.type() == TabletCompactionJobPB::CUMULATIVE) {
- // clang-format off
-
stats->set_cumulative_compaction_cnt(stats->cumulative_compaction_cnt() + 1);
- if (compaction.output_cumulative_point() > stats->cumulative_point()) {
- // After supporting parallel cumu compaction, compaction with
older cumu point may be committed after
- // new cumu point has been set, MUST NOT set cumu point back to
old value
- stats->set_cumulative_point(compaction.output_cumulative_point());
+
+ if (is_versioned_read) {
+ // read old TabletCompactStatsKey -> TabletStatsPB
+ std::string tablet_compact_stats_version_key =
+ versioned::tablet_compact_stats_key({instance_id, tablet_id});
+ std::string tablet_compact_stats_version_value;
+ Versionstamp* versionstamp = nullptr;
+ TxnErrorCode err = versioned_get(txn.get(),
tablet_compact_stats_version_key, versionstamp,
+ &tablet_compact_stats_version_value);
+ if (err != TxnErrorCode::TXN_OK) {
+ code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ?
MetaServiceCode::TXN_ID_NOT_FOUND
Review Comment:
When we first switch from single write to double write, the
`tablet_compact_stats_version_key` does not exist yet, so `err` will be
`TXN_KEY_NOT_FOUND`; treating that as an error is wrong.
In this case, we need to copy fields such as `cumulative_point` from
the single-version tablet stats key, and reset fields such as `data_size` and `index_size`
to zero.
##########
cloud/src/meta-service/meta_service_job.cpp:
##########
@@ -1088,6 +1149,22 @@ void process_compaction_job(MetaServiceCode& code,
std::string& msg, std::string
?
recorded_job.schema_change().alter_version()
: -1);
need_commit = true;
+
+ if (!compaction_log.recycle_rowsets().empty() && is_versioned_read) {
Review Comment:
```suggestion
if (!compaction_log.recycle_rowsets().empty() && is_versioned_write) {
```
##########
cloud/src/recycler/recycler.cpp:
##########
@@ -2840,6 +2870,8 @@ int InstanceRecycler::recycle_rowsets() {
num_compacted += rowset.type() == RecycleRowsetPB::COMPACT;
rowset_keys.emplace_back(k);
rowsets.emplace(rowset_meta->rowset_id_v2(),
std::move(*rowset_meta));
+
rowset_compact_keys.emplace_back(versioned::meta_rowset_compact_key(
Review Comment:
The `meta_rowset_compact_key` should be removed by `recycle_compaction_log`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]