This is an automated email from the ASF dual-hosted git repository.

zhaoliwei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new b656d930 feat(bulk_load): add bulk load ingestion progress for shell and admin-cli (#975)
b656d930 is described below

commit b656d930875c524aa6ef07d6f64d68bff14ed20f
Author: HeYuchen <[email protected]>
AuthorDate: Thu May 19 10:13:36 2022 +0800

    feat(bulk_load): add bulk load ingestion progress for shell and admin-cli (#975)
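
    Both the admin-cli and shell changes report ingestion progress with the
    same rule: a partition counts as ingested once its bulk load status
    reaches BLS_SUCCEED, and the table-level percentage is the share of such
    partitions (integer arithmetic, so the result is floored). A minimal
    standalone Go sketch of that calculation follows; the bulkLoadStatus
    stand-in type and the ingestionProgress helper are illustrative only and
    are not part of this commit:

        package main

        import "fmt"

        // bulkLoadStatus stands in for the generated admin.BulkLoadStatus enum.
        type bulkLoadStatus int

        const (
            blsIngesting bulkLoadStatus = iota
            blsSucceed
        )

        // ingestionProgress returns the table-level ingestion percentage:
        // the share of partitions that have already reached BLS_SUCCEED.
        func ingestionProgress(partitions []bulkLoadStatus) int32 {
            if len(partitions) == 0 {
                return 0
            }
            var succeeded int32
            for _, s := range partitions {
                if s == blsSucceed {
                    succeeded++
                }
            }
            return succeeded * 100 / int32(len(partitions))
        }

        func main() {
            // 3 of 4 partitions have finished ingestion, so this prints 75.
            fmt.Println(ingestionProgress([]bulkLoadStatus{
                blsSucceed, blsSucceed, blsIngesting, blsSucceed,
            }))
        }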
---
 admin-cli/executor/bulk_load.go  | 47 ++++++++++++++++++++++++++++++++++++++++
 src/shell/commands/bulk_load.cpp | 27 +++++++++++++++--------
 2 files changed, 65 insertions(+), 9 deletions(-)

diff --git a/admin-cli/executor/bulk_load.go b/admin-cli/executor/bulk_load.go
index fec3b7a4..99e69dc3 100644
--- a/admin-cli/executor/bulk_load.go
+++ b/admin-cli/executor/bulk_load.go
@@ -79,6 +79,8 @@ func QueryBulkLoad(client *Client, tableName string, partitionIndex int, detaile
                switch tableStatus {
                case admin.BulkLoadStatus_BLS_DOWNLOADING:
                        PrintAllDownloading(client, resp, detailed)
+               case admin.BulkLoadStatus_BLS_INGESTING:
+                       PrintAllIngestion(client, resp, detailed)
                case admin.BulkLoadStatus_BLS_SUCCEED, admin.BulkLoadStatus_BLS_FAILED, admin.BulkLoadStatus_BLS_CANCELED:
                        PrintAllCleanupFlag(client, resp, detailed)
                default:
@@ -156,6 +158,51 @@ func PrintAllDownloading(client *Client, resp *admin.QueryBulkLoadResponse, deta
        }).Render()
 }
 
+func PrintAllIngestion(client *Client, resp *admin.QueryBulkLoadResponse, detailed bool) {
+       partitionCount := len(resp.GetPartitionsStatus())
+
+       var aList []interface{}
+       type allStruct struct {
+               Pidx     int32  `json:"PartitionIndex"`
+               Status   string `json:"PartitionStatus"`
+               Progress string `json:"Progress"`
+       }
+       var totalProgress int32 = 0
+       for i := 0; i < partitionCount; i++ {
+               aList = append(aList, allStruct{
+                       Pidx:     int32(i),
+                       Status:   resp.GetPartitionsStatus()[i].String(),
+                       Progress: "",
+               })
+               if resp.GetPartitionsStatus()[i] == admin.BulkLoadStatus_BLS_SUCCEED {
+                       totalProgress++
+               }
+       }
+       totalProgress = totalProgress * 100 / int32(partitionCount)
+       if !detailed {
+               var tList []interface{}
+               type summaryStruct struct {
+                       TName    string `json:"TableName"`
+                       TStatus  string `json:"TableStatus"`
+                       Progress int32  `json:"TotalIngestionProgress"`
+               }
+               tList = append(tList, summaryStruct{
+                       TName:    resp.GetAppName(),
+                       TStatus:  resp.GetAppStatus().String(),
+                       Progress: totalProgress,
+               })
+               tabular.Print(client, tList)
+               return
+       }
+       tabular.New(client, aList, func(tbWriter *tablewriter.Table) {
+               tbWriter.SetFooter([]string{
+                       fmt.Sprintf("TableName(%s)", resp.GetAppName()),
+                       fmt.Sprintf("Table(%s)", resp.GetAppStatus().String()),
+                       fmt.Sprintf("%d", totalProgress),
+               })
+       }).Render()
+}
+
 func PrintAllOthers(client *Client, resp *admin.QueryBulkLoadResponse, detailed bool) {
        partitionCount := len(resp.GetPartitionsStatus())
        if !detailed {
diff --git a/src/shell/commands/bulk_load.cpp b/src/shell/commands/bulk_load.cpp
index 62200bd7..8fcbcf7f 100644
--- a/src/shell/commands/bulk_load.cpp
+++ b/src/shell/commands/bulk_load.cpp
@@ -297,11 +297,12 @@ bool query_bulk_load_status(command_executor *e, shell_context *sc, arguments ar
     dsn::utils::multi_table_printer mtp;
 
     bool all_partitions = (pidx == -1);
-    bool print_progress = (resp.app_status == bulk_load_status::BLS_DOWNLOADING);
+    bool print_ingestion_progress = (resp.app_status == bulk_load_status::BLS_INGESTING);
+    bool print_download_progress = (resp.app_status == bulk_load_status::BLS_DOWNLOADING);
 
     std::unordered_map<int32_t, int32_t> partitions_progress;
-    auto total_progress = 0;
-    if (print_progress) {
+    auto total_download_progress = 0, total_ingestion_progress = 0;
+    if (print_download_progress) {
         for (auto i = 0; i < partition_count; ++i) {
             auto progress = 0;
             for (const auto &kv : resp.bulk_load_states[i]) {
@@ -309,9 +310,9 @@ bool query_bulk_load_status(command_executor *e, shell_context *sc, arguments ar
             }
             progress /= resp.max_replica_count;
             partitions_progress.insert(std::make_pair(i, progress));
-            total_progress += progress;
+            total_download_progress += progress;
         }
-        total_progress /= partition_count;
+        total_download_progress /= partition_count;
     }
 
     // print all partitions
@@ -322,7 +323,7 @@ bool query_bulk_load_status(command_executor *e, shell_context *sc, arguments ar
         dsn::utils::table_printer tp_all("all partitions");
         tp_all.add_title("partition_index");
         tp_all.add_column("partition_status");
-        if (print_progress) {
+        if (print_download_progress) {
             tp_all.add_column("download_progress(%)");
         }
         if (print_cleanup_flag) {
@@ -333,9 +334,13 @@ bool query_bulk_load_status(command_executor *e, shell_context *sc, arguments ar
             auto states = resp.bulk_load_states[i];
             tp_all.add_row(i);
             tp_all.append_data(get_short_status(resp.partitions_status[i]));
-            if (print_progress) {
+            if (print_download_progress) {
                 tp_all.append_data(partitions_progress[i]);
             }
+            if (print_ingestion_progress &&
+                resp.partitions_status[i] == bulk_load_status::BLS_SUCCEED) {
+                total_ingestion_progress += 1;
+            }
             if (print_cleanup_flag) {
                 bool is_cleanup = (states.size() == resp.max_replica_count);
                 for (const auto &kv : states) {
@@ -409,8 +414,12 @@ bool query_bulk_load_status(command_executor *e, shell_context *sc, arguments ar
     if (bulk_load_status::BLS_FAILED == resp.app_status) {
         tp_summary.add_row_name_and_data("bulk_load_err", resp.err.to_string());
     }
-    if (print_progress) {
-        tp_summary.add_row_name_and_data("app_total_download_progress", total_progress);
+    if (print_download_progress) {
+        tp_summary.add_row_name_and_data("app_total_download_progress", total_download_progress);
+    }
+    if (print_ingestion_progress) {
+        tp_summary.add_row_name_and_data("app_total_ingestion_progress",
+                                         total_ingestion_progress * 100 / partition_count);
     }
     mtp.add(std::move(tp_summary));
     mtp.output(std::cout, tp_output_format::kTabular);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
