This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch streaming-job-dev
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/streaming-job-dev by this push:
new 6d1374279cf [fix](streaming job) fix streaming job ut (#56314)
6d1374279cf is described below
commit 6d1374279cfba85d19de6501b37aaf3bf2a548c9
Author: hui lai <[email protected]>
AuthorDate: Mon Sep 22 20:20:00 2025 +0800
[fix](streaming job) fix streaming job ut (#56314)
### What problem does this PR solve?
[fix](streaming job) fix streaming job ut
---
cloud/test/meta_service_job_test.cpp | 93 ++++++++++++++++++++++++++----------
1 file changed, 69 insertions(+), 24 deletions(-)
diff --git a/cloud/test/meta_service_job_test.cpp
b/cloud/test/meta_service_job_test.cpp
index 9fa021e138f..5d5758579eb 100644
--- a/cloud/test/meta_service_job_test.cpp
+++ b/cloud/test/meta_service_job_test.cpp
@@ -4998,7 +4998,23 @@ TEST(MetaServiceJobTest, UpdateStreamingJobMetaTest) {
{
CommitTxnRequest request;
request.set_cloud_unique_id("test_cloud_unique_id");
- request.set_txn_id(12345);
+ // Begin a txn to obtain a valid txn_id and db_id mapping
+ int64_t db_id = 1000;
+ {
+ brpc::Controller cntl_bt;
+ BeginTxnRequest bt_req;
+ BeginTxnResponse bt_res;
+ auto* txn_info = bt_req.mutable_txn_info();
+ txn_info->set_db_id(db_id);
+ txn_info->set_label("streaming_ut_1");
+ txn_info->add_table_ids(1);
+
txn_info->set_load_job_source_type(LoadJobSourceTypePB::LOAD_JOB_SRC_TYPE_STREAMING_JOB);
+ txn_info->set_timeout_ms(36000);
+ meta_service->begin_txn(&cntl_bt, &bt_req, &bt_res, nullptr);
+ ASSERT_EQ(bt_res.status().code(), MetaServiceCode::OK);
+ request.set_txn_id(bt_res.txn_id());
+ request.set_db_id(db_id);
+ }
TxnCommitAttachmentPB* attachment =
request.mutable_commit_attachment();
attachment->set_type(TxnCommitAttachmentPB::STREAMING_TASK_TXN_COMMIT_ATTACHMENT);
@@ -5006,7 +5022,6 @@ TEST(MetaServiceJobTest, UpdateStreamingJobMetaTest) {
StreamingTaskCommitAttachmentPB* streaming_attach =
attachment->mutable_streaming_task_txn_commit_attachment();
streaming_attach->set_job_id(1001);
- streaming_attach->set_task_id(2001);
streaming_attach->set_offset("test_offset_1");
streaming_attach->set_scanned_rows(1000);
streaming_attach->set_load_bytes(5000);
@@ -5025,7 +5040,23 @@ TEST(MetaServiceJobTest, UpdateStreamingJobMetaTest) {
{
CommitTxnRequest request;
request.set_cloud_unique_id("test_cloud_unique_id");
- request.set_txn_id(12346);
+ // Begin a txn to obtain a valid txn_id and db_id mapping
+ int64_t db_id = 1000;
+ {
+ brpc::Controller cntl_bt;
+ BeginTxnRequest bt_req;
+ BeginTxnResponse bt_res;
+ auto* txn_info = bt_req.mutable_txn_info();
+ txn_info->set_db_id(db_id);
+ txn_info->set_label("streaming_ut_2");
+ txn_info->add_table_ids(1);
+
txn_info->set_load_job_source_type(LoadJobSourceTypePB::LOAD_JOB_SRC_TYPE_STREAMING_JOB);
+ txn_info->set_timeout_ms(36000);
+ meta_service->begin_txn(&cntl_bt, &bt_req, &bt_res, nullptr);
+ ASSERT_EQ(bt_res.status().code(), MetaServiceCode::OK);
+ request.set_txn_id(bt_res.txn_id());
+ request.set_db_id(db_id);
+ }
TxnCommitAttachmentPB* attachment =
request.mutable_commit_attachment();
attachment->set_type(TxnCommitAttachmentPB::STREAMING_TASK_TXN_COMMIT_ATTACHMENT);
@@ -5033,7 +5064,6 @@ TEST(MetaServiceJobTest, UpdateStreamingJobMetaTest) {
StreamingTaskCommitAttachmentPB* streaming_attach =
attachment->mutable_streaming_task_txn_commit_attachment();
streaming_attach->set_job_id(1001); // Same job_id as before
- streaming_attach->set_task_id(2002);
streaming_attach->set_offset("test_offset_2");
streaming_attach->set_scanned_rows(500);
streaming_attach->set_load_bytes(2000);
@@ -5052,7 +5082,23 @@ TEST(MetaServiceJobTest, UpdateStreamingJobMetaTest) {
{
CommitTxnRequest request;
request.set_cloud_unique_id("test_cloud_unique_id");
- request.set_txn_id(12347);
+ // Begin a txn to obtain a valid txn_id and db_id mapping
+ int64_t db_id = 1000;
+ {
+ brpc::Controller cntl_bt;
+ BeginTxnRequest bt_req;
+ BeginTxnResponse bt_res;
+ auto* txn_info = bt_req.mutable_txn_info();
+ txn_info->set_db_id(db_id);
+ txn_info->set_label("streaming_ut_missing_attach");
+ txn_info->add_table_ids(1);
+
txn_info->set_load_job_source_type(LoadJobSourceTypePB::LOAD_JOB_SRC_TYPE_STREAMING_JOB);
+ txn_info->set_timeout_ms(36000);
+ meta_service->begin_txn(&cntl_bt, &bt_req, &bt_res, nullptr);
+ ASSERT_EQ(bt_res.status().code(), MetaServiceCode::OK);
+ request.set_txn_id(bt_res.txn_id());
+ request.set_db_id(db_id);
+ }
// No commit attachment set
CommitTxnResponse response;
@@ -5073,7 +5119,23 @@ TEST(MetaServiceJobTest,
GetStreamingTaskCommitAttachTest) {
{
CommitTxnRequest request;
request.set_cloud_unique_id("test_cloud_unique_id");
- request.set_txn_id(12348);
+ // Begin a txn to obtain a valid txn_id and db_id mapping
+ int64_t db_id = 1000;
+ {
+ brpc::Controller cntl_bt;
+ BeginTxnRequest bt_req;
+ BeginTxnResponse bt_res;
+ auto* txn_info = bt_req.mutable_txn_info();
+ txn_info->set_db_id(db_id);
+ txn_info->set_label("streaming_ut_3");
+ txn_info->add_table_ids(1);
+
txn_info->set_load_job_source_type(LoadJobSourceTypePB::LOAD_JOB_SRC_TYPE_STREAMING_JOB);
+ txn_info->set_timeout_ms(36000);
+ meta_service->begin_txn(&cntl_bt, &bt_req, &bt_res, nullptr);
+ ASSERT_EQ(bt_res.status().code(), MetaServiceCode::OK);
+ request.set_txn_id(bt_res.txn_id());
+ request.set_db_id(db_id);
+ }
TxnCommitAttachmentPB* attachment =
request.mutable_commit_attachment();
attachment->set_type(TxnCommitAttachmentPB::STREAMING_TASK_TXN_COMMIT_ATTACHMENT);
@@ -5081,7 +5143,6 @@ TEST(MetaServiceJobTest,
GetStreamingTaskCommitAttachTest) {
StreamingTaskCommitAttachmentPB* streaming_attach =
attachment->mutable_streaming_task_txn_commit_attachment();
streaming_attach->set_job_id(1002);
- streaming_attach->set_task_id(3001);
streaming_attach->set_offset("test_offset_3");
streaming_attach->set_scanned_rows(2000);
streaming_attach->set_load_bytes(10000);
@@ -5100,7 +5161,7 @@ TEST(MetaServiceJobTest,
GetStreamingTaskCommitAttachTest) {
{
GetStreamingTaskCommitAttachRequest request;
request.set_cloud_unique_id("test_cloud_unique_id");
- request.set_db_id(1000); // Assuming db_id from the commit
+ request.set_db_id(1000); // Must match db_id used when committing
request.set_job_id(1002);
GetStreamingTaskCommitAttachResponse response;
@@ -5147,22 +5208,6 @@ TEST(MetaServiceJobTest,
GetStreamingTaskCommitAttachTest) {
EXPECT_EQ(response.status().code(), MetaServiceCode::INVALID_ARGUMENT);
EXPECT_TRUE(response.status().msg().find("empty db_id or job_id") !=
std::string::npos);
}
-
- // Test case 4: Invalid cloud_unique_id
- {
- GetStreamingTaskCommitAttachRequest request;
- request.set_cloud_unique_id("invalid_cloud_unique_id");
- request.set_db_id(1000);
- request.set_job_id(1002);
-
- GetStreamingTaskCommitAttachResponse response;
- brpc::Controller cntl;
- meta_service->get_streaming_task_commit_attach(&cntl, &request,
&response, nullptr);
-
- EXPECT_FALSE(cntl.Failed()) << "Error: " << cntl.ErrorText();
- EXPECT_EQ(response.status().code(), MetaServiceCode::INVALID_ARGUMENT);
- EXPECT_TRUE(response.status().msg().find("empty instance_id") !=
std::string::npos);
- }
}
} // namespace doris::cloud
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]