This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch streaming-job-dev
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/streaming-job-dev by this push:
     new 6d1374279cf [fix](streaming job) fix streaming job ut (#56314)
6d1374279cf is described below

commit 6d1374279cfba85d19de6501b37aaf3bf2a548c9
Author: hui lai <[email protected]>
AuthorDate: Mon Sep 22 20:20:00 2025 +0800

    [fix](streaming job) fix streaming job ut (#56314)
    
    ### What problem does this PR solve?
    
    [fix](streaming job) fix streaming job ut
---
 cloud/test/meta_service_job_test.cpp | 93 ++++++++++++++++++++++++++----------
 1 file changed, 69 insertions(+), 24 deletions(-)

diff --git a/cloud/test/meta_service_job_test.cpp b/cloud/test/meta_service_job_test.cpp
index 9fa021e138f..5d5758579eb 100644
--- a/cloud/test/meta_service_job_test.cpp
+++ b/cloud/test/meta_service_job_test.cpp
@@ -4998,7 +4998,23 @@ TEST(MetaServiceJobTest, UpdateStreamingJobMetaTest) {
     {
         CommitTxnRequest request;
         request.set_cloud_unique_id("test_cloud_unique_id");
-        request.set_txn_id(12345);
+        // Begin a txn to obtain a valid txn_id and db_id mapping
+        int64_t db_id = 1000;
+        {
+            brpc::Controller cntl_bt;
+            BeginTxnRequest bt_req;
+            BeginTxnResponse bt_res;
+            auto* txn_info = bt_req.mutable_txn_info();
+            txn_info->set_db_id(db_id);
+            txn_info->set_label("streaming_ut_1");
+            txn_info->add_table_ids(1);
+            txn_info->set_load_job_source_type(LoadJobSourceTypePB::LOAD_JOB_SRC_TYPE_STREAMING_JOB);
+            txn_info->set_timeout_ms(36000);
+            meta_service->begin_txn(&cntl_bt, &bt_req, &bt_res, nullptr);
+            ASSERT_EQ(bt_res.status().code(), MetaServiceCode::OK);
+            request.set_txn_id(bt_res.txn_id());
+            request.set_db_id(db_id);
+        }
 
         TxnCommitAttachmentPB* attachment = request.mutable_commit_attachment();
         attachment->set_type(TxnCommitAttachmentPB::STREAMING_TASK_TXN_COMMIT_ATTACHMENT);
@@ -5006,7 +5022,6 @@ TEST(MetaServiceJobTest, UpdateStreamingJobMetaTest) {
         StreamingTaskCommitAttachmentPB* streaming_attach =
                 attachment->mutable_streaming_task_txn_commit_attachment();
         streaming_attach->set_job_id(1001);
-        streaming_attach->set_task_id(2001);
         streaming_attach->set_offset("test_offset_1");
         streaming_attach->set_scanned_rows(1000);
         streaming_attach->set_load_bytes(5000);
@@ -5025,7 +5040,23 @@ TEST(MetaServiceJobTest, UpdateStreamingJobMetaTest) {
     {
         CommitTxnRequest request;
         request.set_cloud_unique_id("test_cloud_unique_id");
-        request.set_txn_id(12346);
+        // Begin a txn to obtain a valid txn_id and db_id mapping
+        int64_t db_id = 1000;
+        {
+            brpc::Controller cntl_bt;
+            BeginTxnRequest bt_req;
+            BeginTxnResponse bt_res;
+            auto* txn_info = bt_req.mutable_txn_info();
+            txn_info->set_db_id(db_id);
+            txn_info->set_label("streaming_ut_2");
+            txn_info->add_table_ids(1);
+            txn_info->set_load_job_source_type(LoadJobSourceTypePB::LOAD_JOB_SRC_TYPE_STREAMING_JOB);
+            txn_info->set_timeout_ms(36000);
+            meta_service->begin_txn(&cntl_bt, &bt_req, &bt_res, nullptr);
+            ASSERT_EQ(bt_res.status().code(), MetaServiceCode::OK);
+            request.set_txn_id(bt_res.txn_id());
+            request.set_db_id(db_id);
+        }
 
         TxnCommitAttachmentPB* attachment = request.mutable_commit_attachment();
         attachment->set_type(TxnCommitAttachmentPB::STREAMING_TASK_TXN_COMMIT_ATTACHMENT);
@@ -5033,7 +5064,6 @@ TEST(MetaServiceJobTest, UpdateStreamingJobMetaTest) {
         StreamingTaskCommitAttachmentPB* streaming_attach =
                 attachment->mutable_streaming_task_txn_commit_attachment();
         streaming_attach->set_job_id(1001); // Same job_id as before
-        streaming_attach->set_task_id(2002);
         streaming_attach->set_offset("test_offset_2");
         streaming_attach->set_scanned_rows(500);
         streaming_attach->set_load_bytes(2000);
@@ -5052,7 +5082,23 @@ TEST(MetaServiceJobTest, UpdateStreamingJobMetaTest) {
     {
         CommitTxnRequest request;
         request.set_cloud_unique_id("test_cloud_unique_id");
-        request.set_txn_id(12347);
+        // Begin a txn to obtain a valid txn_id and db_id mapping
+        int64_t db_id = 1000;
+        {
+            brpc::Controller cntl_bt;
+            BeginTxnRequest bt_req;
+            BeginTxnResponse bt_res;
+            auto* txn_info = bt_req.mutable_txn_info();
+            txn_info->set_db_id(db_id);
+            txn_info->set_label("streaming_ut_missing_attach");
+            txn_info->add_table_ids(1);
+            txn_info->set_load_job_source_type(LoadJobSourceTypePB::LOAD_JOB_SRC_TYPE_STREAMING_JOB);
+            txn_info->set_timeout_ms(36000);
+            meta_service->begin_txn(&cntl_bt, &bt_req, &bt_res, nullptr);
+            ASSERT_EQ(bt_res.status().code(), MetaServiceCode::OK);
+            request.set_txn_id(bt_res.txn_id());
+            request.set_db_id(db_id);
+        }
         // No commit attachment set
 
         CommitTxnResponse response;
@@ -5073,7 +5119,23 @@ TEST(MetaServiceJobTest, GetStreamingTaskCommitAttachTest) {
     {
         CommitTxnRequest request;
         request.set_cloud_unique_id("test_cloud_unique_id");
-        request.set_txn_id(12348);
+        // Begin a txn to obtain a valid txn_id and db_id mapping
+        int64_t db_id = 1000;
+        {
+            brpc::Controller cntl_bt;
+            BeginTxnRequest bt_req;
+            BeginTxnResponse bt_res;
+            auto* txn_info = bt_req.mutable_txn_info();
+            txn_info->set_db_id(db_id);
+            txn_info->set_label("streaming_ut_3");
+            txn_info->add_table_ids(1);
+            txn_info->set_load_job_source_type(LoadJobSourceTypePB::LOAD_JOB_SRC_TYPE_STREAMING_JOB);
+            txn_info->set_timeout_ms(36000);
+            meta_service->begin_txn(&cntl_bt, &bt_req, &bt_res, nullptr);
+            ASSERT_EQ(bt_res.status().code(), MetaServiceCode::OK);
+            request.set_txn_id(bt_res.txn_id());
+            request.set_db_id(db_id);
+        }
 
         TxnCommitAttachmentPB* attachment = request.mutable_commit_attachment();
         attachment->set_type(TxnCommitAttachmentPB::STREAMING_TASK_TXN_COMMIT_ATTACHMENT);
@@ -5081,7 +5143,6 @@ TEST(MetaServiceJobTest, GetStreamingTaskCommitAttachTest) {
         StreamingTaskCommitAttachmentPB* streaming_attach =
                 attachment->mutable_streaming_task_txn_commit_attachment();
         streaming_attach->set_job_id(1002);
-        streaming_attach->set_task_id(3001);
         streaming_attach->set_offset("test_offset_3");
         streaming_attach->set_scanned_rows(2000);
         streaming_attach->set_load_bytes(10000);
@@ -5100,7 +5161,7 @@ TEST(MetaServiceJobTest, GetStreamingTaskCommitAttachTest) {
     {
         GetStreamingTaskCommitAttachRequest request;
         request.set_cloud_unique_id("test_cloud_unique_id");
-        request.set_db_id(1000); // Assuming db_id from the commit
+        request.set_db_id(1000); // Must match db_id used when committing
         request.set_job_id(1002);
 
         GetStreamingTaskCommitAttachResponse response;
@@ -5147,22 +5208,6 @@ TEST(MetaServiceJobTest, GetStreamingTaskCommitAttachTest) {
         EXPECT_EQ(response.status().code(), MetaServiceCode::INVALID_ARGUMENT);
         EXPECT_TRUE(response.status().msg().find("empty db_id or job_id") != std::string::npos);
     }
-
-    // Test case 4: Invalid cloud_unique_id
-    {
-        GetStreamingTaskCommitAttachRequest request;
-        request.set_cloud_unique_id("invalid_cloud_unique_id");
-        request.set_db_id(1000);
-        request.set_job_id(1002);
-
-        GetStreamingTaskCommitAttachResponse response;
-        brpc::Controller cntl;
-        meta_service->get_streaming_task_commit_attach(&cntl, &request, &response, nullptr);
-
-        EXPECT_FALSE(cntl.Failed()) << "Error: " << cntl.ErrorText();
-        EXPECT_EQ(response.status().code(), MetaServiceCode::INVALID_ARGUMENT);
-        EXPECT_TRUE(response.status().msg().find("empty instance_id") != std::string::npos);
-    }
 }
 
 } // namespace doris::cloud


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to