w41ter commented on code in PR #53796:
URL: https://github.com/apache/doris/pull/53796#discussion_r2227265636
##########
cloud/src/meta-service/meta_service_partition.cpp:
##########
@@ -276,20 +319,27 @@ void
MetaServiceImpl::drop_index(::google::protobuf::RpcController* controller,
std::string to_save_val;
{
RecycleIndexPB pb;
+ pb.set_db_id(request->db_id());
pb.set_table_id(request->table_id());
pb.set_creation_time(::time(nullptr));
pb.set_expiration(request->expiration());
pb.set_state(RecycleIndexPB::DROPPED);
pb.SerializeToString(&to_save_val);
}
bool need_commit = false;
+ DropIndexLogPB drop_index_log;
+ drop_index_log.set_db_id(request->db_id());
+ drop_index_log.set_table_id(request->table_id());
+ drop_index_log.set_expiration(request->expiration());
+
for (auto index_id : request->index_ids()) {
auto key = recycle_index_key({instance_id, index_id});
std::string val;
err = txn->get(key, &val);
if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) { // UNKNOWN
LOG_INFO("put recycle index").tag("key", hex(key));
txn->put(key, to_save_val);
+ drop_index_log.add_index_ids(index_id);
Review Comment:
It appears that the recycle index key should not be written when versioned
writes are enabled.
The drop_partition implementation is wrong for the same reason.
##########
cloud/test/meta_service_operation_log_test.cpp:
##########
@@ -355,4 +355,309 @@ TEST(MetaServiceOperationLogTest, DropPartitionLog) {
ASSERT_EQ(version1, version2);
}
+TEST(MetaServiceOperationLogTest, CommitIndexLog) {
+ auto meta_service = get_meta_service(false);
+ std::string instance_id = "commit_index_log";
+ auto* sp = SyncPoint::get_instance();
+ DORIS_CLOUD_DEFER {
+ SyncPoint::get_instance()->clear_all_call_backs();
+ };
+ sp->set_call_back("get_instance_id", [&](auto&& args) {
+ auto* ret = try_any_cast_ret<std::string>(args);
+ ret->first = instance_id;
+ ret->second = true;
+ });
+ sp->enable_processing();
+
+ constexpr int64_t db_id = 123;
+ constexpr int64_t table_id = 10001;
+ constexpr int64_t index_id = 10002;
+
+ {
+ // write instance
+ InstanceInfoPB instance_info;
+ instance_info.set_instance_id(instance_id);
+ instance_info.set_multi_version_status(MULTI_VERSION_WRITE_ONLY);
+ std::unique_ptr<Transaction> txn;
+ ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn),
TxnErrorCode::TXN_OK);
+ txn->put(instance_key(instance_id), instance_info.SerializeAsString());
+ ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+
+ meta_service->resource_mgr()->refresh_instance(instance_id);
+
ASSERT_TRUE(meta_service->resource_mgr()->is_version_write_enabled(instance_id));
+ }
+
+ {
+ // Prepare index
+ brpc::Controller ctrl;
+ IndexRequest req;
+ IndexResponse res;
+ req.set_db_id(db_id);
+ req.set_table_id(table_id);
+ req.add_index_ids(index_id);
+ meta_service->prepare_index(&ctrl, &req, &res, nullptr);
+ ASSERT_EQ(res.status().code(), MetaServiceCode::OK) <<
res.status().DebugString();
+ }
+
+ {
+ // Commit index
+ brpc::Controller ctrl;
+ IndexRequest req;
+ IndexResponse res;
+ req.set_db_id(db_id);
+ req.set_table_id(table_id);
+ req.add_index_ids(index_id);
+ req.set_is_new_table(true);
+ meta_service->commit_index(&ctrl, &req, &res, nullptr);
+ ASSERT_EQ(res.status().code(), MetaServiceCode::OK) <<
res.status().DebugString();
+ }
+
+ auto txn_kv = meta_service->txn_kv();
+ Versionstamp version1;
+ {
+ // Verify index meta/index/inverted indexes are exists
+ std::string index_meta_key = versioned::meta_index_key({instance_id,
index_id});
+ std::unique_ptr<Transaction> txn;
+ ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+ std::string value;
+ ASSERT_EQ(versioned_get(txn.get(), index_meta_key, &version1, &value),
+ TxnErrorCode::TXN_OK);
+
+ std::string index_inverted_key =
+ versioned::index_inverted_key({instance_id, db_id, table_id,
index_id});
+ ASSERT_EQ(txn->get(index_inverted_key, &value), TxnErrorCode::TXN_OK);
+
+ std::string index_index_key = versioned::index_index_key({instance_id,
index_id});
+ ASSERT_EQ(txn->get(index_index_key, &value), TxnErrorCode::TXN_OK);
+ IndexIndexPB index_index;
+ ASSERT_TRUE(index_index.ParseFromString(value));
+ ASSERT_EQ(index_index.db_id(), db_id);
+ ASSERT_EQ(index_index.table_id(), table_id);
+ }
+
+ Versionstamp version2;
+ {
+ // Verify table version exists
+ MetaReader meta_reader(instance_id, txn_kv.get());
+ ASSERT_EQ(meta_reader.get_table_version(table_id, &version2),
TxnErrorCode::TXN_OK);
+ }
+
+ ASSERT_EQ(version1, version2);
+
+ {
+ // verify commit index log
+ std::unique_ptr<Transaction> txn;
+ ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+ std::string log_key = versioned::log_key({instance_id});
+ std::string value;
+ ASSERT_EQ(versioned_get(txn.get(), log_key, &version2, &value),
TxnErrorCode::TXN_OK);
+ OperationLogPB operation_log;
+ ASSERT_TRUE(operation_log.ParseFromString(value));
+ ASSERT_TRUE(operation_log.has_commit_index());
+ }
+
+ ASSERT_EQ(version1, version2);
+
+ {
+ // Prepare index2
+ brpc::Controller ctrl;
+ IndexRequest req;
+ IndexResponse res;
+ req.set_db_id(db_id);
+ req.set_table_id(table_id);
+ req.add_index_ids(index_id + 1);
+ meta_service->prepare_index(&ctrl, &req, &res, nullptr);
+ ASSERT_EQ(res.status().code(), MetaServiceCode::OK) <<
res.status().DebugString();
+ }
+
+ {
+ // Commit index2 without set_is_new_table(true)
+ brpc::Controller ctrl;
+ IndexRequest req;
+ IndexResponse res;
+ req.set_db_id(db_id);
+ req.set_table_id(table_id);
+ req.add_index_ids(index_id + 1);
+ meta_service->commit_index(&ctrl, &req, &res, nullptr);
+ ASSERT_EQ(res.status().code(), MetaServiceCode::OK) <<
res.status().DebugString();
+ }
+
+ {
+ // Verify index1 meta/index/inverted indexes are exists
+ std::string index_meta_key = versioned::meta_index_key({instance_id,
index_id + 1});
+ std::unique_ptr<Transaction> txn;
+ ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+ std::string value;
+ ASSERT_EQ(versioned_get(txn.get(), index_meta_key, &version1, &value),
+ TxnErrorCode::TXN_OK);
+
+ std::string index_inverted_key =
+ versioned::index_inverted_key({instance_id, db_id, table_id,
index_id + 1});
+ ASSERT_EQ(txn->get(index_inverted_key, &value), TxnErrorCode::TXN_OK);
+
+ std::string index_index_key = versioned::index_index_key({instance_id,
index_id + 1});
+ ASSERT_EQ(txn->get(index_index_key, &value), TxnErrorCode::TXN_OK);
+ IndexIndexPB index_index;
+ ASSERT_TRUE(index_index.ParseFromString(value));
+ ASSERT_EQ(index_index.db_id(), db_id);
+ ASSERT_EQ(index_index.table_id(), table_id);
+ }
+
+ Versionstamp version3;
+ {
+ // Verify table version exists and does not update
+ MetaReader meta_reader(instance_id, txn_kv.get());
+ ASSERT_EQ(meta_reader.get_table_version(table_id, &version3),
TxnErrorCode::TXN_OK);
+ }
+
+ ASSERT_EQ(version3, version2);
+ ASSERT_NE(version3, version1);
+}
+
+TEST(MetaServiceOperationLogTest, DropIndexLog) {
+ auto meta_service = get_meta_service(false);
+ std::string instance_id = "drop_index_log";
+ auto* sp = SyncPoint::get_instance();
+ DORIS_CLOUD_DEFER {
+ SyncPoint::get_instance()->clear_all_call_backs();
+ };
+ sp->set_call_back("get_instance_id", [&](auto&& args) {
+ auto* ret = try_any_cast_ret<std::string>(args);
+ ret->first = instance_id;
+ ret->second = true;
+ });
+ sp->enable_processing();
+
+ constexpr int64_t db_id = 123;
+ constexpr int64_t table_id = 10001;
+ constexpr int64_t index_id = 10002;
+
+ {
+ // write instance
+ InstanceInfoPB instance_info;
+ instance_info.set_instance_id(instance_id);
+ instance_info.set_multi_version_status(MULTI_VERSION_WRITE_ONLY);
+ std::unique_ptr<Transaction> txn;
+ ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn),
TxnErrorCode::TXN_OK);
+ txn->put(instance_key(instance_id), instance_info.SerializeAsString());
+ ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+
+ meta_service->resource_mgr()->refresh_instance(instance_id);
+
ASSERT_TRUE(meta_service->resource_mgr()->is_version_write_enabled(instance_id));
+ }
+
+ {
+ // Prepare index 0,1,2,3
+ brpc::Controller ctrl;
+ IndexRequest req;
+ IndexResponse res;
+ req.set_db_id(db_id);
+ req.set_table_id(table_id);
+ req.add_index_ids(index_id);
+ req.add_index_ids(index_id + 1);
+ req.add_index_ids(index_id + 2);
+ req.add_index_ids(index_id + 3);
+ meta_service->prepare_index(&ctrl, &req, &res, nullptr);
+ ASSERT_EQ(res.status().code(), MetaServiceCode::OK) <<
res.status().DebugString();
+ }
+
+ {
+ // Commit index 2,3
+ brpc::Controller ctrl;
+ IndexRequest req;
+ IndexResponse res;
+ req.set_db_id(db_id);
+ req.set_table_id(table_id);
+ req.add_index_ids(index_id + 2);
+ req.add_index_ids(index_id + 3);
+ meta_service->commit_index(&ctrl, &req, &res, nullptr);
+ ASSERT_EQ(res.status().code(), MetaServiceCode::OK) <<
res.status().DebugString();
+ }
+
+ auto txn_kv = meta_service->txn_kv();
+ size_t num_logs = count_range(txn_kv.get(),
versioned::log_key(instance_id),
+ versioned::log_key(instance_id) + "\xFF");
+
+ {
+ // Drop index 0
+ brpc::Controller ctrl;
+ IndexRequest req;
+ IndexResponse res;
+ req.set_db_id(db_id);
+ req.set_table_id(table_id);
+ req.add_index_ids(index_id);
+ meta_service->drop_index(&ctrl, &req, &res, nullptr);
+ ASSERT_EQ(res.status().code(), MetaServiceCode::OK) <<
res.status().DebugString();
+
+ // No operation log are generated for drop index 0 (it is not
committed).
+ size_t new_num_logs = count_range(txn_kv.get(),
versioned::log_key(instance_id),
+ versioned::log_key(instance_id) +
"\xFF");
+ ASSERT_EQ(new_num_logs, num_logs)
+ << "Expected no new operation logs for drop index 0, but found
"
+ << new_num_logs - num_logs << dump_range(txn_kv.get());
+ }
+
+ {
+ // Drop index 1,2, it should generate operation logs.
Review Comment:
Please also assert in this test that the recycle index key does not exist
when an operation log is generated.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]