This is an automated email from the ASF dual-hosted git repository.
laiyingchun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new 1ccdb85a8 refactor: optimize the style of `partition_ingestion()` in
`bulk_load_service` (#2319)
1ccdb85a8 is described below
commit 1ccdb85a81e46d10dd3cfef237e4af1ed081216f
Author: Dan Wang <[email protected]>
AuthorDate: Wed Nov 19 00:04:49 2025 +0800
refactor: optimize the style of `partition_ingestion()` in
`bulk_load_service` (#2319)
---
src/meta/meta_bulk_load_service.cpp | 23 ++++++++++++-----------
1 file changed, 12 insertions(+), 11 deletions(-)
diff --git a/src/meta/meta_bulk_load_service.cpp b/src/meta/meta_bulk_load_service.cpp
index fd2d15a91..40aa6d90d 100644
--- a/src/meta/meta_bulk_load_service.cpp
+++ b/src/meta/meta_bulk_load_service.cpp
@@ -1230,7 +1230,7 @@ void bulk_load_service::partition_ingestion(const std::string &app_name, const g
{
FAIL_POINT_INJECT_F("meta_bulk_load_partition_ingestion",
[](std::string_view) {});
- auto app_status = get_app_bulk_load_status(pid.get_app_id());
+ const auto app_status = get_app_bulk_load_status(pid.get_app_id());
if (app_status != bulk_load_status::BLS_INGESTING) {
LOG_WARNING("app({}) current status is {}, partition({}), ignore it",
app_name,
@@ -1263,7 +1263,7 @@ void bulk_load_service::partition_ingestion(const std::string &app_name, const g
return;
}
- auto app = get_app(pid.get_app_id());
+ const auto app = get_app(pid.get_app_id());
if (!try_partition_ingestion(pc,
app->helpers->contexts[pid.get_partition_index()])) {
LOG_WARNING(
"app({}) partition({}) couldn't execute ingestion, wait and try later", app_name, pid);
@@ -1276,15 +1276,16 @@ void bulk_load_service::partition_ingestion(const std::string &app_name, const g
return;
}
- const auto &primary = pc.hp_primary;
- ballot meta_ballot = pc.ballot;
- tasking::enqueue(
- LPC_BULK_LOAD_INGESTION,
- _meta_svc->tracker(),
- std::bind(
- &bulk_load_service::send_ingestion_request, this, app_name, pid, primary, meta_ballot),
- 0,
- std::chrono::milliseconds(bulk_load_constant::BULK_LOAD_REQUEST_INTERVAL));
+ tasking::enqueue(LPC_BULK_LOAD_INGESTION,
+ _meta_svc->tracker(),
+ std::bind(&bulk_load_service::send_ingestion_request,
+ this,
+ app_name,
+ pid,
+ pc.hp_primary,
+ pc.ballot),
+ 0,
+ std::chrono::milliseconds(bulk_load_constant::BULK_LOAD_REQUEST_INTERVAL));
}
// ThreadPool: THREAD_POOL_DEFAULT
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]