This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch streaming-job-dev
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/streaming-job-dev by this push:
     new 4f5f56a1976 [fix](streaming job) refactor some logic and add ut 
(#56292)
4f5f56a1976 is described below

commit 4f5f56a19769726af3d642ce34caba83c468b482
Author: hui lai <[email protected]>
AuthorDate: Mon Sep 22 14:49:48 2025 +0800

    [fix](streaming job) refactor some logic and add ut (#56292)
    
    ### What problem does this PR solve?
    
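    Refactor the streaming job meta handling in the cloud meta service and add
    unit tests:
    - rename the job key infix from "streaming_job_meta" to "streaming_job"
      (StreamingJobMetaKeyInfo -> StreamingJobKeyInfo,
      streaming_job_meta_key_info() -> streaming_job_key()); entries stored
      under the old key are no longer read;
    - in StreamingTaskCommitAttachmentPB, rename file_number/file_size to
      num_files/file_bytes and drop task_id; the remaining fields are
      renumbered, so the new layout is not wire-compatible with values
      serialized under the old one;
    - add unit tests for the streaming job key encoding and for committing a
      txn with a streaming task attachment / get_streaming_task_commit_attach.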
---
 cloud/src/meta-service/meta_service_txn.cpp        |  79 +++++-----
 cloud/src/meta-store/keys.cpp                      |  16 +-
 cloud/src/meta-store/keys.h                        |   8 +-
 cloud/test/keys_test.cpp                           |  52 ++++++
 cloud/test/meta_service_job_test.cpp               | 175 +++++++++++++++++++++
 .../apache/doris/cloud/transaction/TxnUtil.java    |   5 +-
 .../insert/streaming/StreamingInsertJob.java       |   4 +-
 .../StreamingTaskTxnCommitAttachment.java          |  18 +--
 gensrc/proto/cloud.proto                           |  11 +-
 9 files changed, 294 insertions(+), 74 deletions(-)

diff --git a/cloud/src/meta-service/meta_service_txn.cpp 
b/cloud/src/meta-service/meta_service_txn.cpp
index 22b2a449231..e327e2bf473 100644
--- a/cloud/src/meta-service/meta_service_txn.cpp
+++ b/cloud/src/meta-service/meta_service_txn.cpp
@@ -626,59 +626,54 @@ void update_streaming_job_meta(MetaServiceCode& code, 
std::string& msg,
             txn_commit_attachment.streaming_task_txn_commit_attachment();
     int64_t job_id = commit_attachment.job_id();
 
-    std::string streaming_meta_key;
-    std::string streaming_meta_val;
-    bool prev_meta_existed = true;
-    StreamingJobMetaKeyInfo streaming_meta_key_info {instance_id, db_id, 
job_id};
-    streaming_job_meta_key_info(streaming_meta_key_info, &streaming_meta_key);
-    TxnErrorCode err = txn->get(streaming_meta_key, &streaming_meta_val);
-    if (err != TxnErrorCode::TXN_OK) {
-        if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
-            prev_meta_existed = false;
-        } else {
-            code = cast_as<ErrCategory::READ>(err);
-            ss << "failed to get streaming job meta, db_id=" << db_id << " 
txn_id=" << txn_id
-               << " err=" << err;
-            msg = ss.str();
-            return;
-        }
+    std::string streaming_job_val;
+    bool prev_existed = true;
+    std::string streaming_job_key_str = streaming_job_key({instance_id, db_id, 
job_id});
+    TxnErrorCode err = txn->get(streaming_job_key_str, &streaming_job_val);
+    if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
+        prev_existed = false;
+    } else if (err != TxnErrorCode::TXN_OK) {
+        code = cast_as<ErrCategory::READ>(err);
+        ss << "failed to get streaming job, db_id=" << db_id << " txn_id=" << 
txn_id
+           << " err=" << err;
+        msg = ss.str();
+        return;
     }
 
-    StreamingTaskCommitAttachmentPB new_meta_info;
-    if (prev_meta_existed) {
-        if (!new_meta_info.ParseFromString(streaming_meta_val)) {
+    StreamingTaskCommitAttachmentPB new_job_info;
+    if (prev_existed) {
+        if (!new_job_info.ParseFromString(streaming_job_val)) {
             code = MetaServiceCode::PROTOBUF_PARSE_ERR;
             ss << "failed to parse streaming job meta, db_id=" << db_id << " 
txn_id=" << txn_id;
             msg = ss.str();
             return;
         }
-        new_meta_info.set_scanned_rows(new_meta_info.scanned_rows() +
-                                       commit_attachment.scanned_rows());
-        new_meta_info.set_load_bytes(new_meta_info.load_bytes() + 
commit_attachment.load_bytes());
-        new_meta_info.set_file_number(new_meta_info.file_number() +
-                                      commit_attachment.file_number());
-        new_meta_info.set_file_size(new_meta_info.file_size() + 
commit_attachment.file_size());
+        new_job_info.set_scanned_rows(new_job_info.scanned_rows() +
+                                      commit_attachment.scanned_rows());
+        new_job_info.set_load_bytes(new_job_info.load_bytes() + 
commit_attachment.load_bytes());
+        new_job_info.set_num_files(new_job_info.num_files() + 
commit_attachment.num_files());
+        new_job_info.set_file_bytes(new_job_info.file_bytes() + 
commit_attachment.file_bytes());
     } else {
-        new_meta_info.set_job_id(commit_attachment.job_id());
-        new_meta_info.set_scanned_rows(commit_attachment.scanned_rows());
-        new_meta_info.set_load_bytes(commit_attachment.load_bytes());
-        new_meta_info.set_file_number(commit_attachment.file_number());
-        new_meta_info.set_file_size(commit_attachment.file_size());
+        new_job_info.set_job_id(commit_attachment.job_id());
+        new_job_info.set_scanned_rows(commit_attachment.scanned_rows());
+        new_job_info.set_load_bytes(commit_attachment.load_bytes());
+        new_job_info.set_num_files(commit_attachment.num_files());
+        new_job_info.set_file_bytes(commit_attachment.file_bytes());
     }
     if (commit_attachment.has_offset()) {
-        new_meta_info.set_offset(commit_attachment.offset());
+        new_job_info.set_offset(commit_attachment.offset());
     }
-    std::string new_meta_val;
-    if (!new_meta_info.SerializeToString(&new_meta_val)) {
+    std::string new_job_val;
+    if (!new_job_info.SerializeToString(&new_job_val)) {
         code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
-        ss << "failed to serialize new streaming meta val, txn_id=" << txn_id;
+        ss << "failed to serialize new streaming job val, txn_id=" << txn_id;
         msg = ss.str();
         return;
     }
 
-    txn->put(streaming_meta_key, new_meta_val);
-    LOG(INFO) << "put streaming_meta_key key=" << hex(streaming_meta_key)
-              << " streaming job new meta: " << 
new_meta_info.ShortDebugString();
+    txn->put(streaming_job_key_str, new_job_val);
+    LOG(INFO) << "put streaming_job_key key=" << hex(streaming_job_key_str)
+              << " streaming job new meta: " << 
new_job_info.ShortDebugString();
 }
 
 void 
MetaServiceImpl::get_rl_task_commit_attach(::google::protobuf::RpcController* 
controller,
@@ -780,11 +775,9 @@ void MetaServiceImpl::get_streaming_task_commit_attach(
 
     int64_t db_id = request->db_id();
     int64_t job_id = request->job_id();
-    std::string streaming_meta_key;
-    std::string streaming_meta_val;
-    StreamingJobMetaKeyInfo streaming_meta_key_info {instance_id, db_id, 
job_id};
-    streaming_job_meta_key_info(streaming_meta_key_info, &streaming_meta_key);
-    err = txn->get(streaming_meta_key, &streaming_meta_val);
+    std::string streaming_job_val;
+    std::string streaming_job_key_str = streaming_job_key({instance_id, db_id, 
job_id});
+    err = txn->get(streaming_job_key_str, &streaming_job_val);
     if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
         code = MetaServiceCode::STREAMING_JOB_PROGRESS_NOT_FOUND;
         ss << "progress info not found, db_id=" << db_id << " job_id=" << 
job_id << " err=" << err;
@@ -799,7 +792,7 @@ void MetaServiceImpl::get_streaming_task_commit_attach(
     }
 
     StreamingTaskCommitAttachmentPB* commit_attach = 
response->mutable_commit_attach();
-    if (!commit_attach->ParseFromString(streaming_meta_val)) {
+    if (!commit_attach->ParseFromString(streaming_job_val)) {
         code = MetaServiceCode::PROTOBUF_PARSE_ERR;
         ss << "failed to parse meta info, db_id=" << db_id << " job_id=" << 
job_id;
         msg = ss.str();
diff --git a/cloud/src/meta-store/keys.cpp b/cloud/src/meta-store/keys.cpp
index 8a6fed10f28..91b9266c815 100644
--- a/cloud/src/meta-store/keys.cpp
+++ b/cloud/src/meta-store/keys.cpp
@@ -65,7 +65,7 @@ static const char* STATS_KEY_INFIX_TABLET               = 
"tablet";
 
 static const char* JOB_KEY_INFIX_TABLET                 = "tablet";
 static const char* JOB_KEY_INFIX_RL_PROGRESS            = 
"routine_load_progress";
-static const char* JOB_KEY_INFIX_STREAMING_JOB_META     = "streaming_job_meta";
+static const char* JOB_KEY_INFIX_STREAMING_JOB          = "streaming_job";
 static const char* JOB_KEY_INFIX_RESTORE_TABLET         = "restore_tablet";
 static const char* JOB_KEY_INFIX_RESTORE_ROWSET         = "restore_rowset";
 
@@ -146,7 +146,7 @@ static void encode_prefix(const T& t, std::string* key) {
         MetaDeleteBitmapInfo, MetaDeleteBitmapUpdateLockInfo, 
MetaPendingDeleteBitmapInfo, PartitionVersionKeyInfo,
         RecycleIndexKeyInfo, RecyclePartKeyInfo, RecycleRowsetKeyInfo, 
RecycleTxnKeyInfo, RecycleStageKeyInfo,
         StatsTabletKeyInfo, TableVersionKeyInfo, JobRestoreTabletKeyInfo, 
JobRestoreRowsetKeyInfo,
-        JobTabletKeyInfo, JobRecycleKeyInfo, RLJobProgressKeyInfo, 
StreamingJobMetaKeyInfo,
+        JobTabletKeyInfo, JobRecycleKeyInfo, RLJobProgressKeyInfo, 
StreamingJobKeyInfo,
         CopyJobKeyInfo, CopyFileKeyInfo,  StorageVaultKeyInfo, 
MetaSchemaPBDictionaryInfo,
         MowTabletJobInfo>);
 
@@ -184,7 +184,7 @@ static void encode_prefix(const T& t, std::string* key) {
     } else if constexpr (std::is_same_v<T, JobTabletKeyInfo>
                       || std::is_same_v<T, JobRecycleKeyInfo>
                       || std::is_same_v<T, RLJobProgressKeyInfo>
-                      || std::is_same_v<T, StreamingJobMetaKeyInfo>) {
+                      || std::is_same_v<T, StreamingJobKeyInfo>) {
         encode_bytes(JOB_KEY_PREFIX, key);
     } else if constexpr (std::is_same_v<T, CopyJobKeyInfo>
                       || std::is_same_v<T, CopyFileKeyInfo>) {
@@ -466,11 +466,11 @@ void rl_job_progress_key_info(const RLJobProgressKeyInfo& 
in, std::string* out)
     encode_int64(std::get<2>(in), out);           // job_id
 }
 
-void streaming_job_meta_key_info(const StreamingJobMetaKeyInfo& in, 
std::string* out) {
-    encode_prefix(in, out);                              // 0x01 "job" 
${instance_id}
-    encode_bytes(JOB_KEY_INFIX_STREAMING_JOB_META, out); // 
"streaming_job_meta"
-    encode_int64(std::get<1>(in), out);                  // db_id
-    encode_int64(std::get<2>(in), out);                  // job_id
+void streaming_job_key(const StreamingJobKeyInfo& in, std::string* out) {
+    encode_prefix(in, out);                         // 0x01 "job" 
${instance_id}
+    encode_bytes(JOB_KEY_INFIX_STREAMING_JOB, out); // "streaming_job"
+    encode_int64(std::get<1>(in), out);             // db_id
+    encode_int64(std::get<2>(in), out);             // job_id
 }
 
 void job_restore_tablet_key(const JobRestoreTabletKeyInfo& in, std::string* 
out) {
diff --git a/cloud/src/meta-store/keys.h b/cloud/src/meta-store/keys.h
index 0a7cdccdb6d..020702982d7 100644
--- a/cloud/src/meta-store/keys.h
+++ b/cloud/src/meta-store/keys.h
@@ -66,6 +66,8 @@
 // 0x01 "job" ${instance_id} "tablet" ${table_id} ${index_id} ${partition_id} 
${tablet_id} -> TabletJobInfoPB
 // 0x01 "job" ${instance_id} "recycle"                                         
            -> JobRecyclePB
 // 0x01 "job" ${instance_id} "check"                                           
            -> JobRecyclePB
+// 0x01 "job" ${instance_id} "streaming_job" ${db_id} ${job_id}                -> StreamingTaskCommitAttachmentPB
+
 //
 // 0x01 "copy" ${instance_id} "job" ${stage_id} ${table_id} ${copy_id} 
${group_id}         -> CopyJobPB
 // 0x01 "copy" ${instance_id} "loading_file" ${stage_id} ${table_id} 
${obj_name} ${etag}   -> CopyFilePB
@@ -219,7 +221,7 @@ using MetaPendingDeleteBitmapInfo = BasicKeyInfo<24 , 
std::tuple<std::string, in
 using RLJobProgressKeyInfo = BasicKeyInfo<25, std::tuple<std::string, int64_t, 
int64_t>>;
 
 //                                                      0:instance_id 1:db_id  
2:job_id
-using StreamingJobMetaKeyInfo = BasicKeyInfo<52, std::tuple<std::string, 
int64_t, int64_t>>;
+using StreamingJobKeyInfo = BasicKeyInfo<52, std::tuple<std::string, int64_t, 
int64_t>>;
 
 //                                                      0:instance_id 
1:vault_id
 using StorageVaultKeyInfo = BasicKeyInfo<26, std::tuple<std::string, 
std::string>>;
@@ -410,8 +412,8 @@ void job_tablet_key(const JobTabletKeyInfo& in, 
std::string* out);
 static inline std::string job_tablet_key(const JobTabletKeyInfo& in) { 
std::string s; job_tablet_key(in, &s); return s; }
 void rl_job_progress_key_info(const RLJobProgressKeyInfo& in, std::string* 
out);
 static inline std::string rl_job_progress_key_info(const RLJobProgressKeyInfo& 
in) { std::string s; rl_job_progress_key_info(in, &s); return s; }
-void streaming_job_meta_key_info(const StreamingJobMetaKeyInfo& in, 
std::string* out);
-static inline std::string streaming_job_meta_key_info(const 
StreamingJobMetaKeyInfo& in) { std::string s; streaming_job_meta_key_info(in, 
&s); return s; }
+void streaming_job_key(const StreamingJobKeyInfo& in, std::string* out);
+static inline std::string streaming_job_key(const StreamingJobKeyInfo& in) { 
std::string s; streaming_job_key(in, &s); return s; }
 
 std::string copy_key_prefix(std::string_view instance_id);
 void copy_job_key(const CopyJobKeyInfo& in, std::string* out);
diff --git a/cloud/test/keys_test.cpp b/cloud/test/keys_test.cpp
index 94d331fcf27..fee168843c8 100644
--- a/cloud/test/keys_test.cpp
+++ b/cloud/test/keys_test.cpp
@@ -79,6 +79,8 @@ int decode_bytes(std::string_view* in, std::string* out);
 // 0x01 "trash" ${instacne_id} "table" -> TableTrashPB
 // 
 // 0x01 "node_status" ${instance_id} "compute" ${backend_id} -> 
ComputeNodeStatusPB
+//
+// 0x01 "job" ${instance_id} "streaming_job" ${db_id} ${job_id} -> StreamingTaskCommitAttachmentPB
 // clang-format on
 
 TEST(KeysTest, InstanceKeyTest) {
@@ -1963,6 +1965,56 @@ TEST(KeysTest, RestoreJobKeysTest) {
     }
 }
 
+TEST(KeysTest, StreamingJobKeysTest) {
+    using namespace doris::cloud;
+    std::string instance_id = "instance_id_deadbeef";
+
+    // 0x01 "job" ${instance_id} "streaming_job" ${db_id} ${job_id} -> StreamingTaskCommitAttachmentPB
+    {
+        int64_t db_id = 123;
+        int64_t job_id = 456;
+        StreamingJobKeyInfo streaming_key {instance_id, db_id, job_id};
+        std::string encoded_streaming_key0;
+        streaming_job_key(streaming_key, &encoded_streaming_key0);
+        std::cout << hex(encoded_streaming_key0) << std::endl;
+
+        std::string dec_instance_id;
+        int64_t dec_db_id = 0;
+        int64_t dec_job_id = 0;
+
+        std::string_view key_sv(encoded_streaming_key0);
+        std::string dec_job_prefix;
+        std::string dec_streaming_infix;
+        remove_user_space_prefix(&key_sv);
+        ASSERT_EQ(decode_bytes(&key_sv, &dec_job_prefix), 0);
+        ASSERT_EQ(decode_bytes(&key_sv, &dec_instance_id), 0);
+        ASSERT_EQ(decode_bytes(&key_sv, &dec_streaming_infix), 0);
+        ASSERT_EQ(decode_int64(&key_sv, &dec_db_id), 0);
+        ASSERT_EQ(decode_int64(&key_sv, &dec_job_id), 0);
+        ASSERT_TRUE(key_sv.empty());
+
+        EXPECT_EQ("job", dec_job_prefix);
+        EXPECT_EQ("streaming_job", dec_streaming_infix);
+        EXPECT_EQ(instance_id, dec_instance_id);
+        EXPECT_EQ(db_id, dec_db_id);
+        EXPECT_EQ(job_id, dec_job_id);
+
+        std::get<2>(streaming_key) = job_id + 1;
+        std::string encoded_streaming_key1;
+        streaming_job_key(streaming_key, &encoded_streaming_key1);
+        std::cout << hex(encoded_streaming_key1) << std::endl;
+
+        ASSERT_GT(encoded_streaming_key1, encoded_streaming_key0);
+
+        std::get<1>(streaming_key) = db_id + 1;
+        std::string encoded_streaming_key2;
+        streaming_job_key(streaming_key, &encoded_streaming_key2);
+        std::cout << hex(encoded_streaming_key2) << std::endl;
+
+        ASSERT_GT(encoded_streaming_key2, encoded_streaming_key0);
+    }
+}
+
 TEST(KeysTest, VersionedKeyPrefixTest) {
     using namespace doris::cloud;
     using namespace doris::cloud::versioned;
diff --git a/cloud/test/meta_service_job_test.cpp 
b/cloud/test/meta_service_job_test.cpp
index c30be90e1da..9fa021e138f 100644
--- a/cloud/test/meta_service_job_test.cpp
+++ b/cloud/test/meta_service_job_test.cpp
@@ -4990,4 +4990,179 @@ TEST(MetaServiceJobTest, IdempotentCompactionJob) {
     }
 }
 
+TEST(MetaServiceJobTest, UpdateStreamingJobMetaTest) {
+    auto meta_service = get_meta_service();
+    ASSERT_NE(meta_service, nullptr);
+
+    // Test case 1: First time update (job doesn't exist)
+    {
+        CommitTxnRequest request;
+        request.set_cloud_unique_id("test_cloud_unique_id");
+        request.set_txn_id(12345);
+
+        TxnCommitAttachmentPB* attachment = 
request.mutable_commit_attachment();
+        
attachment->set_type(TxnCommitAttachmentPB::STREAMING_TASK_TXN_COMMIT_ATTACHMENT);
+
+        StreamingTaskCommitAttachmentPB* streaming_attach =
+                attachment->mutable_streaming_task_txn_commit_attachment();
+        streaming_attach->set_job_id(1001);
+        streaming_attach->set_task_id(2001);
+        streaming_attach->set_offset("test_offset_1");
+        streaming_attach->set_scanned_rows(1000);
+        streaming_attach->set_load_bytes(5000);
+        streaming_attach->set_num_files(10);
+        streaming_attach->set_file_bytes(8000);
+
+        CommitTxnResponse response;
+        brpc::Controller cntl;
+        meta_service->commit_txn(&cntl, &request, &response, nullptr);
+
+        EXPECT_FALSE(cntl.Failed()) << "Error: " << cntl.ErrorText();
+        EXPECT_EQ(response.status().code(), MetaServiceCode::OK);
+    }
+
+    // Test case 2: Update existing job (accumulate values)
+    {
+        CommitTxnRequest request;
+        request.set_cloud_unique_id("test_cloud_unique_id");
+        request.set_txn_id(12346);
+
+        TxnCommitAttachmentPB* attachment = 
request.mutable_commit_attachment();
+        
attachment->set_type(TxnCommitAttachmentPB::STREAMING_TASK_TXN_COMMIT_ATTACHMENT);
+
+        StreamingTaskCommitAttachmentPB* streaming_attach =
+                attachment->mutable_streaming_task_txn_commit_attachment();
+        streaming_attach->set_job_id(1001); // Same job_id as before
+        streaming_attach->set_task_id(2002);
+        streaming_attach->set_offset("test_offset_2");
+        streaming_attach->set_scanned_rows(500);
+        streaming_attach->set_load_bytes(2000);
+        streaming_attach->set_num_files(5);
+        streaming_attach->set_file_bytes(3000);
+
+        CommitTxnResponse response;
+        brpc::Controller cntl;
+        meta_service->commit_txn(&cntl, &request, &response, nullptr);
+
+        EXPECT_FALSE(cntl.Failed()) << "Error: " << cntl.ErrorText();
+        EXPECT_EQ(response.status().code(), MetaServiceCode::OK);
+    }
+
+    // Test case 3: Missing commit attachment
+    {
+        CommitTxnRequest request;
+        request.set_cloud_unique_id("test_cloud_unique_id");
+        request.set_txn_id(12347);
+        // No commit attachment set
+
+        CommitTxnResponse response;
+        brpc::Controller cntl;
+        meta_service->commit_txn(&cntl, &request, &response, nullptr);
+
+        EXPECT_FALSE(cntl.Failed()) << "Error: " << cntl.ErrorText();
+        EXPECT_EQ(response.status().code(), MetaServiceCode::INVALID_ARGUMENT);
+        EXPECT_TRUE(response.status().msg().find("missing commit attachment") 
!= std::string::npos);
+    }
+}
+
+TEST(MetaServiceJobTest, GetStreamingTaskCommitAttachTest) {
+    auto meta_service = get_meta_service();
+    ASSERT_NE(meta_service, nullptr);
+
+    // First, create a streaming job by committing a txn
+    {
+        CommitTxnRequest request;
+        request.set_cloud_unique_id("test_cloud_unique_id");
+        request.set_txn_id(12348);
+
+        TxnCommitAttachmentPB* attachment = 
request.mutable_commit_attachment();
+        
attachment->set_type(TxnCommitAttachmentPB::STREAMING_TASK_TXN_COMMIT_ATTACHMENT);
+
+        StreamingTaskCommitAttachmentPB* streaming_attach =
+                attachment->mutable_streaming_task_txn_commit_attachment();
+        streaming_attach->set_job_id(1002);
+        streaming_attach->set_task_id(3001);
+        streaming_attach->set_offset("test_offset_3");
+        streaming_attach->set_scanned_rows(2000);
+        streaming_attach->set_load_bytes(10000);
+        streaming_attach->set_num_files(20);
+        streaming_attach->set_file_bytes(15000);
+
+        CommitTxnResponse response;
+        brpc::Controller cntl;
+        meta_service->commit_txn(&cntl, &request, &response, nullptr);
+
+        EXPECT_FALSE(cntl.Failed()) << "Error: " << cntl.ErrorText();
+        EXPECT_EQ(response.status().code(), MetaServiceCode::OK);
+    }
+
+    // Test case 1: Get existing streaming job
+    {
+        GetStreamingTaskCommitAttachRequest request;
+        request.set_cloud_unique_id("test_cloud_unique_id");
+        request.set_db_id(1000); // Assuming db_id from the commit
+        request.set_job_id(1002);
+
+        GetStreamingTaskCommitAttachResponse response;
+        brpc::Controller cntl;
+        meta_service->get_streaming_task_commit_attach(&cntl, &request, 
&response, nullptr);
+
+        EXPECT_FALSE(cntl.Failed()) << "Error: " << cntl.ErrorText();
+        EXPECT_EQ(response.status().code(), MetaServiceCode::OK);
+        EXPECT_TRUE(response.has_commit_attach());
+        EXPECT_EQ(response.commit_attach().job_id(), 1002);
+        EXPECT_EQ(response.commit_attach().scanned_rows(), 2000);
+        EXPECT_EQ(response.commit_attach().load_bytes(), 10000);
+        EXPECT_EQ(response.commit_attach().num_files(), 20);
+        EXPECT_EQ(response.commit_attach().file_bytes(), 15000);
+    }
+
+    // Test case 2: Get non-existent streaming job
+    {
+        GetStreamingTaskCommitAttachRequest request;
+        request.set_cloud_unique_id("test_cloud_unique_id");
+        request.set_db_id(1000);
+        request.set_job_id(9999); // Non-existent job_id
+
+        GetStreamingTaskCommitAttachResponse response;
+        brpc::Controller cntl;
+        meta_service->get_streaming_task_commit_attach(&cntl, &request, 
&response, nullptr);
+
+        EXPECT_FALSE(cntl.Failed()) << "Error: " << cntl.ErrorText();
+        EXPECT_EQ(response.status().code(), 
MetaServiceCode::STREAMING_JOB_PROGRESS_NOT_FOUND);
+        EXPECT_TRUE(response.status().msg().find("progress info not found") != 
std::string::npos);
+    }
+
+    // Test case 3: Missing required fields
+    {
+        GetStreamingTaskCommitAttachRequest request;
+        request.set_cloud_unique_id("test_cloud_unique_id");
+        // Missing db_id and job_id
+
+        GetStreamingTaskCommitAttachResponse response;
+        brpc::Controller cntl;
+        meta_service->get_streaming_task_commit_attach(&cntl, &request, 
&response, nullptr);
+
+        EXPECT_FALSE(cntl.Failed()) << "Error: " << cntl.ErrorText();
+        EXPECT_EQ(response.status().code(), MetaServiceCode::INVALID_ARGUMENT);
+        EXPECT_TRUE(response.status().msg().find("empty db_id or job_id") != 
std::string::npos);
+    }
+
+    // Test case 4: Invalid cloud_unique_id
+    {
+        GetStreamingTaskCommitAttachRequest request;
+        request.set_cloud_unique_id("invalid_cloud_unique_id");
+        request.set_db_id(1000);
+        request.set_job_id(1002);
+
+        GetStreamingTaskCommitAttachResponse response;
+        brpc::Controller cntl;
+        meta_service->get_streaming_task_commit_attach(&cntl, &request, 
&response, nullptr);
+
+        EXPECT_FALSE(cntl.Failed()) << "Error: " << cntl.ErrorText();
+        EXPECT_EQ(response.status().code(), MetaServiceCode::INVALID_ARGUMENT);
+        EXPECT_TRUE(response.status().msg().find("empty instance_id") != 
std::string::npos);
+    }
+}
+
 } // namespace doris::cloud
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/TxnUtil.java 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/TxnUtil.java
index 424ce238544..06cd25d1357 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/TxnUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/TxnUtil.java
@@ -282,11 +282,10 @@ public class TxnUtil {
                 StreamingTaskCommitAttachmentPB.newBuilder();
 
         builder.setJobId(streamingTaskTxnCommitAttachment.getJobId())
-                .setTaskId(streamingTaskTxnCommitAttachment.getTaskId())
                 
.setScannedRows(streamingTaskTxnCommitAttachment.getScannedRows())
                 .setLoadBytes(streamingTaskTxnCommitAttachment.getLoadBytes())
-                
.setFileNumber(streamingTaskTxnCommitAttachment.getFileNumber())
-                .setFileSize(streamingTaskTxnCommitAttachment.getFileSize());
+                .setNumFiles(streamingTaskTxnCommitAttachment.getNumFiles())
+                .setFileBytes(streamingTaskTxnCommitAttachment.getFileBytes());
 
         if (streamingTaskTxnCommitAttachment.getOffset() != null) {
             builder.setOffset(streamingTaskTxnCommitAttachment.getOffset());
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
index 479698b6f44..e8fd03c8de4 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
@@ -305,8 +305,8 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
         }
         this.jobStatistic.setScannedRows(this.jobStatistic.getScannedRows() + 
attachment.getScannedRows());
         this.jobStatistic.setLoadBytes(this.jobStatistic.getLoadBytes() + 
attachment.getLoadBytes());
-        this.jobStatistic.setFileNumber(this.jobStatistic.getFileNumber() + 
attachment.getFileNumber());
-        this.jobStatistic.setFileSize(this.jobStatistic.getFileSize() + 
attachment.getFileSize());
+        this.jobStatistic.setFileNumber(this.jobStatistic.getFileNumber() + 
attachment.getNumFiles());
+        this.jobStatistic.setFileSize(this.jobStatistic.getFileSize() + 
attachment.getFileBytes());
         
offsetProvider.updateOffset(offsetProvider.deserializeOffset(attachment.getOffset()));
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingTaskTxnCommitAttachment.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingTaskTxnCommitAttachment.java
index 4b7590824b4..b3f6a2c0b1b 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingTaskTxnCommitAttachment.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingTaskTxnCommitAttachment.java
@@ -27,14 +27,14 @@ import lombok.Getter;
 public class StreamingTaskTxnCommitAttachment extends TxnCommitAttachment {
 
     public StreamingTaskTxnCommitAttachment(long jobId, long taskId,
-                long scannedRows, long loadBytes, long fileNumber, long 
fileSize, String offset) {
+                long scannedRows, long loadBytes, long numFiles, long 
fileBytes, String offset) {
         super(TransactionState.LoadJobSourceType.STREAMING_JOB);
         this.jobId = jobId;
         this.taskId = taskId;
         this.scannedRows = scannedRows;
         this.loadBytes = loadBytes;
-        this.fileNumber = fileNumber;
-        this.fileSize = fileSize;
+        this.numFiles = numFiles;
+        this.fileBytes = fileBytes;
         this.offset = offset;
     }
 
@@ -42,8 +42,8 @@ public class StreamingTaskTxnCommitAttachment extends 
TxnCommitAttachment {
         super(TransactionState.LoadJobSourceType.STREAMING_JOB);
         this.scannedRows = pb.getScannedRows();
         this.loadBytes = pb.getLoadBytes();
-        this.fileNumber = pb.getFileNumber();
-        this.fileSize = pb.getFileSize();
+        this.numFiles = pb.getNumFiles();
+        this.fileBytes = pb.getFileBytes();
         this.offset = pb.getOffset();
     }
 
@@ -59,10 +59,10 @@ public class StreamingTaskTxnCommitAttachment extends 
TxnCommitAttachment {
     private long loadBytes;
     @SerializedName(value = "fn")
     @Getter
-    private long fileNumber;
+    private long numFiles;
     @SerializedName(value = "fs")
     @Getter
-    private long fileSize;
+    private long fileBytes;
     @SerializedName(value = "of")
     @Getter
     private String offset;
@@ -72,8 +72,8 @@ public class StreamingTaskTxnCommitAttachment extends 
TxnCommitAttachment {
         return "StreamingTaskTxnCommitAttachment: ["
                 + "scannedRows=" + scannedRows
                 + ", loadBytes=" + loadBytes
-                + ", fileNumber=" + fileNumber
-                + ", fileSize=" + fileSize
+                + ", numFiles=" + numFiles
+                + ", fileBytes=" + fileBytes
                 + ", offset=" + offset
                 + "]";
     }
diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto
index 6259d6d0da3..3428cff6cab 100644
--- a/gensrc/proto/cloud.proto
+++ b/gensrc/proto/cloud.proto
@@ -368,12 +368,11 @@ message RoutineLoadJobStatisticPB {
 
 message StreamingTaskCommitAttachmentPB {
     optional int64 job_id = 1;
-    optional int64 task_id = 2;
-    optional string offset = 3;
-    optional int64 scanned_rows = 4;
-    optional int64 load_bytes = 5;
-    optional int64 file_number = 6;
-    optional int64 file_size = 7;
+    optional string offset = 2;
+    optional int64 scanned_rows = 3;
+    optional int64 load_bytes = 4;
+    optional int64 num_files = 5;
+    optional int64 file_bytes = 6;
 }
 
 message TxnCommitAttachmentPB {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to