w41ter commented on code in PR #54094:
URL: https://github.com/apache/doris/pull/54094#discussion_r2261810587


##########
cloud/src/meta-service/meta_service_job.cpp:
##########
@@ -984,8 +1034,14 @@ void process_compaction_job(MetaServiceCode& code, std::string& msg, std::string
             recycle_rowset.set_creation_time(now);
             recycle_rowset.mutable_rowset_meta()->CopyFrom(rs);
             recycle_rowset.set_type(RecycleRowsetPB::COMPACT);
-            auto recycle_val = recycle_rowset.SerializeAsString();
-            txn->put(recycle_key, recycle_val);
+
+            if (is_versioned_read) {

Review Comment:
   
   ```suggestion
               if (is_versioned_write) {
   ```



##########
cloud/src/meta-service/meta_service_job.cpp:
##########
@@ -1577,15 +1654,16 @@ void MetaServiceImpl::finish_tablet_job(::google::protobuf::RpcController* contr
                 delete_bitmap_lock_white_list_->get_delete_bitmap_lock_version(instance_id);
         LOG(INFO) << "finish_tablet_job instance_id=" << instance_id
                   << " use_version=" << use_version;
+        bool is_version_write = is_version_write_enabled(instance_id);
         if (!request->job().compaction().empty()) {
             // Process compaction commit
             process_compaction_job(code, msg, ss, txn, request, response, recorded_job, instance_id,
-                                   job_key, need_commit, use_version, is_versioned_read);

Review Comment:
   `is_versioned_read` and `is_versioned_write` are different flags.
   
   The switch progresses through these stages:
   1. Write and read the single version.
   2. Write both single and multiple versions, but read the single version.
   3. Write both versions, but read the multiple versions.
   4. Write and read only the multiple versions.
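   A rough, self-contained sketch of why the two flags need to be gated independently (hypothetical names, not Doris code):
   ```cpp
   // Illustrative model only; MigrationPhase, write_versioned and read_versioned
   // are made-up names for this sketch.
   enum class MigrationPhase {
       kSingleOnly,           // 1. write and read the single version
       kDualWriteSingleRead,  // 2. write both, still read the single version
       kDualWriteMultiRead,   // 3. write both, read the multiple versions
       kMultiOnly,            // 4. write and read only the multiple versions
   };

   // is_versioned_write turns on at stage 2.
   bool write_versioned(MigrationPhase p) {
       return p != MigrationPhase::kSingleOnly;
   }

   // is_versioned_read turns on only at stage 3.
   bool read_versioned(MigrationPhase p) {
       return p == MigrationPhase::kDualWriteMultiRead || p == MigrationPhase::kMultiOnly;
   }
   ```
   So during stage 2, `process_compaction_job` has to double-write while still reading only the single-version keys, which is why the write path must check `is_versioned_write` rather than `is_versioned_read`.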
   
   



##########
cloud/src/meta-service/meta_service_job.cpp:
##########
@@ -806,56 +869,40 @@ void process_compaction_job(MetaServiceCode& code, std::string& msg, std::string
     //  with `config::split_tablet_stats = true` can meet the condition.
     internal_get_tablet_stats(code, msg, txn.get(), instance_id, request->job().idx(), *stats,
                               detached_stats, config::snapshot_get_tablet_stats);
-    if (compaction.type() == TabletCompactionJobPB::EMPTY_CUMULATIVE) {
-        stats->set_cumulative_compaction_cnt(stats->cumulative_compaction_cnt() + 1);
-        stats->set_cumulative_point(compaction.output_cumulative_point());
-        stats->set_last_cumu_compaction_time_ms(now * 1000);
-    } else if (compaction.type() == TabletCompactionJobPB::CUMULATIVE) {
-        // clang-format off
-        stats->set_cumulative_compaction_cnt(stats->cumulative_compaction_cnt() + 1);
-        if (compaction.output_cumulative_point() > stats->cumulative_point()) {
-            // After supporting parallel cumu compaction, compaction with older cumu point may be committed after
-            // new cumu point has been set, MUST NOT set cumu point back to old value
-            stats->set_cumulative_point(compaction.output_cumulative_point());
+
+    if (is_versioned_read) {
+        // read old TabletCompactStatsKey -> TabletStatsPB
+        std::string tablet_compact_stats_version_key =
+                versioned::tablet_compact_stats_key({instance_id, tablet_id});
+        std::string tablet_compact_stats_version_value;
+        Versionstamp* versionstamp = nullptr;
+        TxnErrorCode err = versioned_get(txn.get(), tablet_compact_stats_version_key, versionstamp,
+                                         &tablet_compact_stats_version_value);
+        if (err != TxnErrorCode::TXN_OK) {
+            code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? MetaServiceCode::TXN_ID_NOT_FOUND
+                                                          : cast_as<ErrCategory::READ>(err);
+            msg = fmt::format(
+                    "failed to get tablet compact stats version, tablet_id={}, key={} err={}",
+                    tablet_id, hex(tablet_compact_stats_version_key), err);
+            return;
         }
-        stats->set_num_rows(stats->num_rows() + (compaction.num_output_rows() - compaction.num_input_rows()));
-        stats->set_data_size(stats->data_size() + (compaction.size_output_rowsets() - compaction.size_input_rowsets()));
-        stats->set_num_rowsets(stats->num_rowsets() + (compaction.num_output_rowsets() - compaction.num_input_rowsets()));
-        stats->set_num_segments(stats->num_segments() + (compaction.num_output_segments() - compaction.num_input_segments()));
-        stats->set_index_size(stats->index_size() + (compaction.index_size_output_rowsets() - compaction.index_size_input_rowsets()));
-        stats->set_segment_size(stats->segment_size() + (compaction.segment_size_output_rowsets() - compaction.segment_size_input_rowsets()));
-        stats->set_last_cumu_compaction_time_ms(now * 1000);
-        // clang-format on
-    } else if (compaction.type() == TabletCompactionJobPB::BASE) {
-        // clang-format off
-        stats->set_base_compaction_cnt(stats->base_compaction_cnt() + 1);
-        stats->set_num_rows(stats->num_rows() + (compaction.num_output_rows() - compaction.num_input_rows()));
-        stats->set_data_size(stats->data_size() + (compaction.size_output_rowsets() - compaction.size_input_rowsets()));
-        stats->set_num_rowsets(stats->num_rowsets() + (compaction.num_output_rowsets() - compaction.num_input_rowsets()));
-        stats->set_num_segments(stats->num_segments() + (compaction.num_output_segments() - compaction.num_input_segments()));
-        stats->set_index_size(stats->index_size() + (compaction.index_size_output_rowsets() - compaction.index_size_input_rowsets()));
-        stats->set_segment_size(stats->segment_size() + (compaction.segment_size_output_rowsets() - compaction.segment_size_input_rowsets()));
-        stats->set_last_base_compaction_time_ms(now * 1000);
-        // clang-format on
-    } else if (compaction.type() == TabletCompactionJobPB::FULL) {
-        // clang-format off
-        stats->set_base_compaction_cnt(stats->base_compaction_cnt() + 1);
-        if (compaction.output_cumulative_point() > stats->cumulative_point()) {
-            // After supporting parallel cumu compaction, compaction with older cumu point may be committed after
-            // new cumu point has been set, MUST NOT set cumu point back to old value
-            stats->set_cumulative_point(compaction.output_cumulative_point());
+        TabletStatsPB tablet_compact_stats;
+        tablet_compact_stats.ParseFromString(tablet_compact_stats_version_value);
+
+        // update TabletStatsPB

Review Comment:
   The `MetaReader` provides helper functions to accomplish this.



##########
cloud/src/meta-service/meta_service_job.cpp:
##########
@@ -1088,6 +1149,22 @@ void process_compaction_job(MetaServiceCode& code, std::string& msg, std::string
                                        ? recorded_job.schema_change().alter_version()
                                        : -1);
     need_commit = true;
+
+    if (!compaction_log.recycle_rowsets().empty() && is_versioned_read) {
+        std::string operation_log_key = versioned::log_key({instance_id});
+        std::string operation_log_value;
+        OperationLogPB operation_log;
+        operation_log.mutable_compaction()->Swap(&compaction_log);
+        if (!operation_log.SerializeToString(&operation_log_value)) {
+            code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
+            msg = fmt::format("failed to serialize OperationLogPB: {}", hex(operation_log_key));
+            LOG_WARNING(msg)
+                    .tag("instance_id", instance_id)
+                    .tag("table_id", request->job().idx().table_id());
+            return;
+        }
+        versioned_put(txn.get(), operation_log_key, operation_log_value);

Review Comment:
   Would you consider adding logs about all the versioned keys?
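   For example, something like this (a sketch only; the message text is a placeholder, the variables are the ones already in scope in this diff):
   ```cpp
   // Log which versioned key was written, to make debugging the rollout easier.
   LOG(INFO) << "write compaction operation log, instance_id=" << instance_id
             << " tablet_id=" << tablet_id << " log_key=" << hex(operation_log_key);
   ```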



##########
cloud/src/recycler/recycler.cpp:
##########
@@ -2861,6 +2894,11 @@ int InstanceRecycler::recycle_rowsets() {
                LOG(WARNING) << "failed to delete rowset data, instance_id=" << instance_id_;
                 return;
             }
+            if (txn_remove_versioned_keys(txn_kv_.get(), rowset_compact_keys_to_delete)) {

Review Comment:
   The `meta_rowset_compact_key` should be removed in `recycle_compaction_log`



##########
cloud/src/meta-service/meta_service_job.cpp:
##########
@@ -1071,6 +1127,11 @@ void process_compaction_job(MetaServiceCode& code, std::string& msg, std::string
     int64_t version = compaction.output_versions(0);
     auto rowset_key = meta_rowset_key({instance_id, tablet_id, version});
     txn->put(rowset_key, tmp_rowset_val);
+    if (is_versioned_read) {

Review Comment:
   ```suggestion
       if (is_versioned_write) {
   ```



##########
cloud/src/meta-service/meta_service_job.cpp:
##########
@@ -1071,6 +1127,11 @@ void process_compaction_job(MetaServiceCode& code, std::string& msg, std::string
     int64_t version = compaction.output_versions(0);
     auto rowset_key = meta_rowset_key({instance_id, tablet_id, version});
     txn->put(rowset_key, tmp_rowset_val);
+    if (is_versioned_read) {
+        std::string meta_rowset_compact_key =
+                versioned::meta_rowset_compact_key({instance_id, tablet_id, version});
+        versioned_put(txn.get(), meta_rowset_compact_key, tmp_rowset_val);

Review Comment:
   Some fields of the `RowsetMetaCloudPB` will be split into separate keys, so here we must use `versioned::document_put`.



##########
cloud/src/meta-service/meta_service_job.cpp:
##########
@@ -984,8 +1034,14 @@ void process_compaction_job(MetaServiceCode& code, std::string& msg, std::string
             recycle_rowset.set_creation_time(now);
             recycle_rowset.mutable_rowset_meta()->CopyFrom(rs);
             recycle_rowset.set_type(RecycleRowsetPB::COMPACT);
-            auto recycle_val = recycle_rowset.SerializeAsString();
-            txn->put(recycle_key, recycle_val);
+
+            if (is_versioned_read) {
+                compaction_log.add_recycle_rowsets()->CopyFrom(recycle_rowset);

Review Comment:
   `Swap` might be better
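   i.e. roughly (same variables as in the diff above):
   ```cpp
   // Swap moves the contents instead of deep-copying them; recycle_rowset must
   // not be read again after this point.
   compaction_log.add_recycle_rowsets()->Swap(&recycle_rowset);
   ```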



##########
cloud/src/recycler/recycler_operation_log.cpp:
##########
@@ -238,6 +240,22 @@ int OperationLogRecycler::recycle_update_tablet_log(const UpdateTabletLogPB& upd
     return 0;
 }
 
+int OperationLogRecycler::recycle_compaction_log(const CompactionLogPB& compaction_log) {
+    for (const RecycleRowsetPB& recycle_rowset_pb : compaction_log.recycle_rowsets()) {
+        std::string recycle_rowset_value;
+        if (!recycle_rowset_pb.SerializeToString(&recycle_rowset_value)) {
+            LOG_WARNING("failed to serialize RecycleRowsetPB")
+                    .tag("recycle rowset pb", recycle_rowset_pb.ShortDebugString());
+            return -1;
+        }
+        std::string recycle_key =
+                recycle_rowset_key({instance_id_, compaction_log.tablet_id(),
+                                    recycle_rowset_pb.rowset_meta().rowset_id_v2()});
+        kvs_.emplace_back(recycle_key, recycle_rowset_value);

Review Comment:
   The input rowset keys, `meta_rowset_load_key` and `meta_rowset_compact_key`, should be removed here.



##########
cloud/src/meta-service/meta_service_job.cpp:
##########
@@ -806,56 +869,40 @@ void process_compaction_job(MetaServiceCode& code, std::string& msg, std::string
     //  with `config::split_tablet_stats = true` can meet the condition.
     internal_get_tablet_stats(code, msg, txn.get(), instance_id, request->job().idx(), *stats,
                               detached_stats, config::snapshot_get_tablet_stats);
-    if (compaction.type() == TabletCompactionJobPB::EMPTY_CUMULATIVE) {
-        stats->set_cumulative_compaction_cnt(stats->cumulative_compaction_cnt() + 1);
-        stats->set_cumulative_point(compaction.output_cumulative_point());
-        stats->set_last_cumu_compaction_time_ms(now * 1000);
-    } else if (compaction.type() == TabletCompactionJobPB::CUMULATIVE) {
-        // clang-format off
-        stats->set_cumulative_compaction_cnt(stats->cumulative_compaction_cnt() + 1);
-        if (compaction.output_cumulative_point() > stats->cumulative_point()) {
-            // After supporting parallel cumu compaction, compaction with older cumu point may be committed after
-            // new cumu point has been set, MUST NOT set cumu point back to old value
-            stats->set_cumulative_point(compaction.output_cumulative_point());
+
+    if (is_versioned_read) {
+        // read old TabletCompactStatsKey -> TabletStatsPB
+        std::string tablet_compact_stats_version_key =
+                versioned::tablet_compact_stats_key({instance_id, tablet_id});
+        std::string tablet_compact_stats_version_value;
+        Versionstamp* versionstamp = nullptr;
+        TxnErrorCode err = versioned_get(txn.get(), tablet_compact_stats_version_key, versionstamp,
+                                         &tablet_compact_stats_version_value);
+        if (err != TxnErrorCode::TXN_OK) {
+            code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? MetaServiceCode::TXN_ID_NOT_FOUND

Review Comment:
   The first time we switch from single write to double write, the `tablet_compact_stats_version_key` does not exist yet, and `err` will be `TXN_KEY_NOT_FOUND`, so treating it as an error is wrong.
   
   In this case, we need to copy fields such as cumulative_point ... from the single-version tablet stats key, and clear fields such as data_size/index_size ... to zero.
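   A rough sketch of the fallback (just a sketch; the setters come from the removed block above, and exactly which size fields to reset is up to you):
   ```cpp
   TabletStatsPB tablet_compact_stats;
   if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
       // First double write after the switch: seed the versioned stats from the
       // single-version stats, but zero the size counters so they only account
       // for rowsets written under the versioned scheme.
       tablet_compact_stats.CopyFrom(*stats);
       tablet_compact_stats.set_data_size(0);
       tablet_compact_stats.set_index_size(0);
       tablet_compact_stats.set_segment_size(0);
   } else if (err != TxnErrorCode::TXN_OK) {
       code = cast_as<ErrCategory::READ>(err);
       msg = fmt::format("failed to get tablet compact stats version, tablet_id={}, key={} err={}",
                         tablet_id, hex(tablet_compact_stats_version_key), err);
       return;
   } else {
       tablet_compact_stats.ParseFromString(tablet_compact_stats_version_value);
   }
   ```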



##########
cloud/src/meta-service/meta_service_job.cpp:
##########
@@ -1088,6 +1149,22 @@ void process_compaction_job(MetaServiceCode& code, std::string& msg, std::string
                                        ? recorded_job.schema_change().alter_version()
                                        : -1);
     need_commit = true;
+
+    if (!compaction_log.recycle_rowsets().empty() && is_versioned_read) {

Review Comment:
   ```suggestion
       if (!compaction_log.recycle_rowsets().empty() && is_versioned_write) {
   ```



##########
cloud/src/recycler/recycler.cpp:
##########
@@ -2840,6 +2870,8 @@ int InstanceRecycler::recycle_rowsets() {
             num_compacted += rowset.type() == RecycleRowsetPB::COMPACT;
             rowset_keys.emplace_back(k);
             rowsets.emplace(rowset_meta->rowset_id_v2(), std::move(*rowset_meta));
+            rowset_compact_keys.emplace_back(versioned::meta_rowset_compact_key(

Review Comment:
   The `meta_rowset_compact_key` should be removed in `recycle_compaction_log`


