This is an automated email from the ASF dual-hosted git repository.

dbecker pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 5c003cdcd IMPALA-12978: Fix impala-shell`s live progress with older Impalas
5c003cdcd is described below

commit 5c003cdcda604c43dd836571db02afcfbdc05dbc
Author: Csaba Ringhofer <[email protected]>
AuthorDate: Mon Apr 8 16:42:35 2024 +0200

    IMPALA-12978: Fix impala-shell`s live progress with older Impalas
    
    If the Impala server has an older version that does not contain
    IMPALA-12048 then TExecProgress.total_fragment_instances will be
    None, leading to error when checking total_fragment_instances > 0.
    
    Note that this issue only comes with Python 3, in Python 2 None > 0
    returns False.
    
    Testing:
    - Manually checked with a modified Impala that doesn't set
      total_fragment_instances. Only the scanner progress bar is shown
      in this case.
    
    Change-Id: Ic6562ff6c908bfebd09b7612bc5bcbd92623a8e6
    Reviewed-on: http://gerrit.cloudera.org:8080/21256
    Reviewed-by: Michael Smith <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
    Reviewed-by: Zihao Ye <[email protected]>
---
 shell/impala_shell.py | 103 ++++++++++++++++++++++++++------------------------
 1 file changed, 53 insertions(+), 50 deletions(-)

diff --git a/shell/impala_shell.py b/shell/impala_shell.py
index cbc28f2a0..e76f09599 100755
--- a/shell/impala_shell.py
+++ b/shell/impala_shell.py
@@ -1282,58 +1282,61 @@ class ImpalaShell(cmd.Cmd, object):
       if not summary:
         return
 
-      if summary.is_queued:
-        queued_msg = "Query queued. Latest queuing reason: %s\n" % summary.queued_reason
-        self.progress_stream.write(queued_msg)
-        self.last_summary = time.time()
-        return
-
-      data = ""
-      if summary.error_logs:
-        for error_line in summary.error_logs:
-          data += error_line + "\n"
-          if self.webserver_address:
-            query_id_search = re.search("Retrying query using query id: (.*)",
-                                        error_line)
-            if query_id_search and len(query_id_search.groups()) == 1:
-              retried_query_id = query_id_search.group(1)
-              data += "Retried query link: %s\n"\
-                      % self.imp_client.get_query_link(retried_query_id)
-
-      if summary.progress:
-        progress = summary.progress
-
-        # If the data is not complete return and wait for a good result.
-        if not progress.total_scan_ranges and not progress.num_completed_scan_ranges and \
-           not progress.total_fragment_instances and \
-           not progress.num_completed_fragment_instances:
-          self.last_summary = time.time()
-          return
-
-        if self.live_progress and progress.total_scan_ranges > 0:
-          val = ((summary.progress.num_completed_scan_ranges * 100)
-                 // summary.progress.total_scan_ranges)
-          scan_progress_text =\
-              " Scan Progress:[%s%s] %s%%\n" % ("#" * val, " " * (100 - val), val)
-          data += scan_progress_text
-
-        if self.live_progress and progress.total_fragment_instances > 0:
-          val = ((progress.num_completed_fragment_instances * 100)
-                 // progress.total_fragment_instances)
-          query_progress_text =\
-              "Query Progress:[%s%s] %s%%\n" % ("#" * val, " " * (100 - val), val)
-          data += query_progress_text
-
-        if self.live_summary:
-          table = self._default_summary_table()
-          output = []
-          self.imp_client.build_summary_table(summary, 0, False, 0, False, output)
-          formatter = PrettyOutputFormatter(table)
-          data += formatter.format(output) + "\n"
-
-      self.progress_stream.write(data)
+      summary_str = self._format_periodic_summary(summary)
+      if summary_str:
+        self.progress_stream.write(summary_str)
       self.last_summary = time.time()
 
+  def _format_periodic_summary(self, summary):
+    if summary.is_queued:
+      return "Query queued. Latest queuing reason: %s\n" % summary.queued_reason
+
+    data = ""
+    if summary.error_logs:
+      for error_line in summary.error_logs:
+        data += error_line + "\n"
+        if self.webserver_address:
+          query_id_search = re.search("Retrying query using query id: (.*)",
+                                      error_line)
+          if query_id_search and len(query_id_search.groups()) == 1:
+            retried_query_id = query_id_search.group(1)
+            data += "Retried query link: %s\n"\
+                    % self.imp_client.get_query_link(retried_query_id)
+
+    if summary.progress:
+      progress = summary.progress
+
+      has_scan_progress = progress.num_completed_scan_ranges is not None \
+          and progress.total_scan_ranges is not None \
+          and progress.total_scan_ranges > 0
+
+      if self.live_progress and has_scan_progress:
+        val = ((summary.progress.num_completed_scan_ranges * 100)
+                // summary.progress.total_scan_ranges)
+        scan_progress_text =\
+            " Scan Progress:[%s%s] %s%%\n" % ("#" * val, " " * (100 - val), val)
+        data += scan_progress_text
+
+      has_query_progress = progress.num_completed_fragment_instances is not None \
+          and progress.total_fragment_instances is not None \
+          and progress.total_fragment_instances > 0
+
+      if self.live_progress and has_query_progress:
+        val = ((progress.num_completed_fragment_instances * 100)
+                // progress.total_fragment_instances)
+        query_progress_text =\
+            "Query Progress:[%s%s] %s%%\n" % ("#" * val, " " * (100 - val), val)
+        data += query_progress_text
+
+      if self.live_summary:
+        table = self._default_summary_table()
+        output = []
+        self.imp_client.build_summary_table(summary, 0, False, 0, False, output)
+        formatter = PrettyOutputFormatter(table)
+        data += formatter.format(output) + "\n"
+
+    return data
+
   def _default_summary_table(self):
     return self.construct_table_with_header(["Operator", "#Hosts", "#Inst",
                                              "Avg Time", "Max Time", "#Rows",

Reply via email to