gavinchou commented on code in PR #41782:
URL: https://github.com/apache/doris/pull/41782#discussion_r1808396743
##########
cloud/src/meta-service/meta_service.cpp:
##########
@@ -2188,4 +2188,238 @@ std::pair<MetaServiceCode, std::string>
MetaServiceImpl::get_instance_info(
return {code, std::move(msg)};
}
+MetaServiceResponseStatus MetaServiceImpl::fix_tablet_stats(std::string
cloud_unique_id_str,
+ std::string
table_id_str) {
+ MetaServiceCode code;
+ std::string msg;
+ MetaServiceResponseStatus st;
+
+ // parse params
+ int64_t table_id;
+ try {
+ table_id = std::stoll(table_id_str);
+ } catch (...) {
+ st.set_code(MetaServiceCode::INVALID_ARGUMENT);
+ st.set_msg("Invalid table_id, table_id: " + table_id_str);
+ return st;
+ }
+
+ std::string instance_id = get_instance_id(resource_mgr_,
cloud_unique_id_str);
+ if (instance_id.empty()) {
+ code = MetaServiceCode::INVALID_ARGUMENT;
+ msg = "empty instance_id";
+ LOG(INFO) << msg << ", cloud_unique_id=" << cloud_unique_id_str;
+ st.set_code(code);
+ st.set_msg(msg);
+ return st;
+ }
+
+ std::unique_ptr<Transaction> txn;
+ TxnErrorCode err = txn_kv_->create_txn(&txn);
+ if (err != TxnErrorCode::TXN_OK) {
+ code = cast_as<ErrCategory::CREATE>(err);
+ msg = fmt::format("failed to create txn");
+ st.set_code(code);
+ st.set_msg(msg);
+ return st;
+ }
+
+ // fix tablet stats code
+ std::string key, val;
+ int64_t start = 0;
+ int64_t end = std::numeric_limits<int64_t>::max() - 1;
+ auto begin_key = stats_tablet_key({instance_id, table_id, start, start,
start});
+ auto end_key = stats_tablet_key({instance_id, table_id, end, end, end});
+ std::vector<std::pair<std::string, std::string>> stats_kvs;
+ int64_t sub_txn_id = 0;
+
+ std::unique_ptr<RangeGetIterator> it;
+ do {
+ TxnErrorCode err = txn->get(begin_key, end_key, &it, true);
+ if (err != TxnErrorCode::TXN_OK) {
+ st.set_code(cast_as<ErrCategory::READ>(err));
+ st.set_msg(fmt::format("failed to get tablet stats, err={}
instance_id={} table_id={} ",
+ err, instance_id, table_id));
+ return st;
+ }
+ while (it->has_next()) {
+ auto [k, v] = it->next();
+ auto k1 = k;
+ k1.remove_prefix(1);
+ std::vector<std::tuple<std::variant<int64_t, std::string>, int,
int>> out;
+ decode_key(&k1, &out);
+ // 0x01 "stats" ${instance_id} "tablet" ${table_id} ${index_id}
${partition_id} ${tablet_id} -> TabletStatsPB
+ if (out.size() == 7) {
+ sub_txn_id++;
+ // =======================================================
+ // phase 1: read tablet's rowsets data and accumulate them
+ auto tablet_id = std::get<int64_t>(std::get<0>(out[6]));
+ TabletStatsPB tablet_stat;
+ tablet_stat.ParseFromArray(v.data(), v.size());
+ LOG(INFO) << fmt::format(
+ "[Sub txn id {}, Tablet id {} fix tablet stats phase
1]: read original "
+ "tabletPB, tabletPB info: {}",
+ sub_txn_id, tablet_id, tablet_stat.DebugString());
+
+ // =======================================================
+ // phase 2: read tablet's rowsets data and accumulate them
+ GetRowsetResponse resp;
+ internal_get_rowset(txn.get(), start, end, instance_id,
tablet_id, code, msg,
+ &resp);
+ if (code != MetaServiceCode::OK) {
+ st.set_code(code);
+ st.set_msg(msg);
+ return st;
+ }
+ int64_t total_disk_size = 0;
+ for (const auto& rs_meta : resp.rowset_meta()) {
+ rs_meta.rowset_id();
+ total_disk_size += rs_meta.total_disk_size();
+ LOG(INFO) << fmt::format(
+ "[Sub txn id {}, Tablet id {} fix tablet stats
phase 2]: read "
+ "rowsetsPB, total disk size: {}, rowsetPB info:
{}",
+ sub_txn_id, tablet_id, total_disk_size,
rs_meta.DebugString());
+ }
+
+ // =======================================================
+ // phase 3: set new data to tabletPB and write it back
+ tablet_stat.set_data_size(total_disk_size);
+ std::string tablet_stat_key;
+ std::string tablet_stat_value;
+ tablet_stat_key = stats_tablet_key(
+ {instance_id, table_id, tablet_stat.idx().index_id(),
+ tablet_stat.idx().partition_id(),
tablet_stat.idx().tablet_id()});
+ if (!tablet_stat.SerializeToString(&tablet_stat_value)) {
+ st.set_code(MetaServiceCode::PROTOBUF_SERIALIZE_ERR);
+ st.set_msg("failed to serialize tablet stat");
+ return st;
+ }
+ txn->put(tablet_stat_key, tablet_stat_value);
+ LOG(INFO) << fmt::format(
+ "[Sub txn id{}, Tablet id {} fix tablet stats phase
3]: write new "
+ "tabletPB, tabletPB info: {}",
+ sub_txn_id, tablet_id, tablet_stat.DebugString());
+
+ // =======================================================
+ // 0x01 "stats" ${instance_id} "tablet" ${table_id}
${index_id} ${partition_id} ${tablet_id} "data_size" -> int64
+ // phase 4: set tablet stats data_size = 0
+ std::string tablet_stat_data_size_key;
+ stats_tablet_data_size_key(
+ {instance_id, table_id, tablet_stat.idx().index_id(),
+ tablet_stat.idx().partition_id(),
tablet_stat.idx().tablet_id()},
+ &tablet_stat_data_size_key);
+ // set tablet stats data size = 0
+ int64_t tablet_stat_data_size = 0;
+ std::string
tablet_stat_data_size_value(sizeof(tablet_stat_data_size), '\0');
+ memcpy(tablet_stat_data_size_value.data(),
&tablet_stat_data_size,
+ sizeof(tablet_stat_data_size));
+ txn->put(tablet_stat_data_size_key,
tablet_stat_data_size_value);
+ LOG(INFO) << fmt::format(
+ "[Sub txn id {} Tablet id {} fix tablet stats phase
4]: set tablet stats "
+ "data size = 0, data size : {}",
+ sub_txn_id, tablet_id, tablet_stat_data_size_value);
+
+ // =======================================================
+ // phase 5: get tabletPB to check correctness
+ err = txn->get(tablet_stat_key, &tablet_stat_value, true);
+ if (err != TxnErrorCode::TXN_OK) {
+ code = cast_as<ErrCategory::READ>(err);
+ LOG(INFO) << fmt::format(
+ "[Sub txn id {} Tablet id {} fix tablet stats
phase 5]: get tablet "
+ "stats failed, err {}",
+ sub_txn_id, tablet_id, err);
+ }
+ TabletStatsPB tablet_stat_check;
+ tablet_stat_check.ParseFromArray(tablet_stat_value.data(),
+ tablet_stat_value.size());
+ LOG(INFO) << fmt::format(
+ "[Sub txn id {} Tablet id {} fix tablet stats phase
5]: check correctness "
+ "get tabletPB {}",
+ sub_txn_id, tablet_id,
tablet_stat_check.DebugString());
+ if (tablet_stat_check.DebugString() !=
tablet_stat.DebugString()) {
+ LOG(WARNING) << fmt::format(
+ "[Sub txn id {} Tablet id {} fix tablet stats
phase 5]: check "
+ "correctness get tabletPB failed, tablet_stat {},
tablet_stat_check {}",
+ sub_txn_id, tablet_id, tablet_stat.DebugString(),
+ tablet_stat_check.DebugString());
+ }
+
+ // =======================================================
+ // phase 6: get tablet data size to check correctness
+ err = txn->get(tablet_stat_data_size_key,
&tablet_stat_data_size_value, true);
Review Comment:
Create a new txn to perform this get.
##########
cloud/src/meta-service/meta_service.cpp:
##########
@@ -2188,4 +2188,238 @@ std::pair<MetaServiceCode, std::string>
MetaServiceImpl::get_instance_info(
return {code, std::move(msg)};
}
+MetaServiceResponseStatus MetaServiceImpl::fix_tablet_stats(std::string
cloud_unique_id_str,
+ std::string
table_id_str) {
+ MetaServiceCode code;
+ std::string msg;
+ MetaServiceResponseStatus st;
+
+ // parse params
+ int64_t table_id;
+ try {
+ table_id = std::stoll(table_id_str);
+ } catch (...) {
+ st.set_code(MetaServiceCode::INVALID_ARGUMENT);
+ st.set_msg("Invalid table_id, table_id: " + table_id_str);
+ return st;
+ }
+
+ std::string instance_id = get_instance_id(resource_mgr_,
cloud_unique_id_str);
+ if (instance_id.empty()) {
+ code = MetaServiceCode::INVALID_ARGUMENT;
+ msg = "empty instance_id";
+ LOG(INFO) << msg << ", cloud_unique_id=" << cloud_unique_id_str;
+ st.set_code(code);
+ st.set_msg(msg);
+ return st;
+ }
+
+ std::unique_ptr<Transaction> txn;
+ TxnErrorCode err = txn_kv_->create_txn(&txn);
+ if (err != TxnErrorCode::TXN_OK) {
+ code = cast_as<ErrCategory::CREATE>(err);
+ msg = fmt::format("failed to create txn");
+ st.set_code(code);
+ st.set_msg(msg);
+ return st;
+ }
+
+ // fix tablet stats code
+ std::string key, val;
+ int64_t start = 0;
+ int64_t end = std::numeric_limits<int64_t>::max() - 1;
+ auto begin_key = stats_tablet_key({instance_id, table_id, start, start,
start});
+ auto end_key = stats_tablet_key({instance_id, table_id, end, end, end});
+ std::vector<std::pair<std::string, std::string>> stats_kvs;
+ int64_t sub_txn_id = 0;
+
+ std::unique_ptr<RangeGetIterator> it;
+ do {
+ TxnErrorCode err = txn->get(begin_key, end_key, &it, true);
+ if (err != TxnErrorCode::TXN_OK) {
+ st.set_code(cast_as<ErrCategory::READ>(err));
+ st.set_msg(fmt::format("failed to get tablet stats, err={}
instance_id={} table_id={} ",
+ err, instance_id, table_id));
+ return st;
+ }
+ while (it->has_next()) {
+ auto [k, v] = it->next();
+ auto k1 = k;
+ k1.remove_prefix(1);
+ std::vector<std::tuple<std::variant<int64_t, std::string>, int,
int>> out;
+ decode_key(&k1, &out);
+ // 0x01 "stats" ${instance_id} "tablet" ${table_id} ${index_id}
${partition_id} ${tablet_id} -> TabletStatsPB
+ if (out.size() == 7) {
+ sub_txn_id++;
+ // =======================================================
+ // phase 1: read tablet's rowsets data and accumulate them
+ auto tablet_id = std::get<int64_t>(std::get<0>(out[6]));
+ TabletStatsPB tablet_stat;
+ tablet_stat.ParseFromArray(v.data(), v.size());
+ LOG(INFO) << fmt::format(
+ "[Sub txn id {}, Tablet id {} fix tablet stats phase
1]: read original "
+ "tabletPB, tabletPB info: {}",
+ sub_txn_id, tablet_id, tablet_stat.DebugString());
+
+ // =======================================================
+ // phase 2: read tablet's rowsets data and accumulate them
+ GetRowsetResponse resp;
+ internal_get_rowset(txn.get(), start, end, instance_id,
tablet_id, code, msg,
+ &resp);
+ if (code != MetaServiceCode::OK) {
+ st.set_code(code);
+ st.set_msg(msg);
+ return st;
+ }
+ int64_t total_disk_size = 0;
+ for (const auto& rs_meta : resp.rowset_meta()) {
+ rs_meta.rowset_id();
+ total_disk_size += rs_meta.total_disk_size();
+ LOG(INFO) << fmt::format(
+ "[Sub txn id {}, Tablet id {} fix tablet stats
phase 2]: read "
+ "rowsetsPB, total disk size: {}, rowsetPB info:
{}",
+ sub_txn_id, tablet_id, total_disk_size,
rs_meta.DebugString());
+ }
+
+ // =======================================================
+ // phase 3: set new data to tabletPB and write it back
+ tablet_stat.set_data_size(total_disk_size);
+ std::string tablet_stat_key;
+ std::string tablet_stat_value;
+ tablet_stat_key = stats_tablet_key(
+ {instance_id, table_id, tablet_stat.idx().index_id(),
+ tablet_stat.idx().partition_id(),
tablet_stat.idx().tablet_id()});
+ if (!tablet_stat.SerializeToString(&tablet_stat_value)) {
+ st.set_code(MetaServiceCode::PROTOBUF_SERIALIZE_ERR);
+ st.set_msg("failed to serialize tablet stat");
+ return st;
+ }
+ txn->put(tablet_stat_key, tablet_stat_value);
+ LOG(INFO) << fmt::format(
+ "[Sub txn id{}, Tablet id {} fix tablet stats phase
3]: write new "
+ "tabletPB, tabletPB info: {}",
+ sub_txn_id, tablet_id, tablet_stat.DebugString());
+
+ // =======================================================
+ // 0x01 "stats" ${instance_id} "tablet" ${table_id}
${index_id} ${partition_id} ${tablet_id} "data_size" -> int64
+ // phase 4: set tablet stats data_size = 0
+ std::string tablet_stat_data_size_key;
+ stats_tablet_data_size_key(
+ {instance_id, table_id, tablet_stat.idx().index_id(),
+ tablet_stat.idx().partition_id(),
tablet_stat.idx().tablet_id()},
+ &tablet_stat_data_size_key);
+ // set tablet stats data size = 0
+ int64_t tablet_stat_data_size = 0;
+ std::string
tablet_stat_data_size_value(sizeof(tablet_stat_data_size), '\0');
+ memcpy(tablet_stat_data_size_value.data(),
&tablet_stat_data_size,
+ sizeof(tablet_stat_data_size));
+ txn->put(tablet_stat_data_size_key,
tablet_stat_data_size_value);
+ LOG(INFO) << fmt::format(
+ "[Sub txn id {} Tablet id {} fix tablet stats phase
4]: set tablet stats "
+ "data size = 0, data size : {}",
+ sub_txn_id, tablet_id, tablet_stat_data_size_value);
+
+ // =======================================================
+ // phase 5: get tabletPB to check correctness
+ err = txn->get(tablet_stat_key, &tablet_stat_value, true);
+ if (err != TxnErrorCode::TXN_OK) {
+ code = cast_as<ErrCategory::READ>(err);
+ LOG(INFO) << fmt::format(
+ "[Sub txn id {} Tablet id {} fix tablet stats
phase 5]: get tablet "
+ "stats failed, err {}",
+ sub_txn_id, tablet_id, err);
+ }
+ TabletStatsPB tablet_stat_check;
+ tablet_stat_check.ParseFromArray(tablet_stat_value.data(),
+ tablet_stat_value.size());
+ LOG(INFO) << fmt::format(
+ "[Sub txn id {} Tablet id {} fix tablet stats phase
5]: check correctness "
+ "get tabletPB {}",
+ sub_txn_id, tablet_id,
tablet_stat_check.DebugString());
+ if (tablet_stat_check.DebugString() !=
tablet_stat.DebugString()) {
+ LOG(WARNING) << fmt::format(
+ "[Sub txn id {} Tablet id {} fix tablet stats
phase 5]: check "
+ "correctness get tabletPB failed, tablet_stat {},
tablet_stat_check {}",
+ sub_txn_id, tablet_id, tablet_stat.DebugString(),
+ tablet_stat_check.DebugString());
+ }
+
+ // =======================================================
+ // phase 6: get tablet data size to check correctness
+ err = txn->get(tablet_stat_data_size_key,
&tablet_stat_data_size_value, true);
Review Comment:
Create a new txn to perform this check.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]