This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit b121a40d20107bad6c04732ba580f26639acd43a
Author: Riza Suminto <[email protected]>
AuthorDate: Wed Mar 26 16:28:34 2025 -0700

    IMPALA-13909: Remove cursor fixture from custom_cluster/test_kudu.py
    
    This patch removes the deprecated cursor fixture from
    custom_cluster/test_kudu.py. Its uses are replaced with the test class's
    hs2_client and a new assert_num_row() method that creates a fresh hs2
    client. All test classes in custom_cluster/test_kudu.py now extend
    CustomKuduTest and use the hs2 protocol as the default test protocol.
    
    Testing:
    - Ran custom_cluster/test_kudu.py; all tests passed.
    
    Change-Id: I046bf987dd16ecdf493d999e86191d85210f2de5
    Reviewed-on: http://gerrit.cloudera.org:8080/22698
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 .../queries/QueryTest/kudu_create.test             |  10 +-
 tests/custom_cluster/test_kudu.py                  | 327 ++++++++++-----------
 2 files changed, 165 insertions(+), 172 deletions(-)

diff --git 
a/testdata/workloads/functional-query/queries/QueryTest/kudu_create.test 
b/testdata/workloads/functional-query/queries/QueryTest/kudu_create.test
index 6ee28c8e5..37f8fda42 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/kudu_create.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/kudu_create.test
@@ -132,8 +132,6 @@ create table ignore_column_case (Id int, NAME string, vAlf 
float, vali bigint,
 ====
 ---- QUERY
 insert into ignore_column_case values (1, 'Martin', 1.0, 10);
----- RESULTS
-: 1
 ---- RUNTIME_PROFILE
 NumModifiedRows: 1
 NumRowErrors: 0
@@ -168,6 +166,8 @@ X, I1, I2, I3, I4, VALS, VALF, VALD, VALB, VALDEC4, 
VALDEC8, VALDEC16, VALDATE,
 1,NULL,NULL,NULL,NULL,'NULL',NULL,NULL,NULL,NULL,NULL,NULL,NULL,'NULL'
 ---- TYPES
 
INT,TINYINT,SMALLINT,INT,BIGINT,STRING,FLOAT,DOUBLE,BOOLEAN,DECIMAL,DECIMAL,DECIMAL,DATE,STRING
+---- HS2_TYPES
+INT,TINYINT,SMALLINT,INT,BIGINT,STRING,FLOAT,DOUBLE,BOOLEAN,DECIMAL,DECIMAL,DECIMAL,DATE,VARCHAR
 ====
 ---- QUERY
 # Overlapping ranges are rejected by the Kudu client
@@ -452,6 +452,8 @@ VALVC
 'z'
 ---- TYPES
 STRING
+---- HS2_TYPES
+VARCHAR
 ====
 ---- QUERY
 # Creates as select table with varchar columns and primary key
@@ -469,6 +471,8 @@ select * from ctas_varchar;
 'c'
 ---- TYPES
 STRING
+---- HS2_TYPES
+VARCHAR
 ====
 ---- QUERY
 # Create with keyword 'stored by'
@@ -493,8 +497,6 @@ stored as kudu
 ====
 ---- QUERY
 insert into non_unique_key_create_tbl1 values (1,'Martin'), (2,'Smith');
----- RESULTS
-: 2
 ---- RUNTIME_PROFILE
 NumModifiedRows: 2
 NumRowErrors: 0
diff --git a/tests/custom_cluster/test_kudu.py 
b/tests/custom_cluster/test_kudu.py
index 7a7920cca..9ac8f566d 100644
--- a/tests/custom_cluster/test_kudu.py
+++ b/tests/custom_cluster/test_kudu.py
@@ -22,12 +22,11 @@ import pytest
 from copy import deepcopy
 from kudu.schema import INT32
 
-from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
-from tests.common.impala_connection import FINISHED
+from tests.common.impala_connection import FINISHED, 
IMPALA_CONNECTION_EXCEPTION
 from tests.common.kudu_test_suite import KuduTestSuite
 from tests.common.skip import SkipIfKudu, SkipIfBuildType, SkipIf
-from tests.common.test_dimensions import BEESWAX, add_mandatory_exec_option
+from tests.common.test_dimensions import HS2, add_mandatory_exec_option
 from tests.common.test_result_verifier import error_msg_startswith
 
 KUDU_MASTER_HOSTS = pytest.config.option.kudu_master_hosts
@@ -45,7 +44,7 @@ class CustomKuduTest(CustomClusterTestSuite, KuduTestSuite):
     # run_test_case() can produce different result types between beeswax vs 
hs2 protocol
     # in some tests. This fix the test to use beeswax protocol until we can 
migrate
     # to hs2.
-    return BEESWAX
+    return HS2
 
   @classmethod
   def add_custom_cluster_constraints(cls):
@@ -58,6 +57,13 @@ class CustomKuduTest(CustomClusterTestSuite, KuduTestSuite):
         and v.get_value('exec_option')['disable_codegen'] is False
         and v.get_value('exec_option')['num_nodes'] == 0)
 
+  def assert_num_row(self, table_name, expected_num_row):
+    """Assert number of rows in a table using a clean client."""
+    with self.create_impala_client(protocol=self.default_test_protocol()) as 
client:
+      row_num_query = "select count(*) from {0}".format(table_name)
+      num_row = int(self.execute_scalar_expect_success(client, row_num_query))
+      assert expected_num_row == num_row
+
 
 class TestKuduOperations(CustomKuduTest):
 
@@ -90,40 +96,42 @@ class TestKuduOperations(CustomKuduTest):
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(impalad_args="-kudu_master_hosts=")
   @SkipIfKudu.hms_integration_enabled()
-  def test_kudu_master_hosts(self, cursor, kudu_client):
+  def test_kudu_master_hosts(self, kudu_client):
     """Check behavior when -kudu_master_hosts is not provided to catalogd."""
