This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch streaming-job-dev
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/streaming-job-dev by this push:
new 4f5f56a1976 [fix](streaming job) refactor some logic and add ut
(#56292)
4f5f56a1976 is described below
commit 4f5f56a19769726af3d642ce34caba83c468b482
Author: hui lai <[email protected]>
AuthorDate: Mon Sep 22 14:49:48 2025 +0800
[fix](streaming job) refactor some logic and add ut (#56292)
### What problem does this PR solve?
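Refactor the streaming job meta-service logic and add unit tests:
- Rename the streaming job key (`streaming_job_meta_key_info` / `StreamingJobMetaKeyInfo` -> `streaming_job_key` / `StreamingJobKeyInfo`, infix "streaming_job_meta" -> "streaming_job").
- Rename the `StreamingTaskCommitAttachmentPB` fields `file_number` / `file_size` to `num_files` / `file_bytes`, and drop `task_id`.
- Simplify error handling in `update_streaming_job_meta`.
- Add unit tests in keys_test.cpp and meta_service_job_test.cpp.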
---
cloud/src/meta-service/meta_service_txn.cpp | 79 +++++-----
cloud/src/meta-store/keys.cpp | 16 +-
cloud/src/meta-store/keys.h | 8 +-
cloud/test/keys_test.cpp | 52 ++++++
cloud/test/meta_service_job_test.cpp | 175 +++++++++++++++++++++
.../apache/doris/cloud/transaction/TxnUtil.java | 5 +-
.../insert/streaming/StreamingInsertJob.java | 4 +-
.../StreamingTaskTxnCommitAttachment.java | 18 +--
gensrc/proto/cloud.proto | 11 +-
9 files changed, 294 insertions(+), 74 deletions(-)
diff --git a/cloud/src/meta-service/meta_service_txn.cpp
b/cloud/src/meta-service/meta_service_txn.cpp
index 22b2a449231..e327e2bf473 100644
--- a/cloud/src/meta-service/meta_service_txn.cpp
+++ b/cloud/src/meta-service/meta_service_txn.cpp
@@ -626,59 +626,54 @@ void update_streaming_job_meta(MetaServiceCode& code,
std::string& msg,
txn_commit_attachment.streaming_task_txn_commit_attachment();
int64_t job_id = commit_attachment.job_id();
- std::string streaming_meta_key;
- std::string streaming_meta_val;
- bool prev_meta_existed = true;
- StreamingJobMetaKeyInfo streaming_meta_key_info {instance_id, db_id,
job_id};
- streaming_job_meta_key_info(streaming_meta_key_info, &streaming_meta_key);
- TxnErrorCode err = txn->get(streaming_meta_key, &streaming_meta_val);
- if (err != TxnErrorCode::TXN_OK) {
- if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
- prev_meta_existed = false;
- } else {
- code = cast_as<ErrCategory::READ>(err);
- ss << "failed to get streaming job meta, db_id=" << db_id << "
txn_id=" << txn_id
- << " err=" << err;
- msg = ss.str();
- return;
- }
+ std::string streaming_job_val;
+ bool prev_existed = true;
+ std::string streaming_job_key_str = streaming_job_key({instance_id, db_id,
job_id});
+ TxnErrorCode err = txn->get(streaming_job_key_str, &streaming_job_val);
+ if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
+ prev_existed = false;
+ } else if (err != TxnErrorCode::TXN_OK) {
+ code = cast_as<ErrCategory::READ>(err);
+ ss << "failed to get streaming job, db_id=" << db_id << " txn_id=" <<
txn_id
+ << " err=" << err;
+ msg = ss.str();
+ return;
}
- StreamingTaskCommitAttachmentPB new_meta_info;
- if (prev_meta_existed) {
- if (!new_meta_info.ParseFromString(streaming_meta_val)) {
+ StreamingTaskCommitAttachmentPB new_job_info;
+ if (prev_existed) {
+ if (!new_job_info.ParseFromString(streaming_job_val)) {
code = MetaServiceCode::PROTOBUF_PARSE_ERR;
ss << "failed to parse streaming job meta, db_id=" << db_id << "
txn_id=" << txn_id;
msg = ss.str();
return;
}
- new_meta_info.set_scanned_rows(new_meta_info.scanned_rows() +
- commit_attachment.scanned_rows());
- new_meta_info.set_load_bytes(new_meta_info.load_bytes() +
commit_attachment.load_bytes());
- new_meta_info.set_file_number(new_meta_info.file_number() +
- commit_attachment.file_number());
- new_meta_info.set_file_size(new_meta_info.file_size() +
commit_attachment.file_size());
+ new_job_info.set_scanned_rows(new_job_info.scanned_rows() +
+ commit_attachment.scanned_rows());
+ new_job_info.set_load_bytes(new_job_info.load_bytes() +
commit_attachment.load_bytes());
+ new_job_info.set_num_files(new_job_info.num_files() +
commit_attachment.num_files());
+ new_job_info.set_file_bytes(new_job_info.file_bytes() +
commit_attachment.file_bytes());
} else {
- new_meta_info.set_job_id(commit_attachment.job_id());
- new_meta_info.set_scanned_rows(commit_attachment.scanned_rows());
- new_meta_info.set_load_bytes(commit_attachment.load_bytes());
- new_meta_info.set_file_number(commit_attachment.file_number());
- new_meta_info.set_file_size(commit_attachment.file_size());
+ new_job_info.set_job_id(commit_attachment.job_id());
+ new_job_info.set_scanned_rows(commit_attachment.scanned_rows());
+ new_job_info.set_load_bytes(commit_attachment.load_bytes());
+ new_job_info.set_num_files(commit_attachment.num_files());
+ new_job_info.set_file_bytes(commit_attachment.file_bytes());
}
if (commit_attachment.has_offset()) {
- new_meta_info.set_offset(commit_attachment.offset());
+ new_job_info.set_offset(commit_attachment.offset());
}
- std::string new_meta_val;
- if (!new_meta_info.SerializeToString(&new_meta_val)) {
+ std::string new_job_val;
+ if (!new_job_info.SerializeToString(&new_job_val)) {
code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
- ss << "failed to serialize new streaming meta val, txn_id=" << txn_id;
+ ss << "failed to serialize new streaming job val, txn_id=" << txn_id;
msg = ss.str();
return;
}
- txn->put(streaming_meta_key, new_meta_val);
- LOG(INFO) << "put streaming_meta_key key=" << hex(streaming_meta_key)
- << " streaming job new meta: " <<
new_meta_info.ShortDebugString();
+ txn->put(streaming_job_key_str, new_job_val);
+ LOG(INFO) << "put streaming_job_key key=" << hex(streaming_job_key_str)
+ << " streaming job new meta: " <<
new_job_info.ShortDebugString();
}
void
MetaServiceImpl::get_rl_task_commit_attach(::google::protobuf::RpcController*
controller,
@@ -780,11 +775,9 @@ void MetaServiceImpl::get_streaming_task_commit_attach(
int64_t db_id = request->db_id();
int64_t job_id = request->job_id();
- std::string streaming_meta_key;
- std::string streaming_meta_val;
- StreamingJobMetaKeyInfo streaming_meta_key_info {instance_id, db_id,
job_id};
- streaming_job_meta_key_info(streaming_meta_key_info, &streaming_meta_key);
- err = txn->get(streaming_meta_key, &streaming_meta_val);
+ std::string streaming_job_val;
+ std::string streaming_job_key_str = streaming_job_key({instance_id, db_id,
job_id});
+ err = txn->get(streaming_job_key_str, &streaming_job_val);
if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
code = MetaServiceCode::STREAMING_JOB_PROGRESS_NOT_FOUND;
ss << "progress info not found, db_id=" << db_id << " job_id=" <<
job_id << " err=" << err;
@@ -799,7 +792,7 @@ void MetaServiceImpl::get_streaming_task_commit_attach(
}
StreamingTaskCommitAttachmentPB* commit_attach =
response->mutable_commit_attach();
- if (!commit_attach->ParseFromString(streaming_meta_val)) {
+ if (!commit_attach->ParseFromString(streaming_job_val)) {
code = MetaServiceCode::PROTOBUF_PARSE_ERR;
ss << "failed to parse meta info, db_id=" << db_id << " job_id=" <<
job_id;
msg = ss.str();
diff --git a/cloud/src/meta-store/keys.cpp b/cloud/src/meta-store/keys.cpp
index 8a6fed10f28..91b9266c815 100644
--- a/cloud/src/meta-store/keys.cpp
+++ b/cloud/src/meta-store/keys.cpp
@@ -65,7 +65,7 @@ static const char* STATS_KEY_INFIX_TABLET =
"tablet";
static const char* JOB_KEY_INFIX_TABLET = "tablet";
static const char* JOB_KEY_INFIX_RL_PROGRESS =
"routine_load_progress";
-static const char* JOB_KEY_INFIX_STREAMING_JOB_META = "streaming_job_meta";
+static const char* JOB_KEY_INFIX_STREAMING_JOB = "streaming_job";
static const char* JOB_KEY_INFIX_RESTORE_TABLET = "restore_tablet";
static const char* JOB_KEY_INFIX_RESTORE_ROWSET = "restore_rowset";
@@ -146,7 +146,7 @@ static void encode_prefix(const T& t, std::string* key) {
MetaDeleteBitmapInfo, MetaDeleteBitmapUpdateLockInfo,
MetaPendingDeleteBitmapInfo, PartitionVersionKeyInfo,
RecycleIndexKeyInfo, RecyclePartKeyInfo, RecycleRowsetKeyInfo,
RecycleTxnKeyInfo, RecycleStageKeyInfo,
StatsTabletKeyInfo, TableVersionKeyInfo, JobRestoreTabletKeyInfo,
JobRestoreRowsetKeyInfo,
- JobTabletKeyInfo, JobRecycleKeyInfo, RLJobProgressKeyInfo,
StreamingJobMetaKeyInfo,
+ JobTabletKeyInfo, JobRecycleKeyInfo, RLJobProgressKeyInfo,
StreamingJobKeyInfo,
CopyJobKeyInfo, CopyFileKeyInfo, StorageVaultKeyInfo,
MetaSchemaPBDictionaryInfo,
MowTabletJobInfo>);
@@ -184,7 +184,7 @@ static void encode_prefix(const T& t, std::string* key) {
} else if constexpr (std::is_same_v<T, JobTabletKeyInfo>
|| std::is_same_v<T, JobRecycleKeyInfo>
|| std::is_same_v<T, RLJobProgressKeyInfo>
- || std::is_same_v<T, StreamingJobMetaKeyInfo>) {
+ || std::is_same_v<T, StreamingJobKeyInfo>) {
encode_bytes(JOB_KEY_PREFIX, key);
} else if constexpr (std::is_same_v<T, CopyJobKeyInfo>
|| std::is_same_v<T, CopyFileKeyInfo>) {
@@ -466,11 +466,11 @@ void rl_job_progress_key_info(const RLJobProgressKeyInfo&
in, std::string* out)
encode_int64(std::get<2>(in), out); // job_id
}
-void streaming_job_meta_key_info(const StreamingJobMetaKeyInfo& in,
std::string* out) {
- encode_prefix(in, out); // 0x01 "job"
${instance_id}
- encode_bytes(JOB_KEY_INFIX_STREAMING_JOB_META, out); //
"streaming_job_meta"
- encode_int64(std::get<1>(in), out); // db_id
- encode_int64(std::get<2>(in), out); // job_id
+void streaming_job_key(const StreamingJobKeyInfo& in, std::string* out) {
+ encode_prefix(in, out); // 0x01 "job"
${instance_id}
+ encode_bytes(JOB_KEY_INFIX_STREAMING_JOB, out); // "streaming_job"
+ encode_int64(std::get<1>(in), out); // db_id
+ encode_int64(std::get<2>(in), out); // job_id
}
void job_restore_tablet_key(const JobRestoreTabletKeyInfo& in, std::string*
out) {
diff --git a/cloud/src/meta-store/keys.h b/cloud/src/meta-store/keys.h
index 0a7cdccdb6d..020702982d7 100644
--- a/cloud/src/meta-store/keys.h
+++ b/cloud/src/meta-store/keys.h
@@ -66,6 +66,8 @@
// 0x01 "job" ${instance_id} "tablet" ${table_id} ${index_id} ${partition_id}
${tablet_id} -> TabletJobInfoPB
// 0x01 "job" ${instance_id} "recycle"
-> JobRecyclePB
// 0x01 "job" ${instance_id} "check"
-> JobRecyclePB
+// 0x01 "job" ${instance_id} "streaming_job" ${db_id} ${job_id}
-> StreamingJobPB
+
//
// 0x01 "copy" ${instance_id} "job" ${stage_id} ${table_id} ${copy_id}
${group_id} -> CopyJobPB
// 0x01 "copy" ${instance_id} "loading_file" ${stage_id} ${table_id}
${obj_name} ${etag} -> CopyFilePB
@@ -219,7 +221,7 @@ using MetaPendingDeleteBitmapInfo = BasicKeyInfo<24 ,
std::tuple<std::string, in
using RLJobProgressKeyInfo = BasicKeyInfo<25, std::tuple<std::string, int64_t,
int64_t>>;
// 0:instance_id 1:db_id
2:job_id
-using StreamingJobMetaKeyInfo = BasicKeyInfo<52, std::tuple<std::string,
int64_t, int64_t>>;
+using StreamingJobKeyInfo = BasicKeyInfo<52, std::tuple<std::string, int64_t,
int64_t>>;
// 0:instance_id
1:vault_id
using StorageVaultKeyInfo = BasicKeyInfo<26, std::tuple<std::string,
std::string>>;
@@ -410,8 +412,8 @@ void job_tablet_key(const JobTabletKeyInfo& in,
std::string* out);
static inline std::string job_tablet_key(const JobTabletKeyInfo& in) {
std::string s; job_tablet_key(in, &s); return s; }
void rl_job_progress_key_info(const RLJobProgressKeyInfo& in, std::string*
out);
static inline std::string rl_job_progress_key_info(const RLJobProgressKeyInfo&
in) { std::string s; rl_job_progress_key_info(in, &s); return s; }
-void streaming_job_meta_key_info(const StreamingJobMetaKeyInfo& in,
std::string* out);
-static inline std::string streaming_job_meta_key_info(const
StreamingJobMetaKeyInfo& in) { std::string s; streaming_job_meta_key_info(in,
&s); return s; }
+void streaming_job_key(const StreamingJobKeyInfo& in, std::string* out);
+static inline std::string streaming_job_key(const StreamingJobKeyInfo& in) {
std::string s; streaming_job_key(in, &s); return s; }
std::string copy_key_prefix(std::string_view instance_id);
void copy_job_key(const CopyJobKeyInfo& in, std::string* out);
diff --git a/cloud/test/keys_test.cpp b/cloud/test/keys_test.cpp
index 94d331fcf27..fee168843c8 100644
--- a/cloud/test/keys_test.cpp
+++ b/cloud/test/keys_test.cpp
@@ -79,6 +79,8 @@ int decode_bytes(std::string_view* in, std::string* out);
// 0x01 "trash" ${instacne_id} "table" -> TableTrashPB
//
// 0x01 "node_status" ${instance_id} "compute" ${backend_id} ->
ComputeNodeStatusPB
+//
+// 0x01 "job" ${instance_id} "streaming_job" ${db_id} ${job_id} ->
StreamingJobPB
// clang-format on
TEST(KeysTest, InstanceKeyTest) {
@@ -1963,6 +1965,56 @@ TEST(KeysTest, RestoreJobKeysTest) {
}
}
+TEST(KeysTest, StreamingJobKeysTest) {
+ using namespace doris::cloud;
+ std::string instance_id = "instance_id_deadbeef";
+
+ // 0x01 "job" ${instance_id} "streaming_job" ${db_id} ${job_id} ->
StreamingJobPB
+ {
+ int64_t db_id = 123;
+ int64_t job_id = 456;
+ StreamingJobKeyInfo streaming_key {instance_id, db_id, job_id};
+ std::string encoded_streaming_key0;
+ streaming_job_key(streaming_key, &encoded_streaming_key0);
+ std::cout << hex(encoded_streaming_key0) << std::endl;
+
+ std::string dec_instance_id;
+ int64_t dec_db_id = 0;
+ int64_t dec_job_id = 0;
+
+ std::string_view key_sv(encoded_streaming_key0);
+ std::string dec_job_prefix;
+ std::string dec_streaming_infix;
+ remove_user_space_prefix(&key_sv);
+ ASSERT_EQ(decode_bytes(&key_sv, &dec_job_prefix), 0);
+ ASSERT_EQ(decode_bytes(&key_sv, &dec_instance_id), 0);
+ ASSERT_EQ(decode_bytes(&key_sv, &dec_streaming_infix), 0);
+ ASSERT_EQ(decode_int64(&key_sv, &dec_db_id), 0);
+ ASSERT_EQ(decode_int64(&key_sv, &dec_job_id), 0);
+ ASSERT_TRUE(key_sv.empty());
+
+ EXPECT_EQ("job", dec_job_prefix);
+ EXPECT_EQ("streaming_job", dec_streaming_infix);
+ EXPECT_EQ(instance_id, dec_instance_id);
+ EXPECT_EQ(db_id, dec_db_id);
+ EXPECT_EQ(job_id, dec_job_id);
+
+ std::get<2>(streaming_key) = job_id + 1;
+ std::string encoded_streaming_key1;
+ streaming_job_key(streaming_key, &encoded_streaming_key1);
+ std::cout << hex(encoded_streaming_key1) << std::endl;
+
+ ASSERT_GT(encoded_streaming_key1, encoded_streaming_key0);
+
+ std::get<1>(streaming_key) = db_id + 1;
+ std::string encoded_streaming_key2;
+ streaming_job_key(streaming_key, &encoded_streaming_key2);
+ std::cout << hex(encoded_streaming_key2) << std::endl;
+
+ ASSERT_GT(encoded_streaming_key2, encoded_streaming_key0);
+ }
+}
+
TEST(KeysTest, VersionedKeyPrefixTest) {
using namespace doris::cloud;
using namespace doris::cloud::versioned;
diff --git a/cloud/test/meta_service_job_test.cpp
b/cloud/test/meta_service_job_test.cpp
index c30be90e1da..9fa021e138f 100644
--- a/cloud/test/meta_service_job_test.cpp
+++ b/cloud/test/meta_service_job_test.cpp
@@ -4990,4 +4990,179 @@ TEST(MetaServiceJobTest, IdempotentCompactionJob) {
}
}
+TEST(MetaServiceJobTest, UpdateStreamingJobMetaTest) {
+ auto meta_service = get_meta_service();
+ ASSERT_NE(meta_service, nullptr);
+
+ // Test case 1: First time update (job doesn't exist)
+ {
+ CommitTxnRequest request;
+ request.set_cloud_unique_id("test_cloud_unique_id");
+ request.set_txn_id(12345);
+
+ TxnCommitAttachmentPB* attachment =
request.mutable_commit_attachment();
+
attachment->set_type(TxnCommitAttachmentPB::STREAMING_TASK_TXN_COMMIT_ATTACHMENT);
+
+ StreamingTaskCommitAttachmentPB* streaming_attach =
+ attachment->mutable_streaming_task_txn_commit_attachment();
+ streaming_attach->set_job_id(1001);
+ streaming_attach->set_task_id(2001);
+ streaming_attach->set_offset("test_offset_1");
+ streaming_attach->set_scanned_rows(1000);
+ streaming_attach->set_load_bytes(5000);
+ streaming_attach->set_num_files(10);
+ streaming_attach->set_file_bytes(8000);
+
+ CommitTxnResponse response;
+ brpc::Controller cntl;
+ meta_service->commit_txn(&cntl, &request, &response, nullptr);
+
+ EXPECT_FALSE(cntl.Failed()) << "Error: " << cntl.ErrorText();
+ EXPECT_EQ(response.status().code(), MetaServiceCode::OK);
+ }
+
+ // Test case 2: Update existing job (accumulate values)
+ {
+ CommitTxnRequest request;
+ request.set_cloud_unique_id("test_cloud_unique_id");
+ request.set_txn_id(12346);
+
+ TxnCommitAttachmentPB* attachment =
request.mutable_commit_attachment();
+
attachment->set_type(TxnCommitAttachmentPB::STREAMING_TASK_TXN_COMMIT_ATTACHMENT);
+
+ StreamingTaskCommitAttachmentPB* streaming_attach =
+ attachment->mutable_streaming_task_txn_commit_attachment();
+ streaming_attach->set_job_id(1001); // Same job_id as before
+ streaming_attach->set_task_id(2002);
+ streaming_attach->set_offset("test_offset_2");
+ streaming_attach->set_scanned_rows(500);
+ streaming_attach->set_load_bytes(2000);
+ streaming_attach->set_num_files(5);
+ streaming_attach->set_file_bytes(3000);
+
+ CommitTxnResponse response;
+ brpc::Controller cntl;
+ meta_service->commit_txn(&cntl, &request, &response, nullptr);
+
+ EXPECT_FALSE(cntl.Failed()) << "Error: " << cntl.ErrorText();
+ EXPECT_EQ(response.status().code(), MetaServiceCode::OK);
+ }
+
+ // Test case 3: Missing commit attachment
+ {
+ CommitTxnRequest request;
+ request.set_cloud_unique_id("test_cloud_unique_id");
+ request.set_txn_id(12347);
+ // No commit attachment set
+
+ CommitTxnResponse response;
+ brpc::Controller cntl;
+ meta_service->commit_txn(&cntl, &request, &response, nullptr);
+
+ EXPECT_FALSE(cntl.Failed()) << "Error: " << cntl.ErrorText();
+ EXPECT_EQ(response.status().code(), MetaServiceCode::INVALID_ARGUMENT);
+ EXPECT_TRUE(response.status().msg().find("missing commit attachment")
!= std::string::npos);
+ }
+}
+
+TEST(MetaServiceJobTest, GetStreamingTaskCommitAttachTest) {
+ auto meta_service = get_meta_service();
+ ASSERT_NE(meta_service, nullptr);
+
+ // First, create a streaming job by committing a txn
+ {
+ CommitTxnRequest request;
+ request.set_cloud_unique_id("test_cloud_unique_id");
+ request.set_txn_id(12348);
+
+ TxnCommitAttachmentPB* attachment =
request.mutable_commit_attachment();
+
attachment->set_type(TxnCommitAttachmentPB::STREAMING_TASK_TXN_COMMIT_ATTACHMENT);
+
+ StreamingTaskCommitAttachmentPB* streaming_attach =
+ attachment->mutable_streaming_task_txn_commit_attachment();
+ streaming_attach->set_job_id(1002);
+ streaming_attach->set_task_id(3001);
+ streaming_attach->set_offset("test_offset_3");
+ streaming_attach->set_scanned_rows(2000);
+ streaming_attach->set_load_bytes(10000);
+ streaming_attach->set_num_files(20);
+ streaming_attach->set_file_bytes(15000);
+
+ CommitTxnResponse response;
+ brpc::Controller cntl;
+ meta_service->commit_txn(&cntl, &request, &response, nullptr);
+
+ EXPECT_FALSE(cntl.Failed()) << "Error: " << cntl.ErrorText();
+ EXPECT_EQ(response.status().code(), MetaServiceCode::OK);
+ }
+
+ // Test case 1: Get existing streaming job
+ {
+ GetStreamingTaskCommitAttachRequest request;
+ request.set_cloud_unique_id("test_cloud_unique_id");
+ request.set_db_id(1000); // Assuming db_id from the commit
+ request.set_job_id(1002);
+
+ GetStreamingTaskCommitAttachResponse response;
+ brpc::Controller cntl;
+ meta_service->get_streaming_task_commit_attach(&cntl, &request,
&response, nullptr);
+
+ EXPECT_FALSE(cntl.Failed()) << "Error: " << cntl.ErrorText();
+ EXPECT_EQ(response.status().code(), MetaServiceCode::OK);
+ EXPECT_TRUE(response.has_commit_attach());
+ EXPECT_EQ(response.commit_attach().job_id(), 1002);
+ EXPECT_EQ(response.commit_attach().scanned_rows(), 2000);
+ EXPECT_EQ(response.commit_attach().load_bytes(), 10000);
+ EXPECT_EQ(response.commit_attach().num_files(), 20);
+ EXPECT_EQ(response.commit_attach().file_bytes(), 15000);
+ }
+
+ // Test case 2: Get non-existent streaming job
+ {
+ GetStreamingTaskCommitAttachRequest request;
+ request.set_cloud_unique_id("test_cloud_unique_id");
+ request.set_db_id(1000);
+ request.set_job_id(9999); // Non-existent job_id
+
+ GetStreamingTaskCommitAttachResponse response;
+ brpc::Controller cntl;
+ meta_service->get_streaming_task_commit_attach(&cntl, &request,
&response, nullptr);
+
+ EXPECT_FALSE(cntl.Failed()) << "Error: " << cntl.ErrorText();
+ EXPECT_EQ(response.status().code(),
MetaServiceCode::STREAMING_JOB_PROGRESS_NOT_FOUND);
+ EXPECT_TRUE(response.status().msg().find("progress info not found") !=
std::string::npos);
+ }
+
+ // Test case 3: Missing required fields
+ {
+ GetStreamingTaskCommitAttachRequest request;
+ request.set_cloud_unique_id("test_cloud_unique_id");
+ // Missing db_id and job_id
+
+ GetStreamingTaskCommitAttachResponse response;
+ brpc::Controller cntl;
+ meta_service->get_streaming_task_commit_attach(&cntl, &request,
&response, nullptr);
+
+ EXPECT_FALSE(cntl.Failed()) << "Error: " << cntl.ErrorText();
+ EXPECT_EQ(response.status().code(), MetaServiceCode::INVALID_ARGUMENT);
+ EXPECT_TRUE(response.status().msg().find("empty db_id or job_id") !=
std::string::npos);
+ }
+
+ // Test case 4: Invalid cloud_unique_id
+ {
+ GetStreamingTaskCommitAttachRequest request;
+ request.set_cloud_unique_id("invalid_cloud_unique_id");
+ request.set_db_id(1000);
+ request.set_job_id(1002);
+
+ GetStreamingTaskCommitAttachResponse response;
+ brpc::Controller cntl;
+ meta_service->get_streaming_task_commit_attach(&cntl, &request,
&response, nullptr);
+
+ EXPECT_FALSE(cntl.Failed()) << "Error: " << cntl.ErrorText();
+ EXPECT_EQ(response.status().code(), MetaServiceCode::INVALID_ARGUMENT);
+ EXPECT_TRUE(response.status().msg().find("empty instance_id") !=
std::string::npos);
+ }
+}
+
} // namespace doris::cloud
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/TxnUtil.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/TxnUtil.java
index 424ce238544..06cd25d1357 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/TxnUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/TxnUtil.java
@@ -282,11 +282,10 @@ public class TxnUtil {
StreamingTaskCommitAttachmentPB.newBuilder();
builder.setJobId(streamingTaskTxnCommitAttachment.getJobId())
- .setTaskId(streamingTaskTxnCommitAttachment.getTaskId())
.setScannedRows(streamingTaskTxnCommitAttachment.getScannedRows())
.setLoadBytes(streamingTaskTxnCommitAttachment.getLoadBytes())
-
.setFileNumber(streamingTaskTxnCommitAttachment.getFileNumber())
- .setFileSize(streamingTaskTxnCommitAttachment.getFileSize());
+ .setNumFiles(streamingTaskTxnCommitAttachment.getNumFiles())
+ .setFileBytes(streamingTaskTxnCommitAttachment.getFileBytes());
if (streamingTaskTxnCommitAttachment.getOffset() != null) {
builder.setOffset(streamingTaskTxnCommitAttachment.getOffset());
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
index 479698b6f44..e8fd03c8de4 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
@@ -305,8 +305,8 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
}
this.jobStatistic.setScannedRows(this.jobStatistic.getScannedRows() +
attachment.getScannedRows());
this.jobStatistic.setLoadBytes(this.jobStatistic.getLoadBytes() +
attachment.getLoadBytes());
- this.jobStatistic.setFileNumber(this.jobStatistic.getFileNumber() +
attachment.getFileNumber());
- this.jobStatistic.setFileSize(this.jobStatistic.getFileSize() +
attachment.getFileSize());
+ this.jobStatistic.setFileNumber(this.jobStatistic.getFileNumber() +
attachment.getNumFiles());
+ this.jobStatistic.setFileSize(this.jobStatistic.getFileSize() +
attachment.getFileBytes());
offsetProvider.updateOffset(offsetProvider.deserializeOffset(attachment.getOffset()));
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingTaskTxnCommitAttachment.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingTaskTxnCommitAttachment.java
index 4b7590824b4..b3f6a2c0b1b 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingTaskTxnCommitAttachment.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingTaskTxnCommitAttachment.java
@@ -27,14 +27,14 @@ import lombok.Getter;
public class StreamingTaskTxnCommitAttachment extends TxnCommitAttachment {
public StreamingTaskTxnCommitAttachment(long jobId, long taskId,
- long scannedRows, long loadBytes, long fileNumber, long
fileSize, String offset) {
+ long scannedRows, long loadBytes, long numFiles, long
fileBytes, String offset) {
super(TransactionState.LoadJobSourceType.STREAMING_JOB);
this.jobId = jobId;
this.taskId = taskId;
this.scannedRows = scannedRows;
this.loadBytes = loadBytes;
- this.fileNumber = fileNumber;
- this.fileSize = fileSize;
+ this.numFiles = numFiles;
+ this.fileBytes = fileBytes;
this.offset = offset;
}
@@ -42,8 +42,8 @@ public class StreamingTaskTxnCommitAttachment extends
TxnCommitAttachment {
super(TransactionState.LoadJobSourceType.STREAMING_JOB);
this.scannedRows = pb.getScannedRows();
this.loadBytes = pb.getLoadBytes();
- this.fileNumber = pb.getFileNumber();
- this.fileSize = pb.getFileSize();
+ this.numFiles = pb.getNumFiles();
+ this.fileBytes = pb.getFileBytes();
this.offset = pb.getOffset();
}
@@ -59,10 +59,10 @@ public class StreamingTaskTxnCommitAttachment extends
TxnCommitAttachment {
private long loadBytes;
@SerializedName(value = "fn")
@Getter
- private long fileNumber;
+ private long numFiles;
@SerializedName(value = "fs")
@Getter
- private long fileSize;
+ private long fileBytes;
@SerializedName(value = "of")
@Getter
private String offset;
@@ -72,8 +72,8 @@ public class StreamingTaskTxnCommitAttachment extends
TxnCommitAttachment {
return "StreamingTaskTxnCommitAttachment: ["
+ "scannedRows=" + scannedRows
+ ", loadBytes=" + loadBytes
- + ", fileNumber=" + fileNumber
- + ", fileSize=" + fileSize
+ + ", numFiles=" + numFiles
+ + ", fileBytes=" + fileBytes
+ ", offset=" + offset
+ "]";
}
diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto
index 6259d6d0da3..3428cff6cab 100644
--- a/gensrc/proto/cloud.proto
+++ b/gensrc/proto/cloud.proto
@@ -368,12 +368,11 @@ message RoutineLoadJobStatisticPB {
message StreamingTaskCommitAttachmentPB {
optional int64 job_id = 1;
- optional int64 task_id = 2;
- optional string offset = 3;
- optional int64 scanned_rows = 4;
- optional int64 load_bytes = 5;
- optional int64 file_number = 6;
- optional int64 file_size = 7;
+ optional string offset = 2;
+ optional int64 scanned_rows = 3;
+ optional int64 load_bytes = 4;
+ optional int64 num_files = 5;
+ optional int64 file_bytes = 6;
}
message TxnCommitAttachmentPB {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]