IMPALA-3713,IMPALA-4439: Fix Kudu DML shell reporting

Adds support in impala-shell for reporting the number of rows
modified by any DML statement (INSERT, UPSERT, UPDATE, DELETE). For
Kudu tables, the shell also reports the number of rows with errors.

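Testing: Added shell tests covering INSERT status reporting and the
modified-row and row-error counts reported for Kudu DML statements.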

Change-Id: I3d3d7aa8d176e03ea58fb00f2a81fb3e34965aa1
Reviewed-on: http://gerrit.cloudera.org:8080/5103
Reviewed-by: Alex Behm <[email protected]>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/77a2941a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/77a2941a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/77a2941a

Branch: refs/heads/master
Commit: 77a2941a42b3023d22f3cc7b2db01a94b4ddec2f
Parents: 3833707
Author: Matthew Jacobs <[email protected]>
Authored: Tue Nov 15 19:21:32 2016 -0800
Committer: Internal Jenkins <[email protected]>
Committed: Thu Nov 17 04:13:25 2016 +0000

----------------------------------------------------------------------
 be/src/service/impala-beeswax-server.cc |  9 ++++
 common/thrift/ImpalaService.thrift      | 10 ++++-
 shell/impala_client.py                  |  9 ++--
 shell/impala_shell.py                   | 62 ++++++++++++++++++----------
 tests/shell/test_shell_commandline.py   | 37 +++++++++++++++++
 5 files changed, 102 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/77a2941a/be/src/service/impala-beeswax-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-beeswax-server.cc 
b/be/src/service/impala-beeswax-server.cc
index c3f0c98..e740be2 100644
--- a/be/src/service/impala-beeswax-server.cc
+++ b/be/src/service/impala-beeswax-server.cc
@@ -520,13 +520,22 @@ Status ImpalaServer::CloseInsertInternal(const TUniqueId& 
query_id,
       // Note that when IMPALA-87 is fixed (INSERT without FROM clause) we 
might
       // need to revisit this, since that might lead us to insert a row 
without a
       // coordinator, depending on how we choose to drive the table sink.
+      int64_t num_row_errors = 0;
+      bool has_kudu_stats = false;
       if (exec_state->coord() != NULL) {
         for (const PartitionStatusMap::value_type& v:
              exec_state->coord()->per_partition_status()) {
           const pair<string, TInsertPartitionStatus> partition_status = v;
           insert_result->rows_modified[partition_status.first] =
               partition_status.second.num_modified_rows;
+
+          if (partition_status.second.__isset.stats &&
+              partition_status.second.stats.__isset.kudu_stats) {
+            has_kudu_stats = true;
+          }
+          num_row_errors += 
partition_status.second.stats.kudu_stats.num_row_errors;
         }
+        if (has_kudu_stats) 
insert_result->__set_num_row_errors(num_row_errors);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/77a2941a/common/thrift/ImpalaService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaService.thrift 
b/common/thrift/ImpalaService.thrift
index 68a0588..ac1cf4a 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -249,12 +249,20 @@ enum TImpalaQueryOptions {
   ENABLE_EXPR_REWRITES
 }
 
-// The summary of an insert.
+// The summary of a DML statement.
+// TODO: Rename to reflect that this is for all DML.
 struct TInsertResult {
   // Number of modified rows per partition. Only applies to HDFS and Kudu 
tables.
   // The keys represent partitions to create, coded as k1=v1/k2=v2/k3=v3..., 
with
   // the root in an unpartitioned table being the empty string.
   1: required map<string, i64> rows_modified
+
+  // Number of row operations attempted but not completed due to non-fatal 
errors
+  // reported by the storage engine that Impala treats as warnings. Only 
applies to Kudu
+  // tables. This includes errors due to duplicate/missing primary keys, 
nullability
+  // constraint violations, and primary keys in uncovered partition ranges.
+  // TODO: Provide a detailed breakdown of these counts by error. IMPALA-4416.
+  2: optional i64 num_row_errors
 }
 
 // Response from a call to PingImpalaService

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/77a2941a/shell/impala_client.py
----------------------------------------------------------------------
diff --git a/shell/impala_client.py b/shell/impala_client.py
index f84c14e..137a747 100755
--- a/shell/impala_client.py
+++ b/shell/impala_client.py
@@ -345,8 +345,11 @@ class ImpalaClient(object):
         if not result.has_more:
           break
 
-  def close_insert(self, last_query_handle):
-    """Fetches the results of an INSERT query"""
+  def close_dml(self, last_query_handle):
+    """Fetches the results of a DML query. Returns a tuple containing the
+       number of rows modified and the number of row errors, in that order. If 
the DML
+       operation doesn't return 'num_row_errors', then the second element in 
the tuple
+       is None."""
     rpc_result = self._do_rpc(
         lambda: self.imp_service.CloseInsert(last_query_handle))
     insert_result, status = rpc_result
@@ -355,7 +358,7 @@ class ImpalaClient(object):
       raise RPCException()
 
     num_rows = sum([int(k) for k in insert_result.rows_modified.values()])
-    return num_rows
+    return (num_rows, insert_result.num_row_errors)
 
   def close_query(self, last_query_handle, query_handle_closed=False):
     """Close the query handle"""

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/77a2941a/shell/impala_shell.py
----------------------------------------------------------------------
diff --git a/shell/impala_shell.py b/shell/impala_shell.py
index 74d7ecb..00aff33 100755
--- a/shell/impala_shell.py
+++ b/shell/impala_shell.py
@@ -106,8 +106,8 @@ class ImpalaShell(cmd.Cmd):
   # Variable names are prefixed with the following string
   VAR_PREFIXES = [ 'VAR', 'HIVEVAR' ]
   DEFAULT_DB = 'default'
-  # Regex applied to all tokens of a query to detect the query type.
-  INSERT_REGEX = re.compile("^insert$", re.I)
+  # Regex applied to all tokens of a query to detect DML statements.
+  DML_REGEX = re.compile("^(insert|upsert|update|delete)$", re.I)
   # Seperator for queries in the history file.
   HISTORY_FILE_QUERY_DELIM = '_IMP_DELIM_'
 
@@ -772,7 +772,7 @@ class ImpalaShell(cmd.Cmd):
     return self._execute_stmt(query)
 
   def do_profile(self, args):
-    """Prints the runtime profile of the last INSERT or SELECT query 
executed."""
+    """Prints the runtime profile of the last DML statement or SELECT query 
executed."""
     if len(args) > 0:
       print_to_stderr("'profile' does not accept any arguments")
       return CmdStatus.ERROR
@@ -856,13 +856,13 @@ class ImpalaShell(cmd.Cmd):
                                              "#Rows", "Est. #Rows", "Peak Mem",
                                              "Est. Peak Mem", "Detail"])
 
-  def _execute_stmt(self, query, is_insert=False, print_web_link=False):
+  def _execute_stmt(self, query, is_dml=False, print_web_link=False):
     """ The logic of executing any query statement
 
     The client executes the query and the query_handle is returned immediately,
     even as the client waits for the query to finish executing.
 
-    If the query was not an insert, the results are fetched from the client
+    If the query was not dml, the results are fetched from the client
     as they are streamed in, through the use of a generator.
 
     The execution time is printed and the query is closed if it hasn't been 
already
@@ -892,10 +892,10 @@ class ImpalaShell(cmd.Cmd):
       # Reset the progress stream.
       self.progress_stream.clear()
 
-      if is_insert:
+      if is_dml:
         # retrieve the error log
         warning_log = self.imp_client.get_warning_log(self.last_query_handle)
-        num_rows = self.imp_client.close_insert(self.last_query_handle)
+        (num_rows, num_row_errors) = 
self.imp_client.close_dml(self.last_query_handle)
       else:
         # impalad does not support the fetching of metadata for certain types 
of queries.
         if not self.imp_client.expect_result_metadata(query.query):
@@ -920,13 +920,21 @@ class ImpalaShell(cmd.Cmd):
 
       if warning_log:
         self._print_if_verbose(warning_log)
-      # print insert when is_insert is true (which is 1)
-      # print fetch when is_insert is false (which is 0)
-      verb = ["Fetch", "Insert"][is_insert]
-      self._print_if_verbose("%sed %d row(s) in %2.2fs" % (verb, num_rows,
-                                                               end_time - 
start_time))
+      # print 'Modified' when is_dml is true (i.e. 1), or 'Fetched' otherwise.
+      verb = ["Fetched", "Modified"][is_dml]
+      time_elapsed = end_time - start_time
+
+      # Add the number of row errors if this DML and the operation supports it.
+      # num_row_errors is None if the DML operation doesn't return it.
+      if is_dml and num_row_errors is not None:
+        error_report = ", %d row error(s)" % (num_row_errors)
+      else:
+        error_report = ""
+
+      self._print_if_verbose("%s %d row(s)%s in %2.2fs" %\
+          (verb, num_rows, error_report, time_elapsed))
 
-      if not is_insert:
+      if not is_dml:
         self.imp_client.close_query(self.last_query_handle, 
self.query_handle_closed)
       self.query_handle_closed = True
 
@@ -989,12 +997,12 @@ class ImpalaShell(cmd.Cmd):
     # to deal with escaped quotes in string literals
     lexer = shlex.shlex(query.query.lstrip(), posix=True)
     lexer.escapedquotes += "'"
-    # Because the WITH clause may precede INSERT or SELECT queries,
+    # Because the WITH clause may precede DML or SELECT queries,
     # just checking the first token is insufficient.
-    is_insert = False
+    is_dml = False
     tokens = list(lexer)
-    if filter(self.INSERT_REGEX.match, tokens): is_insert = True
-    return self._execute_stmt(query, is_insert=is_insert)
+    if filter(self.DML_REGEX.match, tokens): is_dml = True
+    return self._execute_stmt(query, is_dml=is_dml)
 
   def do_use(self, args):
     """Executes a USE... query"""
@@ -1020,11 +1028,23 @@ class ImpalaShell(cmd.Cmd):
   def do_desc(self, args):
     return self.do_describe(args)
 
-  def do_insert(self, args):
-    """Executes an INSERT query"""
-    query = self.imp_client.create_beeswax_query("insert %s" % args,
+  def __do_dml(self, stmt, args):
+    """Executes a DML query"""
+    query = self.imp_client.create_beeswax_query("%s %s" % (stmt, args),
                                                  self.set_query_options)
-    return self._execute_stmt(query, is_insert=True, print_web_link=True)
+    return self._execute_stmt(query, is_dml=True, print_web_link=True)
+
+  def do_upsert(self, args):
+    return self.__do_dml("upsert", args)
+
+  def do_update(self, args):
+    return self.__do_dml("update", args)
+
+  def do_delete(self, args):
+    return self.__do_dml("delete", args)
+
+  def do_insert(self, args):
+    return self.__do_dml("insert", args)
 
   def do_explain(self, args):
     """Explain the query execution plan"""

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/77a2941a/tests/shell/test_shell_commandline.py
----------------------------------------------------------------------
diff --git a/tests/shell/test_shell_commandline.py 
b/tests/shell/test_shell_commandline.py
index 16a68ae..d880c2d 100644
--- a/tests/shell/test_shell_commandline.py
+++ b/tests/shell/test_shell_commandline.py
@@ -462,6 +462,43 @@ class TestImpalaShell(ImpalaTestSuite):
     results = run_impala_shell_cmd('--query="show tables"')
     self._validate_shell_messages(results.stderr, shell_messages, 
should_exist=False)
 
+  def test_insert_status(self, unique_database):
+    run_impala_shell_cmd('--query="create table %s.insert_test (id int)"' %
+        unique_database)
+    results = run_impala_shell_cmd('--query="insert into %s.insert_test values 
(1)"' %
+        unique_database)
+    assert "Modified 1 row(s)" in results.stderr
+
+  def _validate_dml_stmt(self, stmt, expected_rows_modified, 
expected_row_errors):
+    results = run_impala_shell_cmd('--query="%s"' % (stmt))
+    expected_output = "Modified %d row(s), %d row error(s)" %\
+        (expected_rows_modified, expected_row_errors)
+    assert expected_output in results.stderr
+
+  def test_kudu_dml_reporting(self, unique_database):
+    db = unique_database
+    run_impala_shell_cmd('--query="create table %s.dml_test (id int primary 
key, '\
+        'age int) distribute by hash(id) into 2 buckets stored as kudu"' % db)
+
+    self._validate_dml_stmt("insert into %s.dml_test (id) values (7), (7)" % 
db, 1, 1)
+    self._validate_dml_stmt("insert into %s.dml_test (id) values (7)" % db, 0, 
1)
+    self._validate_dml_stmt("upsert into %s.dml_test (id) values (7), (7)" % 
db, 2, 0)
+    self._validate_dml_stmt("update %s.dml_test set age = 1 where id = 7" % 
db, 1, 0)
+    self._validate_dml_stmt("delete from %s.dml_test where id = 7" % db, 1, 0)
+
+    # UPDATE/DELETE where there are no matching rows; there are no errors 
because the
+    # scan produced no rows.
+    self._validate_dml_stmt("update %s.dml_test set age = 1 where id = 8" % 
db, 0, 0)
+    self._validate_dml_stmt("delete from %s.dml_test where id = 7" % db, 0, 0)
+
+    # WITH clauses, only apply to INSERT and UPSERT
+    self._validate_dml_stmt(\
+        "with y as (values(7)) insert into %s.dml_test (id) select * from y" % 
db, 1, 0)
+    self._validate_dml_stmt(\
+        "with y as (values(7)) insert into %s.dml_test (id) select * from y" % 
db, 0, 1)
+    self._validate_dml_stmt(\
+        "with y as (values(7)) upsert into %s.dml_test (id) select * from y" % 
db, 1, 0)
+
   def test_missing_query_file(self):
     result = run_impala_shell_cmd('-f nonexistent.sql', expect_success=False)
     assert "Could not open file 'nonexistent.sql'" in result.stderr

Reply via email to