+    client = self.hs2_client
     with self.temp_kudu_table(kudu_client, [INT32]) as kudu_table:
       table_name = self.get_kudu_table_base_name(kudu_table.name)
       props = "TBLPROPERTIES('kudu.table_name'='%s')" % (kudu_table.name)
       try:
-        cursor.execute("CREATE EXTERNAL TABLE %s STORED AS KUDU %s" % 
(table_name,
+        client.execute("CREATE EXTERNAL TABLE %s STORED AS KUDU %s" % 
(table_name,
             props))
         assert False
       except Exception as e:
         assert "Table property 'kudu.master_addresses' is required" in str(e)
 
-      cursor.execute("""
+      client.execute("""
           CREATE EXTERNAL TABLE %s STORED AS KUDU
           TBLPROPERTIES ('kudu.master_addresses' = '%s',
           'kudu.table_name'='%s')
           """ % (table_name, KUDU_MASTER_HOSTS, kudu_table.name))
-      cursor.execute("DROP TABLE %s" % table_name)
+      client.execute("DROP TABLE %s" % table_name)
 
   @pytest.mark.execute_serially
   
@CustomClusterTestSuite.with_args(impalad_args="-kudu_error_buffer_size=1024")
   @SkipIfKudu.hms_integration_enabled()
-  def test_error_buffer_size(self, cursor, unique_database):
+  def test_error_buffer_size(self, unique_database):
     """Check that queries fail if the size of the Kudu client errors they 
generate is
     greater than kudu_error_buffer_size."""
+    client = self.hs2_client
     table_name = "%s.test_error_buffer_size" % unique_database
-    cursor.execute("create table %s (a bigint primary key) stored as kudu" % 
table_name)
+    client.execute("create table %s (a bigint primary key) stored as kudu" % 
table_name)
     # Insert a large number of a constant value into the table to generate 
many "Key
     # already present" errors. 50 errors should fit inside the 1024 byte limit.
-    cursor.execute(
+    client.execute(
         "insert into %s select 1 from functional.alltypes limit 50" % 
table_name)
     try:
       # 200 errors should overflow the 1024 byte limit.
-      cursor.execute(
+      client.execute(
           "insert into %s select 1 from functional.alltypes limit 200" % 
table_name)
       assert False, "Expected: 'Error overflow in Kudu session.'"
     except Exception as e:
@@ -199,45 +207,48 @@ class TestKuduHMSIntegration(CustomKuduTest):
     self.run_test_case('QueryTest/kudu_create', new_vector, 
use_db=unique_database)
 
   @pytest.mark.execute_serially
-  def test_implicit_external_table_props(self, cursor, kudu_client):
+  def test_implicit_external_table_props(self, unique_database, kudu_client):
     """Check that table properties added internally for external table during
        table creation are as expected.
     """
-    db_name = cursor.conn.db_name
-    with self.temp_kudu_table(kudu_client, [INT32], db_name=db_name) as 
kudu_table:
+    client = self.hs2_client
+    with self.temp_kudu_table(
+        kudu_client, [INT32], db_name=unique_database) as kudu_table:
       impala_table_name = self.get_kudu_table_base_name(kudu_table.name)
       external_table_name = "%s_external" % impala_table_name
       props = "TBLPROPERTIES('kudu.table_name'='%s')" % kudu_table.name
-      cursor.execute("CREATE EXTERNAL TABLE %s STORED AS KUDU %s" % (
+      client.execute("CREATE EXTERNAL TABLE %s STORED AS KUDU %s" % (
           external_table_name, props))
-      with self.drop_impala_table_after_context(cursor, external_table_name):
-        cursor.execute("DESCRIBE FORMATTED %s" % external_table_name)
-        table_desc = [[col.strip() if col else col for col in row] for row in 
cursor]
-        # Pytest shows truncated output on failure, so print the details just 
in case.
-        LOG.info(table_desc)
-        assert not any("kudu.table_id" in s for s in table_desc)
-        assert any("Owner:" in s for s in table_desc)
-        assert ["", "EXTERNAL", "TRUE"] in table_desc
-        assert ["", "kudu.master_addresses", KUDU_MASTER_HOSTS] in table_desc
-        assert ["", "kudu.table_name", kudu_table.name] in table_desc
-        assert ["", "storage_handler", 
"org.apache.hadoop.hive.kudu.KuduStorageHandler"] \
-            in table_desc
+      result = client.execute("DESCRIBE FORMATTED %s" % external_table_name)
+      table_desc = [[col.strip() if col else col for col in row]
+                    for row in result.tuples()]
+      # Pytest shows truncated output on failure, so print the details just in 
case.
+      LOG.info(table_desc)
+      assert not any("kudu.table_id" in s for s in table_desc)
+      assert any("Owner:" in s for s in table_desc)
+      assert ["", "EXTERNAL", "TRUE"] in table_desc
+      assert ["", "kudu.master_addresses", KUDU_MASTER_HOSTS] in table_desc
+      assert ["", "kudu.table_name", kudu_table.name] in table_desc
+      assert ["", "storage_handler", 
"org.apache.hadoop.hive.kudu.KuduStorageHandler"] \
+          in table_desc
 
   @pytest.mark.execute_serially
   
@CustomClusterTestSuite.with_args(impalad_args="-kudu_client_rpc_timeout_ms=30000")
-  def test_implicit_managed_table_props(self, cursor, kudu_client, 
unique_database):
+  def test_implicit_managed_table_props(self, kudu_client, unique_database):
     """Check that table properties added internally for managed table during 
table
        creation are as expected. Increase timeout of individual Kudu client 
rpcs to
        avoid requests fail due to operation delay in the Hive Metastore for 
managed
        tables (IMPALA-8856).
     """
+    client = self.hs2_client
     comment = "kudu_comment"
-    cursor.execute("""CREATE TABLE %s.foo (a INT PRIMARY KEY, s STRING) 
PARTITION BY
+    client.execute("""CREATE TABLE %s.foo (a INT PRIMARY KEY, s STRING) 
PARTITION BY
         HASH(a) PARTITIONS 3 COMMENT '%s' STORED AS KUDU""" % 
(unique_database, comment))
     assert kudu_client.table_exists(
       KuduTestSuite.to_kudu_table_name(unique_database, "foo"))
-    cursor.execute("DESCRIBE FORMATTED %s.foo" % unique_database)
-    table_desc = [[col.strip() if col else col for col in row] for row in 
cursor]
+    result = client.execute("DESCRIBE FORMATTED %s.foo" % unique_database)
+    table_desc = [[col.strip() if col else col for col in row]
+                  for row in result.tuples()]
     # Pytest shows truncated output on failure, so print the details just in 
case.
     LOG.info(table_desc)
 
@@ -255,19 +266,20 @@ class TestKuduHMSIntegration(CustomKuduTest):
 
   @pytest.mark.execute_serially
   
@CustomClusterTestSuite.with_args(impalad_args="-kudu_client_rpc_timeout_ms=30000")
-  def test_drop_non_empty_db(self, unique_cursor, kudu_client):
+  def test_drop_non_empty_db(self, unique_database, kudu_client):
     """Check that an attempt to drop a database will fail if Kudu tables are 
present
        and that the tables remain. Increase timeout of individual Kudu client 
rpcs
        to avoid requests fail due to operation delay in the Hive Metastore for 
managed
        tables (IMPALA-8856).
     """
-    db_name = unique_cursor.conn.db_name
+    client = self.hs2_client
+    db_name = unique_database
     with self.temp_kudu_table(kudu_client, [INT32], db_name=db_name) as 
kudu_table:
       assert kudu_client.table_exists(kudu_table.name)
-      unique_cursor.execute("INVALIDATE METADATA")
-      unique_cursor.execute("USE DEFAULT")
+      client.execute("INVALIDATE METADATA")
+      client.execute("USE DEFAULT")
       try:
-        unique_cursor.execute("DROP DATABASE %s" % db_name)
+        client.execute("DROP DATABASE %s" % db_name)
         assert False
       except Exception as e:
         assert "One or more tables exist" in str(e)
@@ -275,44 +287,50 @@ class TestKuduHMSIntegration(CustomKuduTest):
       # Dropping an empty database should succeed, once all tables
       # from the database have been dropped.
       assert kudu_client.table_exists(kudu_table.name)
-      unique_cursor.execute("DROP Table %s" % kudu_table.name)
-      unique_cursor.execute("DROP DATABASE %s" % db_name)
+      client.execute("DROP Table %s" % kudu_table.name)
+      client.execute("DROP DATABASE %s" % db_name)
       assert not kudu_client.table_exists(kudu_table.name)
+      # Create the database again so 'unique_database' fixture can finish 
cleanly.
+      client.execute("CREATE DATABASE %s" % db_name)
 
   @pytest.mark.execute_serially
   
@CustomClusterTestSuite.with_args(impalad_args="-kudu_client_rpc_timeout_ms=30000")
-  def test_drop_db_cascade(self, unique_cursor, kudu_client):
+  def test_drop_db_cascade(self, unique_database, kudu_client):
     """Check that an attempt to drop a database cascade will succeed even if 
Kudu
        tables are present. Make sure the corresponding managed tables are 
removed
        from Kudu. Increase timeout of individual Kudu client rpcs to avoid 
requests
        fail due to operation delay in the Hive Metastore for managed tables 
(IMPALA-8856).
     """
-    db_name = unique_cursor.conn.db_name
+    client = self.hs2_client
+    db_name = unique_database
     with self.temp_kudu_table(kudu_client, [INT32], db_name=db_name) as 
kudu_table:
       assert kudu_client.table_exists(kudu_table.name)
-      unique_cursor.execute("INVALIDATE METADATA")
+      client.execute("INVALIDATE METADATA")
 
       # Create a table in HDFS
       hdfs_table_name = self.random_table_name()
-      unique_cursor.execute("""
+      client.execute("""
           CREATE TABLE %s (a INT) PARTITIONED BY (x INT)""" % 
(hdfs_table_name))
 
-      unique_cursor.execute("USE DEFAULT")
-      unique_cursor.execute("DROP DATABASE %s CASCADE" % db_name)
-      unique_cursor.execute("SHOW DATABASES")
-      assert (db_name, '') not in unique_cursor.fetchall()
+      client.execute("USE DEFAULT")
+      client.execute("DROP DATABASE %s CASCADE" % db_name)
+      result = client.execute("SHOW DATABASES")
+      assert (db_name, '') not in result.tuples()
       assert not kudu_client.table_exists(kudu_table.name)
+      # Create the database again so 'unique_database' fixture can finish 
cleanly.
+      client.execute("CREATE DATABASE %s" % db_name)
 
   @pytest.mark.execute_serially
   
@CustomClusterTestSuite.with_args(impalad_args="-kudu_client_rpc_timeout_ms=30000")
-  def test_drop_managed_kudu_table(self, cursor, kudu_client, unique_database):
+  def test_drop_managed_kudu_table(self, kudu_client, unique_database):
     """Check that dropping a managed Kudu table should fail if the underlying
        Kudu table has been dropped externally. Increase timeout of individual
        Kudu client rpcs to avoid requests fail due to operation delay in the
        Hive Metastore for managed tables (IMPALA-8856).
     """
+    client = self.hs2_client
     impala_tbl_name = "foo"
-    cursor.execute("""CREATE TABLE %s.%s (a INT PRIMARY KEY) PARTITION BY HASH 
(a)
+    client.execute("""CREATE TABLE %s.%s (a INT PRIMARY KEY) PARTITION BY HASH 
(a)
         PARTITIONS 3 STORED AS KUDU""" % (unique_database, impala_tbl_name))
     kudu_tbl_name = KuduTestSuite.to_kudu_table_name(unique_database, 
impala_tbl_name)
     assert kudu_client.table_exists(kudu_tbl_name)
@@ -328,20 +346,21 @@ class TestKuduHMSIntegration(CustomKuduTest):
       assert "Table does not exist: %s" % kudu_tbl_name in str(e)
 
   @pytest.mark.execute_serially
-  def test_drop_external_kudu_table(self, cursor, kudu_client, 
unique_database):
+  def test_drop_external_kudu_table(self, kudu_client, unique_database):
     """Check that Impala can recover from the case where the underlying Kudu 
table of
        an external table is dropped using the Kudu client.
     """
+    client = self.hs2_client
     with self.temp_kudu_table(kudu_client, [INT32], db_name=unique_database) \
         as kudu_table:
       # Create an external Kudu table
       impala_table_name = self.get_kudu_table_base_name(kudu_table.name)
       external_table_name = "%s_external" % impala_table_name
       props = "TBLPROPERTIES('kudu.table_name'='%s')" % kudu_table.name
-      cursor.execute("CREATE EXTERNAL TABLE %s STORED AS KUDU %s" % (
+      client.execute("CREATE EXTERNAL TABLE %s STORED AS KUDU %s" % (
         external_table_name, props))
-      cursor.execute("DESCRIBE %s" % (external_table_name))
-      assert cursor.fetchall() == \
+      result = client.execute("DESCRIBE %s" % (external_table_name))
+      assert result.tuples() == \
              [("a", "int", "", "true", "true", "false", "", "AUTO_ENCODING",
                "DEFAULT_COMPRESSION", "0")]
       # Drop the underlying Kudu table
@@ -349,12 +368,12 @@ class TestKuduHMSIntegration(CustomKuduTest):
       assert not kudu_client.table_exists(kudu_table.name)
       err_msg = 'the table does not exist: table_name: "%s"' % 
(kudu_table.name)
       try:
-        cursor.execute("REFRESH %s" % (external_table_name))
+        client.execute("REFRESH %s" % (external_table_name))
       except Exception as e:
         assert err_msg in str(e)
-      cursor.execute("DROP TABLE %s" % (external_table_name))
-      cursor.execute("SHOW TABLES")
-      assert (external_table_name,) not in cursor.fetchall()
+      client.execute("DROP TABLE %s" % (external_table_name))
+      result = client.execute("SHOW TABLES")
+      assert (external_table_name,) not in result.tuples()
 
   @SkipIfKudu.no_hybrid_clock()
   def test_kudu_alter_table(self, vector, unique_database):
@@ -368,7 +387,7 @@ class TestKuduHMSIntegration(CustomKuduTest):
       use_db=unique_database)
 
 
-class TestKuduTransactionBase(CustomClusterTestSuite):
+class TestKuduTransactionBase(CustomKuduTest):
   """
   This is a base class of other TestKuduTransaction classes.
   """
@@ -393,14 +412,12 @@ class TestKuduTransactionBase(CustomClusterTestSuite):
   _update_query = "update {0} set b='test' where a=1"
   # query to upsert a row in Kudu table.
   _upsert_query = "upsert into {0} values (3, 'hello')"
-  # query to get number of rows.
-  _row_num_query = "select count(*) from {0}"
 
   @classmethod
   def get_workload(cls):
     return 'functional-query'
 
-  def _test_kudu_txn_succeed(self, cursor, unique_database):
+  def _test_kudu_txn_succeed(self, unique_database):
     # Create Kudu table.
     table_name = "%s.test_kudu_txn_succeed" % unique_database
     self.execute_query(self._create_kudu_table_query.format(table_name))
@@ -408,24 +425,21 @@ class TestKuduTransactionBase(CustomClusterTestSuite):
     # Enable Kudu transactions and insert rows to Kudu table.
     self.execute_query("set ENABLE_KUDU_TRANSACTION=true")
     self.execute_query(self._insert_3_rows_query.format(table_name))
-    cursor.execute(self._row_num_query.format(table_name))
-    assert cursor.fetchall() == [(3,)]
+
+    self.assert_num_row(table_name, 3)
     self.execute_query(self._insert_select_query.format(table_name))
-    cursor.execute(self._row_num_query.format(table_name))
-    assert cursor.fetchall() == [(103,)]
+    self.assert_num_row(table_name, 103)
 
     # Disable Kudu transactions and delete all rows from Kudu table.
     # Insert rows to the Kudu table. Should get same results as transaction 
enabled.
     self.execute_query("set ENABLE_KUDU_TRANSACTION=false")
     self.execute_query(self._delete_query.format(table_name))
     self.execute_query(self._insert_3_rows_query.format(table_name))
-    cursor.execute(self._row_num_query.format(table_name))
-    assert cursor.fetchall() == [(3,)]
+    self.assert_num_row(table_name, 3)
     self.execute_query(self._insert_select_query.format(table_name))
-    cursor.execute(self._row_num_query.format(table_name))
-    assert cursor.fetchall() == [(103,)]
+    self.assert_num_row(table_name, 103)
 
-  def _test_kudu_txn_not_implemented(self, cursor, unique_database):
+  def _test_kudu_txn_not_implemented(self, unique_database):
     # Create Kudu table.
     table_name = "%s.test_kudu_txn_succeed" % unique_database
     self.execute_query(self._create_kudu_table_query.format(table_name))
@@ -433,35 +447,33 @@ class TestKuduTransactionBase(CustomClusterTestSuite):
     # Enable Kudu transactions and insert rows to Kudu table.
     self.execute_query("set ENABLE_KUDU_TRANSACTION=true")
     self.execute_query(self._insert_3_rows_query.format(table_name))
-    cursor.execute(self._row_num_query.format(table_name))
-    assert cursor.fetchall() == [(3,)]
+    self.assert_num_row(table_name, 3)
 
     # Kudu only support multi-row transaction for INSERT now. Impala return an 
error
     # if UPDATE/UPSERT/DELETE are performed within a transaction context.
     try:
       self.execute_query(self._update_query.format(table_name))
       assert False, "query was expected to fail"
-    except ImpalaBeeswaxException as e:
+    except IMPALA_CONNECTION_EXCEPTION as e:
       assert error_msg_startswith(str(e), "Kudu reported error: Not 
implemented")
 
     try:
       self.execute_query(self._upsert_query.format(table_name))
       assert False, "query was expected to fail"
-    except ImpalaBeeswaxException as e:
+    except IMPALA_CONNECTION_EXCEPTION as e:
       assert error_msg_startswith(str(e), "Kudu reported error: Not 
implemented")
 
     try:
       self.execute_query(self._delete_query.format(table_name))
       assert False, "query was expected to fail"
-    except ImpalaBeeswaxException as e:
+    except IMPALA_CONNECTION_EXCEPTION as e:
       assert error_msg_startswith(str(e), "Kudu reported error: Not 
implemented")
 
     # Verify that number of rows has not been changed.
-    cursor.execute(self._row_num_query.format(table_name))
-    assert cursor.fetchall() == [(3,)]
+    self.assert_num_row(table_name, 3)
 
-  def _test_kudu_txn_abort_dup_key(self, cursor, unique_database,
-      expect_fail_on_conflict, expected_error_msg):
+  def _test_kudu_txn_abort_dup_key(self, unique_database, 
expect_fail_on_conflict,
+                                   expected_error_msg):
     # Create Kudu table.
     table_name = "%s.test_kudu_txn_abort_dup_key" % unique_database
     self.execute_query(self._create_kudu_table_query.format(table_name))
@@ -472,17 +484,15 @@ class TestKuduTransactionBase(CustomClusterTestSuite):
     try:
       self.execute_query(self._insert_dup_key_query.format(table_name))
       assert (not expect_fail_on_conflict), "query was expected to fail"
-    except ImpalaBeeswaxException as e:
+    except IMPALA_CONNECTION_EXCEPTION as e:
       assert expected_error_msg in str(e)
-    cursor.execute(self._row_num_query.format(table_name))
-    assert cursor.fetchall() == [(0 if expect_fail_on_conflict else 2,)]
+    self.assert_num_row(table_name, 0 if expect_fail_on_conflict else 2)
 
     # Disable Kudu transactions and run the same query. Part of rows are 
inserted into
     # Kudu table.
     self.execute_query("set ENABLE_KUDU_TRANSACTION=false")
     self.execute_query(self._insert_dup_key_query.format(table_name))
-    cursor.execute(self._row_num_query.format(table_name))
-    assert cursor.fetchall() == [(2,)]
+    self.assert_num_row(table_name, 2)
 
     # Delete all rows from Kudu table.
     self.execute_query(self._delete_query.format(table_name))
@@ -490,8 +500,7 @@ class TestKuduTransactionBase(CustomClusterTestSuite):
     table_name2 = "%s.test_kudu_txn_abort_dup_key2" % unique_database
     self.execute_query(self._create_parquet_table_query.format(table_name2))
     self.execute_query(self._insert_dup_key_query.format(table_name2))
-    cursor.execute(self._row_num_query.format(table_name2))
-    assert cursor.fetchall() == [(3,)]
+    self.assert_num_row(table_name2, 3)
 
     # Enable Kudu transactions
     self.execute_query("set ENABLE_KUDU_TRANSACTION=true")
@@ -500,20 +509,18 @@ class TestKuduTransactionBase(CustomClusterTestSuite):
     try:
       self.execute_query(self._insert_select_query2.format(table_name, 
table_name2))
       assert (not expect_fail_on_conflict), "query was expected to fail"
-    except ImpalaBeeswaxException as e:
+    except IMPALA_CONNECTION_EXCEPTION as e:
       assert expected_error_msg in str(e)
-    cursor.execute(self._row_num_query.format(table_name))
-    assert cursor.fetchall() == [(0 if expect_fail_on_conflict else 2,)]
+    self.assert_num_row(table_name, 0 if expect_fail_on_conflict else 2)
 
     # Disable Kudu transactions and run the same query. Part of rows are 
inserted into
     # Kudu table.
     self.execute_query("set ENABLE_KUDU_TRANSACTION=false")
     self.execute_query(self._insert_select_query2.format(table_name, 
table_name2))
-    cursor.execute(self._row_num_query.format(table_name))
-    assert cursor.fetchall() == [(2,)]
+    self.assert_num_row(table_name, 2)
 
-  def _test_kudu_txn_ctas(self, cursor, unique_database, 
expect_fail_on_conflict,
-      expected_error_msg):
+  def _test_kudu_txn_ctas(self, unique_database, expect_fail_on_conflict,
+                          expected_error_msg):
     # Enable Kudu transactions
     self.execute_query("set ENABLE_KUDU_TRANSACTION=true")
 
@@ -521,21 +528,18 @@ class TestKuduTransactionBase(CustomClusterTestSuite):
     table_name1 = "%s.test_kudu_txn_ctas1" % unique_database
     self.execute_query(self._create_kudu_table_query.format(table_name1))
     self.execute_query(self._insert_3_rows_query.format(table_name1))
-    cursor.execute(self._row_num_query.format(table_name1))
-    assert cursor.fetchall() == [(3,)]
+    self.assert_num_row(table_name1, 3)
 
     # Run CTAS query without duplicate rows in source table.
     table_name2 = "%s.test_kudu_txn_ctas2" % unique_database
     self.execute_query(self._ctas_query.format(table_name2, table_name1))
-    cursor.execute(self._row_num_query.format(table_name2))
-    assert cursor.fetchall() == [(3,)]
+    self.assert_num_row(table_name2, 3)
 
     # Create source Parquet table without primary key and insert duplicate 
rows.
     table_name3 = "%s.test_kudu_txn_ctas3" % unique_database
     self.execute_query(self._create_parquet_table_query.format(table_name3))
     self.execute_query(self._insert_dup_key_query.format(table_name3))
-    cursor.execute(self._row_num_query.format(table_name3))
-    assert cursor.fetchall() == [(3,)]
+    self.assert_num_row(table_name3, 3)
 
     # Run CTAS query with duplicate rows in source table.
     # Transaction should be aborted and no rows are inserted into Kudu table.
@@ -543,20 +547,18 @@ class TestKuduTransactionBase(CustomClusterTestSuite):
     try:
       self.execute_query(self._ctas_query.format(table_name4, table_name3))
       assert (not expect_fail_on_conflict), "query was expected to fail"
-    except ImpalaBeeswaxException as e:
+    except IMPALA_CONNECTION_EXCEPTION as e:
       assert expected_error_msg in str(e)
-    cursor.execute(self._row_num_query.format(table_name4))
-    assert cursor.fetchall() == [(0 if expect_fail_on_conflict else 2,)]
+    self.assert_num_row(table_name4, 0 if expect_fail_on_conflict else 2)
 
     # Disable Kudu transactions and run the same CTAS query. Part of rows are 
inserted
     # into Kudu table.
     self.execute_query("set ENABLE_KUDU_TRANSACTION=false")
     table_name5 = "%s.test_kudu_txn_ctas5" % unique_database
     self.execute_query(self._ctas_query.format(table_name5, table_name3))
-    cursor.execute(self._row_num_query.format(table_name5))
-    assert cursor.fetchall() == [(2,)]
+    self.assert_num_row(table_name5, 2)
 
-  def _test_kudu_txn_abort_row_batch(self, cursor, unique_database):
+  def _test_kudu_txn_abort_row_batch(self, unique_database):
     # Create Kudu table.
     table_name = "%s.test_kudu_txn_abort_row_batch" % unique_database
     self.execute_query(self._create_kudu_table_query.format(table_name))
@@ -569,12 +571,11 @@ class TestKuduTransactionBase(CustomClusterTestSuite):
     try:
       self.execute_query(self._insert_3_rows_query.format(table_name), 
query_options)
       assert False, "query was expected to fail"
-    except ImpalaBeeswaxException as e:
+    except IMPALA_CONNECTION_EXCEPTION as e:
       assert "FIS_KUDU_TABLE_SINK_WRITE_BATCH" in str(e)
-    cursor.execute(self._row_num_query.format(table_name))
-    assert cursor.fetchall() == [(0,)]
+    self.assert_num_row(table_name, 0)
 
-  def _test_kudu_txn_abort_partial_rows(self, cursor, unique_database):
+  def _test_kudu_txn_abort_partial_rows(self, unique_database):
     # Create Kudu table.
     table_name = "%s.test_kudu_txn_abort_partial_rows" % unique_database
     self.execute_query(self._create_kudu_table_query.format(table_name))
@@ -587,12 +588,11 @@ class TestKuduTransactionBase(CustomClusterTestSuite):
     try:
       self.execute_query(self._insert_3_rows_query.format(table_name), 
query_options)
       assert False, "query was expected to fail"
-    except ImpalaBeeswaxException as e:
+    except IMPALA_CONNECTION_EXCEPTION as e:
       assert "FIS_KUDU_TABLE_SINK_WRITE_PARTIAL_ROW" in str(e)
-    cursor.execute(self._row_num_query.format(table_name))
-    assert cursor.fetchall() == [(0,)]
+    self.assert_num_row(table_name, 0)
 
-  def _test_kudu_txn_abort_partition_lock(self, cursor, unique_database):
+  def _test_kudu_txn_abort_partition_lock(self, unique_database):
     # Running two separate queries that are inserting to the same Kudu 
partitions.
     # Verify that one of the queries should fail, given Kudu's current 
implementation
     # of partition locking.
@@ -617,7 +617,7 @@ class TestKuduTransactionBase(CustomClusterTestSuite):
     try:
       self.execute_query(query)
       assert False, "query was expected to fail"
-    except ImpalaBeeswaxException as e:
+    except IMPALA_CONNECTION_EXCEPTION as e:
       assert "aborted since it tries to acquire the partition lock that is 
held by " \
           "another transaction" in str(e)
 
@@ -632,42 +632,42 @@ class TestKuduTransaction(TestKuduTransactionBase):
 
   @pytest.mark.execute_serially
   @SkipIfKudu.no_hybrid_clock()
-  def test_kudu_txn_succeed(self, cursor, unique_database):
-    self._test_kudu_txn_succeed(cursor, unique_database)
+  def test_kudu_txn_succeed(self, unique_database):
+    self._test_kudu_txn_succeed(unique_database)
 
   @pytest.mark.execute_serially
   @SkipIfKudu.no_hybrid_clock()
-  def test_kudu_txn_not_implemented(self, cursor, unique_database):
-    self._test_kudu_txn_not_implemented(cursor, unique_database)
+  def test_kudu_txn_not_implemented(self, unique_database):
+    self._test_kudu_txn_not_implemented(unique_database)
 
   @pytest.mark.execute_serially
   @SkipIfKudu.no_hybrid_clock()
-  def test_kudu_txn_abort_dup_key(self, cursor, unique_database):
-    self._test_kudu_txn_abort_dup_key(cursor, unique_database, True,
+  def test_kudu_txn_abort_dup_key(self, unique_database):
+    self._test_kudu_txn_abort_dup_key(unique_database, True,
         self._duplicate_key_error)
 
   @pytest.mark.execute_serially
   @SkipIfKudu.no_hybrid_clock()
-  def test_kudu_txn_ctas(self, cursor, unique_database):
-    self._test_kudu_txn_ctas(cursor, unique_database, True, 
self._duplicate_key_error)
+  def test_kudu_txn_ctas(self, unique_database):
+    self._test_kudu_txn_ctas(unique_database, True, self._duplicate_key_error)
 
   @pytest.mark.execute_serially
   @SkipIfKudu.no_hybrid_clock()
   @SkipIfBuildType.not_dev_build
-  def test_kudu_txn_abort_row_batch(self, cursor, unique_database):
-    self._test_kudu_txn_abort_row_batch(cursor, unique_database)
+  def test_kudu_txn_abort_row_batch(self, unique_database):
+    self._test_kudu_txn_abort_row_batch(unique_database)
 
   @pytest.mark.execute_serially
   @SkipIfKudu.no_hybrid_clock()
   @SkipIfBuildType.not_dev_build
-  def test_kudu_txn_abort_partial_rows(self, cursor, unique_database):
-    self._test_kudu_txn_abort_partial_rows(cursor, unique_database)
+  def test_kudu_txn_abort_partial_rows(self, unique_database):
+    self._test_kudu_txn_abort_partial_rows(unique_database)
 
   @pytest.mark.execute_serially
   @SkipIfKudu.no_hybrid_clock()
   @SkipIfBuildType.not_dev_build
-  def test_kudu_txn_abort_partition_lock(self, cursor, unique_database):
-    self._test_kudu_txn_abort_partition_lock(cursor, unique_database)
+  def test_kudu_txn_abort_partition_lock(self, unique_database):
+    self._test_kudu_txn_abort_partition_lock(unique_database)
 
 
 class TestKuduTransactionNoIgnore(TestKuduTransactionBase):
@@ -679,53 +679,53 @@ class 
TestKuduTransactionNoIgnore(TestKuduTransactionBase):
   # impalad args to start the cluster.
   _impalad_args = "--kudu_ignore_conflicts=false"
   # expected error message from kudu on duplicated key.
-  _duplicate_key_error = "Key already present in Kudu table"
+  _duplicate_key_error = "Kudu reported error: Already present: key already 
present"
 
   @pytest.mark.execute_serially
   @SkipIfKudu.no_hybrid_clock()
   @CustomClusterTestSuite.with_args(impalad_args=_impalad_args)
-  def test_kudu_txn_succeed(self, cursor, unique_database):
-    self._test_kudu_txn_succeed(cursor, unique_database)
+  def test_kudu_txn_succeed(self, unique_database):
+    self._test_kudu_txn_succeed(unique_database)
 
   @pytest.mark.execute_serially
   @SkipIfKudu.no_hybrid_clock()
   @CustomClusterTestSuite.with_args(impalad_args=_impalad_args)
-  def test_kudu_txn_not_implemented(self, cursor, unique_database):
-    self._test_kudu_txn_not_implemented(cursor, unique_database)
+  def test_kudu_txn_not_implemented(self, unique_database):
+    self._test_kudu_txn_not_implemented(unique_database)
 
   @pytest.mark.execute_serially
   @SkipIfKudu.no_hybrid_clock()
   @CustomClusterTestSuite.with_args(impalad_args=_impalad_args)
-  def test_kudu_txn_abort_dup_key(self, cursor, unique_database):
-    self._test_kudu_txn_abort_dup_key(cursor, unique_database, True,
+  def test_kudu_txn_abort_dup_key(self, unique_database):
+    self._test_kudu_txn_abort_dup_key(unique_database, True,
         self._duplicate_key_error)
 
   @pytest.mark.execute_serially
   @SkipIfKudu.no_hybrid_clock()
   @CustomClusterTestSuite.with_args(impalad_args=_impalad_args)
-  def test_kudu_txn_ctas(self, cursor, unique_database):
-    self._test_kudu_txn_ctas(cursor, unique_database, True, 
self._duplicate_key_error)
+  def test_kudu_txn_ctas(self, unique_database):
+    self._test_kudu_txn_ctas(unique_database, True, self._duplicate_key_error)
 
   @pytest.mark.execute_serially
   @SkipIfKudu.no_hybrid_clock()
   @SkipIfBuildType.not_dev_build
   @CustomClusterTestSuite.with_args(impalad_args=_impalad_args)
-  def test_kudu_txn_abort_row_batch(self, cursor, unique_database):
-    self._test_kudu_txn_abort_row_batch(cursor, unique_database)
+  def test_kudu_txn_abort_row_batch(self, unique_database):
+    self._test_kudu_txn_abort_row_batch(unique_database)
 
   @pytest.mark.execute_serially
   @SkipIfKudu.no_hybrid_clock()
   @SkipIfBuildType.not_dev_build
   @CustomClusterTestSuite.with_args(impalad_args=_impalad_args)
-  def test_kudu_txn_abort_partial_rows(self, cursor, unique_database):
-    self._test_kudu_txn_abort_partial_rows(cursor, unique_database)
+  def test_kudu_txn_abort_partial_rows(self, unique_database):
+    self._test_kudu_txn_abort_partial_rows(unique_database)
 
   @pytest.mark.execute_serially
   @SkipIfKudu.no_hybrid_clock()
   @SkipIfBuildType.not_dev_build
   @CustomClusterTestSuite.with_args(impalad_args=_impalad_args)
-  def test_kudu_txn_abort_partition_lock(self, cursor, unique_database):
-    self._test_kudu_txn_abort_partition_lock(cursor, unique_database)
+  def test_kudu_txn_abort_partition_lock(self, unique_database):
+    self._test_kudu_txn_abort_partition_lock(unique_database)
 
 
 class TestKuduTransactionIgnoreConflict(TestKuduTransactionBase):
@@ -741,14 +741,14 @@ class 
TestKuduTransactionIgnoreConflict(TestKuduTransactionBase):
   @pytest.mark.execute_serially
   @SkipIfKudu.no_hybrid_clock()
   @CustomClusterTestSuite.with_args(impalad_args=_impalad_args)
-  def test_kudu_txn_dup_key(self, cursor, unique_database):
-    self._test_kudu_txn_abort_dup_key(cursor, unique_database, False, "no 
error")
+  def test_kudu_txn_dup_key(self, unique_database):
+    self._test_kudu_txn_abort_dup_key(unique_database, False, "no error")
 
   @pytest.mark.execute_serially
   @SkipIfKudu.no_hybrid_clock()
   @CustomClusterTestSuite.with_args(impalad_args=_impalad_args)
-  def test_kudu_txn_ctas(self, cursor, unique_database):
-    self._test_kudu_txn_ctas(cursor, unique_database, False, "no error")
+  def test_kudu_txn_ctas(self, unique_database):
+    self._test_kudu_txn_ctas(unique_database, False, "no error")
 
 
 @SkipIf.is_test_jdk
@@ -762,8 +762,6 @@ class TestKuduTxnKeepalive(CustomKuduTest):
       "partition by hash(a) partitions 8 stored as kudu"
   # queries to insert rows into Kudu table.
   _insert_3_rows_query = "insert into {0} values (0, 'a'), (1, 'b'), (2, 'c')"
-  # query to get number of rows.
-  _row_num_query = "select count(*) from {0}"
 
   @classmethod
   def get_workload(cls):
@@ -785,7 +783,7 @@ class TestKuduTxnKeepalive(CustomKuduTest):
   @pytest.mark.execute_serially
   @SkipIfKudu.no_hybrid_clock()
   @SkipIfBuildType.not_dev_build
-  def test_kudu_txn_heartbeat(self, cursor, unique_database):
+  def test_kudu_txn_heartbeat(self, unique_database):
     # Create Kudu table.
     table_name = "%s.test_kudu_txn_heartbeat" % unique_database
     self.execute_query(self._create_kudu_table_query.format(table_name))
@@ -796,11 +794,10 @@ class TestKuduTxnKeepalive(CustomKuduTest):
     self.execute_query("set ENABLE_KUDU_TRANSACTION=true")
     query_options = {'debug_action': 
'FIS_KUDU_TABLE_SINK_CREATE_SESSION:SLEEP@10000'}
     self.execute_query(self._insert_3_rows_query.format(table_name), 
query_options)
-    cursor.execute(self._row_num_query.format(table_name))
-    assert cursor.fetchall() == [(3,)]
+    self.assert_num_row(table_name, 3)
 
 
-class TestKuduDmlConflictBase(CustomClusterTestSuite):
+class TestKuduDmlConflictBase(CustomKuduTest):
   """
   This is a base class of other TestKuduDml classes.
   """
@@ -823,8 +820,6 @@ class TestKuduDmlConflictBase(CustomClusterTestSuite):
   _delete_by_key_query = "delete from {0} where a = {1}"
   # query to drop all rows from Kudu table.
   _delete_all_query = "delete from {0}"
-  # query to get number of rows.
-  _row_num_query = "select count(*) from {0}"
 
   @classmethod
   def get_workload(cls):
@@ -862,7 +857,7 @@ class TestKuduDmlConflictBase(CustomClusterTestSuite):
       self.client.close_query(fast_handle)
       self.client.close_query(slow_handle)
 
-  def _test_insert_update_delete(self, cursor, unique_database,
+  def _test_insert_update_delete(self, unique_database,
       expect_log_on_conflict):
     """
     Do sequence of insert, update, and delete query with conflicting primary 
keys.
@@ -877,8 +872,7 @@ class TestKuduDmlConflictBase(CustomClusterTestSuite):
     result = self.execute_query(self._insert_dup_key_query.format(table_name))
     self._check_errors(result.runtime_profile, expect_log_on_conflict,
         "Key already present in Kudu table", 3)
-    cursor.execute(self._row_num_query.format(table_name))
-    assert cursor.fetchall() == [(3,)]
+    self.assert_num_row(table_name, 3)
 
     # Update rows with some constraint violation.
     # Error message should exist in profile regardless of 
kudu_ignore_conflicts value.
@@ -892,22 +886,19 @@ class TestKuduDmlConflictBase(CustomClusterTestSuite):
     update_query = self._update_by_key_query.format(table_name, 1)
     self._race_queries(delete_query, update_query, expect_log_on_conflict,
         "Not found in Kudu table", 1)
-    cursor.execute(self._row_num_query.format(table_name))
-    assert cursor.fetchall() == [(2,)]
+    self.assert_num_row(table_name, 2)
 
     # Delete row with non-existent primary key by racing it against another 
concurrent
     # delete.
     delete_query = self._delete_by_key_query.format(table_name, 2)
     self._race_queries(delete_query, delete_query, expect_log_on_conflict,
         "Not found in Kudu table", 1)
-    cursor.execute(self._row_num_query.format(table_name))
-    assert cursor.fetchall() == [(1,)]
+    self.assert_num_row(table_name, 1)
 
     # Delete all rows. Expect no errors.
     result = self.execute_query(self._delete_all_query.format(table_name))
     self._check_errors(result.runtime_profile, True, "\n", 0)
-    cursor.execute(self._row_num_query.format(table_name))
-    assert cursor.fetchall() == [(0,)]
+    self.assert_num_row(table_name, 0)
 
 
 class TestKuduDmlConflictNoError(TestKuduDmlConflictBase):
@@ -917,8 +908,8 @@ class TestKuduDmlConflictNoError(TestKuduDmlConflictBase):
 
   @pytest.mark.execute_serially
   @SkipIfKudu.no_hybrid_clock()
-  def test_insert_update_delete(self, cursor, unique_database):
-    self._test_insert_update_delete(cursor, unique_database, False)
+  def test_insert_update_delete(self, unique_database):
+    self._test_insert_update_delete(unique_database, False)
 
 
 class TestKuduDmlConflictLogError(TestKuduDmlConflictBase):
@@ -932,5 +923,5 @@ class TestKuduDmlConflictLogError(TestKuduDmlConflictBase):
   @pytest.mark.execute_serially
   @SkipIfKudu.no_hybrid_clock()
   @CustomClusterTestSuite.with_args(impalad_args=_impalad_args)
-  def test_insert_update_delete(self, cursor, unique_database):
-    self._test_insert_update_delete(cursor, unique_database, True)
+  def test_insert_update_delete(self, unique_database):
+    self._test_insert_update_delete(unique_database, True)


Reply via email to