This is an automated email from the ASF dual-hosted git repository.
dbecker pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new 5c003cdcd IMPALA-12978: Fix impala-shell's live progress with older
Impalas
5c003cdcd is described below
commit 5c003cdcda604c43dd836571db02afcfbdc05dbc
Author: Csaba Ringhofer <[email protected]>
AuthorDate: Mon Apr 8 16:42:35 2024 +0200
IMPALA-12978: Fix impala-shell's live progress with older Impalas
If the Impala server is an older version that does not contain
IMPALA-12048, then TExecProgress.total_fragment_instances will be
None, leading to a TypeError when checking total_fragment_instances > 0.
Note that this issue only occurs with Python 3; in Python 2, None > 0
returns False.
Testing:
- Manually checked with a modified Impala that doesn't set
total_fragment_instances. Only the scanner progress bar is shown
in this case.
Change-Id: Ic6562ff6c908bfebd09b7612bc5bcbd92623a8e6
Reviewed-on: http://gerrit.cloudera.org:8080/21256
Reviewed-by: Michael Smith <[email protected]>
Tested-by: Impala Public Jenkins <[email protected]>
Reviewed-by: Zihao Ye <[email protected]>
---
shell/impala_shell.py | 103 ++++++++++++++++++++++++++------------------------
1 file changed, 53 insertions(+), 50 deletions(-)
diff --git a/shell/impala_shell.py b/shell/impala_shell.py
index cbc28f2a0..e76f09599 100755
--- a/shell/impala_shell.py
+++ b/shell/impala_shell.py
@@ -1282,58 +1282,61 @@ class ImpalaShell(cmd.Cmd, object):
if not summary:
return
- if summary.is_queued:
- queued_msg = "Query queued. Latest queuing reason: %s\n" %
summary.queued_reason
- self.progress_stream.write(queued_msg)
- self.last_summary = time.time()
- return
-
- data = ""
- if summary.error_logs:
- for error_line in summary.error_logs:
- data += error_line + "\n"
- if self.webserver_address:
- query_id_search = re.search("Retrying query using query id: (.*)",
- error_line)
- if query_id_search and len(query_id_search.groups()) == 1:
- retried_query_id = query_id_search.group(1)
- data += "Retried query link: %s\n"\
- % self.imp_client.get_query_link(retried_query_id)
-
- if summary.progress:
- progress = summary.progress
-
- # If the data is not complete return and wait for a good result.
- if not progress.total_scan_ranges and not
progress.num_completed_scan_ranges and \
- not progress.total_fragment_instances and \
- not progress.num_completed_fragment_instances:
- self.last_summary = time.time()
- return
-
- if self.live_progress and progress.total_scan_ranges > 0:
- val = ((summary.progress.num_completed_scan_ranges * 100)
- // summary.progress.total_scan_ranges)
- scan_progress_text =\
- " Scan Progress:[%s%s] %s%%\n" % ("#" * val, " " * (100 - val),
val)
- data += scan_progress_text
-
- if self.live_progress and progress.total_fragment_instances > 0:
- val = ((progress.num_completed_fragment_instances * 100)
- // progress.total_fragment_instances)
- query_progress_text =\
- "Query Progress:[%s%s] %s%%\n" % ("#" * val, " " * (100 - val),
val)
- data += query_progress_text
-
- if self.live_summary:
- table = self._default_summary_table()
- output = []
- self.imp_client.build_summary_table(summary, 0, False, 0, False,
output)
- formatter = PrettyOutputFormatter(table)
- data += formatter.format(output) + "\n"
-
- self.progress_stream.write(data)
+ summary_str = self._format_periodic_summary(summary)
+ if summary_str:
+ self.progress_stream.write(summary_str)
self.last_summary = time.time()
+ def _format_periodic_summary(self, summary):
+ if summary.is_queued:
+ return "Query queued. Latest queuing reason: %s\n" %
summary.queued_reason
+
+ data = ""
+ if summary.error_logs:
+ for error_line in summary.error_logs:
+ data += error_line + "\n"
+ if self.webserver_address:
+ query_id_search = re.search("Retrying query using query id: (.*)",
+ error_line)
+ if query_id_search and len(query_id_search.groups()) == 1:
+ retried_query_id = query_id_search.group(1)
+ data += "Retried query link: %s\n"\
+ % self.imp_client.get_query_link(retried_query_id)
+
+ if summary.progress:
+ progress = summary.progress
+
+ has_scan_progress = progress.num_completed_scan_ranges is not None \
+ and progress.total_scan_ranges is not None \
+ and progress.total_scan_ranges > 0
+
+ if self.live_progress and has_scan_progress:
+ val = ((summary.progress.num_completed_scan_ranges * 100)
+ // summary.progress.total_scan_ranges)
+ scan_progress_text =\
+ " Scan Progress:[%s%s] %s%%\n" % ("#" * val, " " * (100 - val),
val)
+ data += scan_progress_text
+
+ has_query_progress = progress.num_completed_fragment_instances is not
None \
+ and progress.total_fragment_instances is not None \
+ and progress.total_fragment_instances > 0
+
+ if self.live_progress and has_query_progress:
+ val = ((progress.num_completed_fragment_instances * 100)
+ // progress.total_fragment_instances)
+ query_progress_text =\
+ "Query Progress:[%s%s] %s%%\n" % ("#" * val, " " * (100 - val),
val)
+ data += query_progress_text
+
+ if self.live_summary:
+ table = self._default_summary_table()
+ output = []
+ self.imp_client.build_summary_table(summary, 0, False, 0, False,
output)
+ formatter = PrettyOutputFormatter(table)
+ data += formatter.format(output) + "\n"
+
+ return data
+
def _default_summary_table(self):
return self.construct_table_with_header(["Operator", "#Hosts", "#Inst",
"Avg Time", "Max Time", "#Rows",