This is an automated email from the ASF dual-hosted git repository.
zhaoliwei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new b656d930 feat(bulk_load): add bulk load ingestion progress for shell
and admin-cli (#975)
b656d930 is described below
commit b656d930875c524aa6ef07d6f64d68bff14ed20f
Author: HeYuchen <[email protected]>
AuthorDate: Thu May 19 10:13:36 2022 +0800
feat(bulk_load): add bulk load ingestion progress for shell and admin-cli
(#975)
---
admin-cli/executor/bulk_load.go | 47 ++++++++++++++++++++++++++++++++++++++++
src/shell/commands/bulk_load.cpp | 27 +++++++++++++++--------
2 files changed, 65 insertions(+), 9 deletions(-)
diff --git a/admin-cli/executor/bulk_load.go b/admin-cli/executor/bulk_load.go
index fec3b7a4..99e69dc3 100644
--- a/admin-cli/executor/bulk_load.go
+++ b/admin-cli/executor/bulk_load.go
@@ -79,6 +79,8 @@ func QueryBulkLoad(client *Client, tableName string,
partitionIndex int, detaile
switch tableStatus {
case admin.BulkLoadStatus_BLS_DOWNLOADING:
PrintAllDownloading(client, resp, detailed)
+ case admin.BulkLoadStatus_BLS_INGESTING:
+ PrintAllIngestion(client, resp, detailed)
case admin.BulkLoadStatus_BLS_SUCCEED,
admin.BulkLoadStatus_BLS_FAILED, admin.BulkLoadStatus_BLS_CANCELED:
PrintAllCleanupFlag(client, resp, detailed)
default:
@@ -156,6 +158,51 @@ func PrintAllDownloading(client *Client, resp
*admin.QueryBulkLoadResponse, deta
}).Render()
}
+// PrintAllIngestion prints the ingestion progress carried by a bulk load
+// query response.
+//
+// The total progress is the percentage of partitions whose status is
+// BLS_SUCCEED, computed with integer division (so it is rounded down).
+//
+// When detailed is false a single summary row (table name, table status,
+// total ingestion progress) is printed. Otherwise one row per partition is
+// printed, with the same summary values placed in the table footer.
+func PrintAllIngestion(client *Client, resp *admin.QueryBulkLoadResponse, detailed bool) {
+	// Fetch the status list once instead of on every loop iteration.
+	partitionsStatus := resp.GetPartitionsStatus()
+	partitionCount := len(partitionsStatus)
+
+	type allStruct struct {
+		Pidx     int32  `json:"PartitionIndex"`
+		Status   string `json:"PartitionStatus"`
+		Progress string `json:"Progress"`
+	}
+	var aList []interface{}
+	var succeedCount int32
+	for i, status := range partitionsStatus {
+		aList = append(aList, allStruct{
+			Pidx:   int32(i),
+			Status: status.String(),
+			// No per-partition ingestion percentage is computed here;
+			// the column is intentionally left empty.
+			Progress: "",
+		})
+		if status == admin.BulkLoadStatus_BLS_SUCCEED {
+			succeedCount++
+		}
+	}
+
+	// Integer division by zero panics in Go, so report 0% when the
+	// response contains no partition statuses.
+	var totalProgress int32
+	if partitionCount > 0 {
+		totalProgress = succeedCount * 100 / int32(partitionCount)
+	}
+
+	if !detailed {
+		type summaryStruct struct {
+			TName    string `json:"TableName"`
+			TStatus  string `json:"TableStatus"`
+			Progress int32  `json:"TotalIngestionProgress"`
+		}
+		var tList []interface{}
+		tList = append(tList, summaryStruct{
+			TName:    resp.GetAppName(),
+			TStatus:  resp.GetAppStatus().String(),
+			Progress: totalProgress,
+		})
+		tabular.Print(client, tList)
+		return
+	}
+
+	// Detailed view: per-partition rows plus a footer with the table summary.
+	tabular.New(client, aList, func(tbWriter *tablewriter.Table) {
+		tbWriter.SetFooter([]string{
+			fmt.Sprintf("TableName(%s)", resp.GetAppName()),
+			fmt.Sprintf("Table(%s)", resp.GetAppStatus().String()),
+			fmt.Sprintf("%d", totalProgress),
+		})
+	}).Render()
+}
+
func PrintAllOthers(client *Client, resp *admin.QueryBulkLoadResponse,
detailed bool) {
partitionCount := len(resp.GetPartitionsStatus())
if !detailed {
diff --git a/src/shell/commands/bulk_load.cpp b/src/shell/commands/bulk_load.cpp
index 62200bd7..8fcbcf7f 100644
--- a/src/shell/commands/bulk_load.cpp
+++ b/src/shell/commands/bulk_load.cpp
@@ -297,11 +297,12 @@ bool query_bulk_load_status(command_executor *e,
shell_context *sc, arguments ar
dsn::utils::multi_table_printer mtp;
bool all_partitions = (pidx == -1);
- bool print_progress = (resp.app_status ==
bulk_load_status::BLS_DOWNLOADING);
+ bool print_ingestion_progress = (resp.app_status ==
bulk_load_status::BLS_INGESTING);
+ bool print_download_progress = (resp.app_status ==
bulk_load_status::BLS_DOWNLOADING);
std::unordered_map<int32_t, int32_t> partitions_progress;
- auto total_progress = 0;
- if (print_progress) {
+ auto total_download_progress = 0, total_ingestion_progress = 0;
+ if (print_download_progress) {
for (auto i = 0; i < partition_count; ++i) {
auto progress = 0;
for (const auto &kv : resp.bulk_load_states[i]) {
@@ -309,9 +310,9 @@ bool query_bulk_load_status(command_executor *e,
shell_context *sc, arguments ar
}
progress /= resp.max_replica_count;
partitions_progress.insert(std::make_pair(i, progress));
- total_progress += progress;
+ total_download_progress += progress;
}
- total_progress /= partition_count;
+ total_download_progress /= partition_count;
}
// print all partitions
@@ -322,7 +323,7 @@ bool query_bulk_load_status(command_executor *e,
shell_context *sc, arguments ar
dsn::utils::table_printer tp_all("all partitions");
tp_all.add_title("partition_index");
tp_all.add_column("partition_status");
- if (print_progress) {
+ if (print_download_progress) {
tp_all.add_column("download_progress(%)");
}
if (print_cleanup_flag) {
@@ -333,9 +334,13 @@ bool query_bulk_load_status(command_executor *e,
shell_context *sc, arguments ar
auto states = resp.bulk_load_states[i];
tp_all.add_row(i);
tp_all.append_data(get_short_status(resp.partitions_status[i]));
- if (print_progress) {
+ if (print_download_progress) {
tp_all.append_data(partitions_progress[i]);
}
+ if (print_ingestion_progress &&
+ resp.partitions_status[i] == bulk_load_status::BLS_SUCCEED) {
+ total_ingestion_progress += 1;
+ }
if (print_cleanup_flag) {
bool is_cleanup = (states.size() == resp.max_replica_count);
for (const auto &kv : states) {
@@ -409,8 +414,12 @@ bool query_bulk_load_status(command_executor *e,
shell_context *sc, arguments ar
if (bulk_load_status::BLS_FAILED == resp.app_status) {
tp_summary.add_row_name_and_data("bulk_load_err",
resp.err.to_string());
}
- if (print_progress) {
- tp_summary.add_row_name_and_data("app_total_download_progress",
total_progress);
+ if (print_download_progress) {
+ tp_summary.add_row_name_and_data("app_total_download_progress",
total_download_progress);
+ }
+ if (print_ingestion_progress) {
+ tp_summary.add_row_name_and_data("app_total_ingestion_progress",
+ total_ingestion_progress * 100 /
partition_count);
}
mtp.add(std::move(tp_summary));
mtp.output(std::cout, tp_output_format::kTabular);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]