IMPALA-3713,IMPALA-4439: Fix Kudu DML shell reporting. Adds support in the shell to report the number of modified rows for all DML operations, as well as the number of rows with errors.
Testing: Added shell tests. Change-Id: I3d3d7aa8d176e03ea58fb00f2a81fb3e34965aa1 Reviewed-on: http://gerrit.cloudera.org:8080/5103 Reviewed-by: Alex Behm <[email protected]> Tested-by: Internal Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/77a2941a Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/77a2941a Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/77a2941a Branch: refs/heads/master Commit: 77a2941a42b3023d22f3cc7b2db01a94b4ddec2f Parents: 3833707 Author: Matthew Jacobs <[email protected]> Authored: Tue Nov 15 19:21:32 2016 -0800 Committer: Internal Jenkins <[email protected]> Committed: Thu Nov 17 04:13:25 2016 +0000 ---------------------------------------------------------------------- be/src/service/impala-beeswax-server.cc | 9 ++++ common/thrift/ImpalaService.thrift | 10 ++++- shell/impala_client.py | 9 ++-- shell/impala_shell.py | 62 ++++++++++++++++++---------- tests/shell/test_shell_commandline.py | 37 +++++++++++++++++ 5 files changed, 102 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/77a2941a/be/src/service/impala-beeswax-server.cc ---------------------------------------------------------------------- diff --git a/be/src/service/impala-beeswax-server.cc b/be/src/service/impala-beeswax-server.cc index c3f0c98..e740be2 100644 --- a/be/src/service/impala-beeswax-server.cc +++ b/be/src/service/impala-beeswax-server.cc @@ -520,13 +520,22 @@ Status ImpalaServer::CloseInsertInternal(const TUniqueId& query_id, // Note that when IMPALA-87 is fixed (INSERT without FROM clause) we might // need to revisit this, since that might lead us to insert a row without a // coordinator, depending on how we choose to drive the table sink. 
+ int64_t num_row_errors = 0; + bool has_kudu_stats = false; if (exec_state->coord() != NULL) { for (const PartitionStatusMap::value_type& v: exec_state->coord()->per_partition_status()) { const pair<string, TInsertPartitionStatus> partition_status = v; insert_result->rows_modified[partition_status.first] = partition_status.second.num_modified_rows; + // Only read/count row errors for partitions that actually report (optional) Kudu stats. + if (partition_status.second.__isset.stats && + partition_status.second.stats.__isset.kudu_stats) { + has_kudu_stats = true; + num_row_errors += partition_status.second.stats.kudu_stats.num_row_errors; + } } + if (has_kudu_stats) insert_result->__set_num_row_errors(num_row_errors); } } } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/77a2941a/common/thrift/ImpalaService.thrift ---------------------------------------------------------------------- diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift index 68a0588..ac1cf4a 100644 --- a/common/thrift/ImpalaService.thrift +++ b/common/thrift/ImpalaService.thrift @@ -249,12 +249,20 @@ enum TImpalaQueryOptions { ENABLE_EXPR_REWRITES } -// The summary of an insert. +// The summary of a DML statement. +// TODO: Rename to reflect that this is for all DML. struct TInsertResult { // Number of modified rows per partition. Only applies to HDFS and Kudu tables. // The keys represent partitions to create, coded as k1=v1/k2=v2/k3=v3..., with // the root in an unpartitioned table being the empty string. 1: required map<string, i64> rows_modified + + // Number of row operations attempted but not completed due to non-fatal errors + // reported by the storage engine that Impala treats as warnings. Only applies to Kudu + // tables. This includes errors due to duplicate/missing primary keys, nullability + // constraint violations, and primary keys in uncovered partition ranges. + // TODO: Provide a detailed breakdown of these counts by error. IMPALA-4416. 
+ 2: optional i64 num_row_errors } // Response from a call to PingImpalaService http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/77a2941a/shell/impala_client.py ---------------------------------------------------------------------- diff --git a/shell/impala_client.py b/shell/impala_client.py index f84c14e..137a747 100755 --- a/shell/impala_client.py +++ b/shell/impala_client.py @@ -345,8 +345,11 @@ class ImpalaClient(object): if not result.has_more: break - def close_insert(self, last_query_handle): - """Fetches the results of an INSERT query""" + def close_dml(self, last_query_handle): + """Fetches the results of a DML statement. Returns a tuple containing the + number of rows modified and the number of row errors, in that order. If the DML + operation doesn't return 'num_row_errors', then the second element in the tuple + is None (the server only sets it for Kudu tables).""" rpc_result = self._do_rpc( lambda: self.imp_service.CloseInsert(last_query_handle)) insert_result, status = rpc_result @@ -355,7 +358,7 @@ class ImpalaClient(object): raise RPCException() num_rows = sum([int(k) for k in insert_result.rows_modified.values()]) - return num_rows + return (num_rows, insert_result.num_row_errors) def close_query(self, last_query_handle, query_handle_closed=False): """Close the query handle""" http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/77a2941a/shell/impala_shell.py ---------------------------------------------------------------------- diff --git a/shell/impala_shell.py b/shell/impala_shell.py index 74d7ecb..00aff33 100755 --- a/shell/impala_shell.py +++ b/shell/impala_shell.py @@ -106,8 +106,8 @@ class ImpalaShell(cmd.Cmd): # Variable names are prefixed with the following string VAR_PREFIXES = [ 'VAR', 'HIVEVAR' ] DEFAULT_DB = 'default' - # Regex applied to all tokens of a query to detect the query type. - INSERT_REGEX = re.compile("^insert$", re.I) + # Regex applied to all tokens of a query to detect DML statements. 
+ DML_REGEX = re.compile("^(insert|upsert|update|delete)$", re.I) # Seperator for queries in the history file. HISTORY_FILE_QUERY_DELIM = '_IMP_DELIM_' @@ -772,7 +772,7 @@ class ImpalaShell(cmd.Cmd): return self._execute_stmt(query) def do_profile(self, args): - """Prints the runtime profile of the last INSERT or SELECT query executed.""" + """Prints the runtime profile of the last DML statement or SELECT query executed.""" if len(args) > 0: print_to_stderr("'profile' does not accept any arguments") return CmdStatus.ERROR @@ -856,13 +856,13 @@ class ImpalaShell(cmd.Cmd): "#Rows", "Est. #Rows", "Peak Mem", "Est. Peak Mem", "Detail"]) - def _execute_stmt(self, query, is_insert=False, print_web_link=False): + def _execute_stmt(self, query, is_dml=False, print_web_link=False): """ The logic of executing any query statement The client executes the query and the query_handle is returned immediately, even as the client waits for the query to finish executing. - If the query was not an insert, the results are fetched from the client + If the query was not a DML statement, the results are fetched from the client as they are streamed in, through the use of a generator. The execution time is printed and the query is closed if it hasn't been already @@ -892,10 +892,10 @@ class ImpalaShell(cmd.Cmd): # Reset the progress stream. self.progress_stream.clear() - if is_insert: + if is_dml: # retrieve the error log warning_log = self.imp_client.get_warning_log(self.last_query_handle) - num_rows = self.imp_client.close_insert(self.last_query_handle) + (num_rows, num_row_errors) = self.imp_client.close_dml(self.last_query_handle) else: # impalad does not support the fetching of metadata for certain types of queries. 
if not self.imp_client.expect_result_metadata(query.query): @@ -920,13 +920,21 @@ class ImpalaShell(cmd.Cmd): if warning_log: self._print_if_verbose(warning_log) - # print insert when is_insert is true (which is 1) - # print fetch when is_insert is false (which is 0) - verb = ["Fetch", "Insert"][is_insert] - self._print_if_verbose("%sed %d row(s) in %2.2fs" % (verb, num_rows, - end_time - start_time)) + # print 'Modified' when is_dml is true (i.e. 1), or 'Fetched' otherwise. + verb = ["Fetched", "Modified"][is_dml] + time_elapsed = end_time - start_time + + # Add the number of row errors if this is DML and the operation supports it. + # num_row_errors is None if the DML operation doesn't return it. + if is_dml and num_row_errors is not None: + error_report = ", %d row error(s)" % (num_row_errors) + else: + error_report = "" + + self._print_if_verbose("%s %d row(s)%s in %2.2fs" %\ + (verb, num_rows, error_report, time_elapsed)) - if not is_insert: + if not is_dml: self.imp_client.close_query(self.last_query_handle, self.query_handle_closed) self.query_handle_closed = True @@ -989,12 +997,12 @@ class ImpalaShell(cmd.Cmd): # to deal with escaped quotes in string literals lexer = shlex.shlex(query.query.lstrip(), posix=True) lexer.escapedquotes += "'" - # Because the WITH clause may precede INSERT or SELECT queries, + # Because the WITH clause may precede DML or SELECT queries, # just checking the first token is insufficient. - is_insert = False + is_dml = False tokens = list(lexer) - if filter(self.INSERT_REGEX.match, tokens): is_insert = True - return self._execute_stmt(query, is_insert=is_insert) + if any(self.DML_REGEX.match(t) for t in tokens): is_dml = True + return self._execute_stmt(query, is_dml=is_dml) def do_use(self, args): """Executes a USE... 
query""" @@ -1020,11 +1028,27 @@ class ImpalaShell(cmd.Cmd): def do_desc(self, args): return self.do_describe(args) - def do_insert(self, args): - """Executes an INSERT query""" - query = self.imp_client.create_beeswax_query("insert %s" % args, + def __do_dml(self, stmt, args): + """Executes the DML statement formed as '<stmt> <args>' (e.g. stmt='insert')""" + query = self.imp_client.create_beeswax_query("%s %s" % (stmt, args), self.set_query_options) - return self._execute_stmt(query, is_insert=True, print_web_link=True) + return self._execute_stmt(query, is_dml=True, print_web_link=True) + + def do_upsert(self, args): + """Executes an UPSERT query""" + return self.__do_dml("upsert", args) + + def do_update(self, args): + """Executes an UPDATE query""" + return self.__do_dml("update", args) + + def do_delete(self, args): + """Executes a DELETE query""" + return self.__do_dml("delete", args) + + def do_insert(self, args): + """Executes an INSERT query""" + return self.__do_dml("insert", args) def do_explain(self, args): """Explain the query execution plan""" http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/77a2941a/tests/shell/test_shell_commandline.py ---------------------------------------------------------------------- diff --git a/tests/shell/test_shell_commandline.py b/tests/shell/test_shell_commandline.py index 16a68ae..d880c2d 100644 --- a/tests/shell/test_shell_commandline.py +++ b/tests/shell/test_shell_commandline.py @@ -462,6 +462,43 @@ class TestImpalaShell(ImpalaTestSuite): results = run_impala_shell_cmd('--query="show tables"') self._validate_shell_messages(results.stderr, shell_messages, should_exist=False) + def test_insert_status(self, unique_database): + run_impala_shell_cmd('--query="create table %s.insert_test (id int)"' % + unique_database) + results = run_impala_shell_cmd('--query="insert into %s.insert_test values (1)"' % + unique_database) + assert "Modified 1 row(s)" in results.stderr + + def _validate_dml_stmt(self, stmt, expected_rows_modified, expected_row_errors): + results = run_impala_shell_cmd('--query="%s"' % (stmt)) + expected_output = "Modified %d row(s), %d row error(s)" %\ 
(expected_rows_modified, expected_row_errors) + assert expected_output in results.stderr + + def test_kudu_dml_reporting(self, unique_database): + db = unique_database + run_impala_shell_cmd('--query="create table %s.dml_test (id int primary key, '\ + 'age int) distribute by hash(id) into 2 buckets stored as kudu"' % db) + + self._validate_dml_stmt("insert into %s.dml_test (id) values (7), (7)" % db, 1, 1) + self._validate_dml_stmt("insert into %s.dml_test (id) values (7)" % db, 0, 1) + self._validate_dml_stmt("upsert into %s.dml_test (id) values (7), (7)" % db, 2, 0) + self._validate_dml_stmt("update %s.dml_test set age = 1 where id = 7" % db, 1, 0) + self._validate_dml_stmt("delete from %s.dml_test where id = 7" % db, 1, 0) + + # UPDATE/DELETE where there are no matching rows; there are no errors because the + # scan produced no rows. + self._validate_dml_stmt("update %s.dml_test set age = 1 where id = 8" % db, 0, 0) + self._validate_dml_stmt("delete from %s.dml_test where id = 7" % db, 0, 0) + + # WITH clauses only apply to INSERT and UPSERT. + self._validate_dml_stmt(\ + "with y as (values(7)) insert into %s.dml_test (id) select * from y" % db, 1, 0) + self._validate_dml_stmt(\ + "with y as (values(7)) insert into %s.dml_test (id) select * from y" % db, 0, 1) + self._validate_dml_stmt(\ + "with y as (values(7)) upsert into %s.dml_test (id) select * from y" % db, 1, 0) + def test_missing_query_file(self): result = run_impala_shell_cmd('-f nonexistent.sql', expect_success=False) assert "Could not open file 'nonexistent.sql'" in result.stderr
