This is an automated email from the ASF dual-hosted git repository. stigahuang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit b121a40d20107bad6c04732ba580f26639acd43a Author: Riza Suminto <[email protected]> AuthorDate: Wed Mar 26 16:28:34 2025 -0700 IMPALA-13909: Remove cursor fixture from custom_cluster/test_kudu.py This patch removes the deprecated cursor fixture in custom_cluster/test_kudu.py. It is replaced with an assert_num_row() method that creates a fresh hs2 client. All test classes in custom_cluster/test_kudu.py now extend CustomKuduTest and use the hs2 protocol as the default test protocol. Testing: - Run and pass custom_cluster/test_kudu.py Change-Id: I046bf987dd16ecdf493d999e86191d85210f2de5 Reviewed-on: http://gerrit.cloudera.org:8080/22698 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- .../queries/QueryTest/kudu_create.test | 10 +- tests/custom_cluster/test_kudu.py | 327 ++++++++++----------- 2 files changed, 165 insertions(+), 172 deletions(-) diff --git a/testdata/workloads/functional-query/queries/QueryTest/kudu_create.test b/testdata/workloads/functional-query/queries/QueryTest/kudu_create.test index 6ee28c8e5..37f8fda42 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/kudu_create.test +++ b/testdata/workloads/functional-query/queries/QueryTest/kudu_create.test @@ -132,8 +132,6 @@ create table ignore_column_case (Id int, NAME string, vAlf float, vali bigint, ==== ---- QUERY insert into ignore_column_case values (1, 'Martin', 1.0, 10); ----- RESULTS -: 1 ---- RUNTIME_PROFILE NumModifiedRows: 1 NumRowErrors: 0 @@ -168,6 +166,8 @@ X, I1, I2, I3, I4, VALS, VALF, VALD, VALB, VALDEC4, VALDEC8, VALDEC16, VALDATE, 1,NULL,NULL,NULL,NULL,'NULL',NULL,NULL,NULL,NULL,NULL,NULL,NULL,'NULL' ---- TYPES INT,TINYINT,SMALLINT,INT,BIGINT,STRING,FLOAT,DOUBLE,BOOLEAN,DECIMAL,DECIMAL,DECIMAL,DATE,STRING +---- HS2_TYPES +INT,TINYINT,SMALLINT,INT,BIGINT,STRING,FLOAT,DOUBLE,BOOLEAN,DECIMAL,DECIMAL,DECIMAL,DATE,VARCHAR ==== ---- QUERY # Overlapping ranges are rejected by the Kudu client @@ -452,6 +452,8 @@ VALVC 'z' ---- 
TYPES STRING +---- HS2_TYPES +VARCHAR ==== ---- QUERY # Creates as select table with varchar columns and primary key @@ -469,6 +471,8 @@ select * from ctas_varchar; 'c' ---- TYPES STRING +---- HS2_TYPES +VARCHAR ==== ---- QUERY # Create with keyword 'stored by' @@ -493,8 +497,6 @@ stored as kudu ==== ---- QUERY insert into non_unique_key_create_tbl1 values (1,'Martin'), (2,'Smith'); ----- RESULTS -: 2 ---- RUNTIME_PROFILE NumModifiedRows: 2 NumRowErrors: 0 diff --git a/tests/custom_cluster/test_kudu.py b/tests/custom_cluster/test_kudu.py index 7a7920cca..9ac8f566d 100644 --- a/tests/custom_cluster/test_kudu.py +++ b/tests/custom_cluster/test_kudu.py @@ -22,12 +22,11 @@ import pytest from copy import deepcopy from kudu.schema import INT32 -from tests.beeswax.impala_beeswax import ImpalaBeeswaxException from tests.common.custom_cluster_test_suite import CustomClusterTestSuite -from tests.common.impala_connection import FINISHED +from tests.common.impala_connection import FINISHED, IMPALA_CONNECTION_EXCEPTION from tests.common.kudu_test_suite import KuduTestSuite from tests.common.skip import SkipIfKudu, SkipIfBuildType, SkipIf -from tests.common.test_dimensions import BEESWAX, add_mandatory_exec_option +from tests.common.test_dimensions import HS2, add_mandatory_exec_option from tests.common.test_result_verifier import error_msg_startswith KUDU_MASTER_HOSTS = pytest.config.option.kudu_master_hosts @@ -45,7 +44,7 @@ class CustomKuduTest(CustomClusterTestSuite, KuduTestSuite): # run_test_case() can produce different result types between beeswax vs hs2 protocol # in some tests. This fix the test to use beeswax protocol until we can migrate # to hs2. 
- return BEESWAX + return HS2 @classmethod def add_custom_cluster_constraints(cls): @@ -58,6 +57,13 @@ class CustomKuduTest(CustomClusterTestSuite, KuduTestSuite): and v.get_value('exec_option')['disable_codegen'] is False and v.get_value('exec_option')['num_nodes'] == 0) + def assert_num_row(self, table_name, expected_num_row): + """Assert number of rows in a table using a clean client.""" + with self.create_impala_client(protocol=self.default_test_protocol()) as client: + row_num_query = "select count(*) from {0}".format(table_name) + num_row = int(self.execute_scalar_expect_success(client, row_num_query)) + assert expected_num_row == num_row + class TestKuduOperations(CustomKuduTest): @@ -90,40 +96,42 @@ class TestKuduOperations(CustomKuduTest): @pytest.mark.execute_serially @CustomClusterTestSuite.with_args(impalad_args="-kudu_master_hosts=") @SkipIfKudu.hms_integration_enabled() - def test_kudu_master_hosts(self, cursor, kudu_client): + def test_kudu_master_hosts(self, kudu_client): """Check behavior when -kudu_master_hosts is not provided to catalogd.""" + client = self.hs2_client with self.temp_kudu_table(kudu_client, [INT32]) as kudu_table: table_name = self.get_kudu_table_base_name(kudu_table.name) props = "TBLPROPERTIES('kudu.table_name'='%s')" % (kudu_table.name) try: - cursor.execute("CREATE EXTERNAL TABLE %s STORED AS KUDU %s" % (table_name, + client.execute("CREATE EXTERNAL TABLE %s STORED AS KUDU %s" % (table_name, props)) assert False except Exception as e: assert "Table property 'kudu.master_addresses' is required" in str(e) - cursor.execute(""" + client.execute(""" CREATE EXTERNAL TABLE %s STORED AS KUDU TBLPROPERTIES ('kudu.master_addresses' = '%s', 'kudu.table_name'='%s') """ % (table_name, KUDU_MASTER_HOSTS, kudu_table.name)) - cursor.execute("DROP TABLE %s" % table_name) + client.execute("DROP TABLE %s" % table_name) @pytest.mark.execute_serially @CustomClusterTestSuite.with_args(impalad_args="-kudu_error_buffer_size=1024") 
@SkipIfKudu.hms_integration_enabled() - def test_error_buffer_size(self, cursor, unique_database): + def test_error_buffer_size(self, unique_database): """Check that queries fail if the size of the Kudu client errors they generate is greater than kudu_error_buffer_size.""" + client = self.hs2_client table_name = "%s.test_error_buffer_size" % unique_database - cursor.execute("create table %s (a bigint primary key) stored as kudu" % table_name) + client.execute("create table %s (a bigint primary key) stored as kudu" % table_name) # Insert a large number of a constant value into the table to generate many "Key # already present" errors. 50 errors should fit inside the 1024 byte limit. - cursor.execute( + client.execute( "insert into %s select 1 from functional.alltypes limit 50" % table_name) try: # 200 errors should overflow the 1024 byte limit. - cursor.execute( + client.execute( "insert into %s select 1 from functional.alltypes limit 200" % table_name) assert False, "Expected: 'Error overflow in Kudu session.'" except Exception as e: @@ -199,45 +207,48 @@ class TestKuduHMSIntegration(CustomKuduTest): self.run_test_case('QueryTest/kudu_create', new_vector, use_db=unique_database) @pytest.mark.execute_serially - def test_implicit_external_table_props(self, cursor, kudu_client): + def test_implicit_external_table_props(self, unique_database, kudu_client): """Check that table properties added internally for external table during table creation are as expected. 
""" - db_name = cursor.conn.db_name - with self.temp_kudu_table(kudu_client, [INT32], db_name=db_name) as kudu_table: + client = self.hs2_client + with self.temp_kudu_table( + kudu_client, [INT32], db_name=unique_database) as kudu_table: impala_table_name = self.get_kudu_table_base_name(kudu_table.name) external_table_name = "%s_external" % impala_table_name props = "TBLPROPERTIES('kudu.table_name'='%s')" % kudu_table.name - cursor.execute("CREATE EXTERNAL TABLE %s STORED AS KUDU %s" % ( + client.execute("CREATE EXTERNAL TABLE %s STORED AS KUDU %s" % ( external_table_name, props)) - with self.drop_impala_table_after_context(cursor, external_table_name): - cursor.execute("DESCRIBE FORMATTED %s" % external_table_name) - table_desc = [[col.strip() if col else col for col in row] for row in cursor] - # Pytest shows truncated output on failure, so print the details just in case. - LOG.info(table_desc) - assert not any("kudu.table_id" in s for s in table_desc) - assert any("Owner:" in s for s in table_desc) - assert ["", "EXTERNAL", "TRUE"] in table_desc - assert ["", "kudu.master_addresses", KUDU_MASTER_HOSTS] in table_desc - assert ["", "kudu.table_name", kudu_table.name] in table_desc - assert ["", "storage_handler", "org.apache.hadoop.hive.kudu.KuduStorageHandler"] \ - in table_desc + result = client.execute("DESCRIBE FORMATTED %s" % external_table_name) + table_desc = [[col.strip() if col else col for col in row] + for row in result.tuples()] + # Pytest shows truncated output on failure, so print the details just in case. 
+ LOG.info(table_desc) + assert not any("kudu.table_id" in s for s in table_desc) + assert any("Owner:" in s for s in table_desc) + assert ["", "EXTERNAL", "TRUE"] in table_desc + assert ["", "kudu.master_addresses", KUDU_MASTER_HOSTS] in table_desc + assert ["", "kudu.table_name", kudu_table.name] in table_desc + assert ["", "storage_handler", "org.apache.hadoop.hive.kudu.KuduStorageHandler"] \ + in table_desc @pytest.mark.execute_serially @CustomClusterTestSuite.with_args(impalad_args="-kudu_client_rpc_timeout_ms=30000") - def test_implicit_managed_table_props(self, cursor, kudu_client, unique_database): + def test_implicit_managed_table_props(self, kudu_client, unique_database): """Check that table properties added internally for managed table during table creation are as expected. Increase timeout of individual Kudu client rpcs to avoid requests fail due to operation delay in the Hive Metastore for managed tables (IMPALA-8856). """ + client = self.hs2_client comment = "kudu_comment" - cursor.execute("""CREATE TABLE %s.foo (a INT PRIMARY KEY, s STRING) PARTITION BY + client.execute("""CREATE TABLE %s.foo (a INT PRIMARY KEY, s STRING) PARTITION BY HASH(a) PARTITIONS 3 COMMENT '%s' STORED AS KUDU""" % (unique_database, comment)) assert kudu_client.table_exists( KuduTestSuite.to_kudu_table_name(unique_database, "foo")) - cursor.execute("DESCRIBE FORMATTED %s.foo" % unique_database) - table_desc = [[col.strip() if col else col for col in row] for row in cursor] + result = client.execute("DESCRIBE FORMATTED %s.foo" % unique_database) + table_desc = [[col.strip() if col else col for col in row] + for row in result.tuples()] # Pytest shows truncated output on failure, so print the details just in case. 
LOG.info(table_desc) @@ -255,19 +266,20 @@ class TestKuduHMSIntegration(CustomKuduTest): @pytest.mark.execute_serially @CustomClusterTestSuite.with_args(impalad_args="-kudu_client_rpc_timeout_ms=30000") - def test_drop_non_empty_db(self, unique_cursor, kudu_client): + def test_drop_non_empty_db(self, unique_database, kudu_client): """Check that an attempt to drop a database will fail if Kudu tables are present and that the tables remain. Increase timeout of individual Kudu client rpcs to avoid requests fail due to operation delay in the Hive Metastore for managed tables (IMPALA-8856). """ - db_name = unique_cursor.conn.db_name + client = self.hs2_client + db_name = unique_database with self.temp_kudu_table(kudu_client, [INT32], db_name=db_name) as kudu_table: assert kudu_client.table_exists(kudu_table.name) - unique_cursor.execute("INVALIDATE METADATA") - unique_cursor.execute("USE DEFAULT") + client.execute("INVALIDATE METADATA") + client.execute("USE DEFAULT") try: - unique_cursor.execute("DROP DATABASE %s" % db_name) + client.execute("DROP DATABASE %s" % db_name) assert False except Exception as e: assert "One or more tables exist" in str(e) @@ -275,44 +287,50 @@ class TestKuduHMSIntegration(CustomKuduTest): # Dropping an empty database should succeed, once all tables # from the database have been dropped. assert kudu_client.table_exists(kudu_table.name) - unique_cursor.execute("DROP Table %s" % kudu_table.name) - unique_cursor.execute("DROP DATABASE %s" % db_name) + client.execute("DROP Table %s" % kudu_table.name) + client.execute("DROP DATABASE %s" % db_name) assert not kudu_client.table_exists(kudu_table.name) + # Create the database again so 'unique_database' fixture can finish cleanly. 
+ client.execute("CREATE DATABASE %s" % db_name) @pytest.mark.execute_serially @CustomClusterTestSuite.with_args(impalad_args="-kudu_client_rpc_timeout_ms=30000") - def test_drop_db_cascade(self, unique_cursor, kudu_client): + def test_drop_db_cascade(self, unique_database, kudu_client): """Check that an attempt to drop a database cascade will succeed even if Kudu tables are present. Make sure the corresponding managed tables are removed from Kudu. Increase timeout of individual Kudu client rpcs to avoid requests fail due to operation delay in the Hive Metastore for managed tables (IMPALA-8856). """ - db_name = unique_cursor.conn.db_name + client = self.hs2_client + db_name = unique_database with self.temp_kudu_table(kudu_client, [INT32], db_name=db_name) as kudu_table: assert kudu_client.table_exists(kudu_table.name) - unique_cursor.execute("INVALIDATE METADATA") + client.execute("INVALIDATE METADATA") # Create a table in HDFS hdfs_table_name = self.random_table_name() - unique_cursor.execute(""" + client.execute(""" CREATE TABLE %s (a INT) PARTITIONED BY (x INT)""" % (hdfs_table_name)) - unique_cursor.execute("USE DEFAULT") - unique_cursor.execute("DROP DATABASE %s CASCADE" % db_name) - unique_cursor.execute("SHOW DATABASES") - assert (db_name, '') not in unique_cursor.fetchall() + client.execute("USE DEFAULT") + client.execute("DROP DATABASE %s CASCADE" % db_name) + result = client.execute("SHOW DATABASES") + assert (db_name, '') not in result.tuples() assert not kudu_client.table_exists(kudu_table.name) + # Create the database again so 'unique_database' fixture can finish cleanly. 
+ client.execute("CREATE DATABASE %s" % db_name) @pytest.mark.execute_serially @CustomClusterTestSuite.with_args(impalad_args="-kudu_client_rpc_timeout_ms=30000") - def test_drop_managed_kudu_table(self, cursor, kudu_client, unique_database): + def test_drop_managed_kudu_table(self, kudu_client, unique_database): """Check that dropping a managed Kudu table should fail if the underlying Kudu table has been dropped externally. Increase timeout of individual Kudu client rpcs to avoid requests fail due to operation delay in the Hive Metastore for managed tables (IMPALA-8856). """ + client = self.hs2_client impala_tbl_name = "foo" - cursor.execute("""CREATE TABLE %s.%s (a INT PRIMARY KEY) PARTITION BY HASH (a) + client.execute("""CREATE TABLE %s.%s (a INT PRIMARY KEY) PARTITION BY HASH (a) PARTITIONS 3 STORED AS KUDU""" % (unique_database, impala_tbl_name)) kudu_tbl_name = KuduTestSuite.to_kudu_table_name(unique_database, impala_tbl_name) assert kudu_client.table_exists(kudu_tbl_name) @@ -328,20 +346,21 @@ class TestKuduHMSIntegration(CustomKuduTest): assert "Table does not exist: %s" % kudu_tbl_name in str(e) @pytest.mark.execute_serially - def test_drop_external_kudu_table(self, cursor, kudu_client, unique_database): + def test_drop_external_kudu_table(self, kudu_client, unique_database): """Check that Impala can recover from the case where the underlying Kudu table of an external table is dropped using the Kudu client. 
""" + client = self.hs2_client with self.temp_kudu_table(kudu_client, [INT32], db_name=unique_database) \ as kudu_table: # Create an external Kudu table impala_table_name = self.get_kudu_table_base_name(kudu_table.name) external_table_name = "%s_external" % impala_table_name props = "TBLPROPERTIES('kudu.table_name'='%s')" % kudu_table.name - cursor.execute("CREATE EXTERNAL TABLE %s STORED AS KUDU %s" % ( + client.execute("CREATE EXTERNAL TABLE %s STORED AS KUDU %s" % ( external_table_name, props)) - cursor.execute("DESCRIBE %s" % (external_table_name)) - assert cursor.fetchall() == \ + result = client.execute("DESCRIBE %s" % (external_table_name)) + assert result.tuples() == \ [("a", "int", "", "true", "true", "false", "", "AUTO_ENCODING", "DEFAULT_COMPRESSION", "0")] # Drop the underlying Kudu table @@ -349,12 +368,12 @@ class TestKuduHMSIntegration(CustomKuduTest): assert not kudu_client.table_exists(kudu_table.name) err_msg = 'the table does not exist: table_name: "%s"' % (kudu_table.name) try: - cursor.execute("REFRESH %s" % (external_table_name)) + client.execute("REFRESH %s" % (external_table_name)) except Exception as e: assert err_msg in str(e) - cursor.execute("DROP TABLE %s" % (external_table_name)) - cursor.execute("SHOW TABLES") - assert (external_table_name,) not in cursor.fetchall() + client.execute("DROP TABLE %s" % (external_table_name)) + result = client.execute("SHOW TABLES") + assert (external_table_name,) not in result.tuples() @SkipIfKudu.no_hybrid_clock() def test_kudu_alter_table(self, vector, unique_database): @@ -368,7 +387,7 @@ class TestKuduHMSIntegration(CustomKuduTest): use_db=unique_database) -class TestKuduTransactionBase(CustomClusterTestSuite): +class TestKuduTransactionBase(CustomKuduTest): """ This is a base class of other TestKuduTransaction classes. """ @@ -393,14 +412,12 @@ class TestKuduTransactionBase(CustomClusterTestSuite): _update_query = "update {0} set b='test' where a=1" # query to upsert a row in Kudu table. 
_upsert_query = "upsert into {0} values (3, 'hello')" - # query to get number of rows. - _row_num_query = "select count(*) from {0}" @classmethod def get_workload(cls): return 'functional-query' - def _test_kudu_txn_succeed(self, cursor, unique_database): + def _test_kudu_txn_succeed(self, unique_database): # Create Kudu table. table_name = "%s.test_kudu_txn_succeed" % unique_database self.execute_query(self._create_kudu_table_query.format(table_name)) @@ -408,24 +425,21 @@ class TestKuduTransactionBase(CustomClusterTestSuite): # Enable Kudu transactions and insert rows to Kudu table. self.execute_query("set ENABLE_KUDU_TRANSACTION=true") self.execute_query(self._insert_3_rows_query.format(table_name)) - cursor.execute(self._row_num_query.format(table_name)) - assert cursor.fetchall() == [(3,)] + + self.assert_num_row(table_name, 3) self.execute_query(self._insert_select_query.format(table_name)) - cursor.execute(self._row_num_query.format(table_name)) - assert cursor.fetchall() == [(103,)] + self.assert_num_row(table_name, 103) # Disable Kudu transactions and delete all rows from Kudu table. # Insert rows to the Kudu table. Should get same results as transaction enabled. self.execute_query("set ENABLE_KUDU_TRANSACTION=false") self.execute_query(self._delete_query.format(table_name)) self.execute_query(self._insert_3_rows_query.format(table_name)) - cursor.execute(self._row_num_query.format(table_name)) - assert cursor.fetchall() == [(3,)] + self.assert_num_row(table_name, 3) self.execute_query(self._insert_select_query.format(table_name)) - cursor.execute(self._row_num_query.format(table_name)) - assert cursor.fetchall() == [(103,)] + self.assert_num_row(table_name, 103) - def _test_kudu_txn_not_implemented(self, cursor, unique_database): + def _test_kudu_txn_not_implemented(self, unique_database): # Create Kudu table. 
table_name = "%s.test_kudu_txn_succeed" % unique_database self.execute_query(self._create_kudu_table_query.format(table_name)) @@ -433,35 +447,33 @@ class TestKuduTransactionBase(CustomClusterTestSuite): # Enable Kudu transactions and insert rows to Kudu table. self.execute_query("set ENABLE_KUDU_TRANSACTION=true") self.execute_query(self._insert_3_rows_query.format(table_name)) - cursor.execute(self._row_num_query.format(table_name)) - assert cursor.fetchall() == [(3,)] + self.assert_num_row(table_name, 3) # Kudu only support multi-row transaction for INSERT now. Impala return an error # if UPDATE/UPSERT/DELETE are performed within a transaction context. try: self.execute_query(self._update_query.format(table_name)) assert False, "query was expected to fail" - except ImpalaBeeswaxException as e: + except IMPALA_CONNECTION_EXCEPTION as e: assert error_msg_startswith(str(e), "Kudu reported error: Not implemented") try: self.execute_query(self._upsert_query.format(table_name)) assert False, "query was expected to fail" - except ImpalaBeeswaxException as e: + except IMPALA_CONNECTION_EXCEPTION as e: assert error_msg_startswith(str(e), "Kudu reported error: Not implemented") try: self.execute_query(self._delete_query.format(table_name)) assert False, "query was expected to fail" - except ImpalaBeeswaxException as e: + except IMPALA_CONNECTION_EXCEPTION as e: assert error_msg_startswith(str(e), "Kudu reported error: Not implemented") # Verify that number of rows has not been changed. - cursor.execute(self._row_num_query.format(table_name)) - assert cursor.fetchall() == [(3,)] + self.assert_num_row(table_name, 3) - def _test_kudu_txn_abort_dup_key(self, cursor, unique_database, - expect_fail_on_conflict, expected_error_msg): + def _test_kudu_txn_abort_dup_key(self, unique_database, expect_fail_on_conflict, + expected_error_msg): # Create Kudu table. 
table_name = "%s.test_kudu_txn_abort_dup_key" % unique_database self.execute_query(self._create_kudu_table_query.format(table_name)) @@ -472,17 +484,15 @@ class TestKuduTransactionBase(CustomClusterTestSuite): try: self.execute_query(self._insert_dup_key_query.format(table_name)) assert (not expect_fail_on_conflict), "query was expected to fail" - except ImpalaBeeswaxException as e: + except IMPALA_CONNECTION_EXCEPTION as e: assert expected_error_msg in str(e) - cursor.execute(self._row_num_query.format(table_name)) - assert cursor.fetchall() == [(0 if expect_fail_on_conflict else 2,)] + self.assert_num_row(table_name, 0 if expect_fail_on_conflict else 2) # Disable Kudu transactions and run the same query. Part of rows are inserted into # Kudu table. self.execute_query("set ENABLE_KUDU_TRANSACTION=false") self.execute_query(self._insert_dup_key_query.format(table_name)) - cursor.execute(self._row_num_query.format(table_name)) - assert cursor.fetchall() == [(2,)] + self.assert_num_row(table_name, 2) # Delete all rows from Kudu table. 
self.execute_query(self._delete_query.format(table_name)) @@ -490,8 +500,7 @@ class TestKuduTransactionBase(CustomClusterTestSuite): table_name2 = "%s.test_kudu_txn_abort_dup_key2" % unique_database self.execute_query(self._create_parquet_table_query.format(table_name2)) self.execute_query(self._insert_dup_key_query.format(table_name2)) - cursor.execute(self._row_num_query.format(table_name2)) - assert cursor.fetchall() == [(3,)] + self.assert_num_row(table_name2, 3) # Enable Kudu transactions self.execute_query("set ENABLE_KUDU_TRANSACTION=true") @@ -500,20 +509,18 @@ class TestKuduTransactionBase(CustomClusterTestSuite): try: self.execute_query(self._insert_select_query2.format(table_name, table_name2)) assert (not expect_fail_on_conflict), "query was expected to fail" - except ImpalaBeeswaxException as e: + except IMPALA_CONNECTION_EXCEPTION as e: assert expected_error_msg in str(e) - cursor.execute(self._row_num_query.format(table_name)) - assert cursor.fetchall() == [(0 if expect_fail_on_conflict else 2,)] + self.assert_num_row(table_name, 0 if expect_fail_on_conflict else 2) # Disable Kudu transactions and run the same query. Part of rows are inserted into # Kudu table. 
self.execute_query("set ENABLE_KUDU_TRANSACTION=false") self.execute_query(self._insert_select_query2.format(table_name, table_name2)) - cursor.execute(self._row_num_query.format(table_name)) - assert cursor.fetchall() == [(2,)] + self.assert_num_row(table_name, 2) - def _test_kudu_txn_ctas(self, cursor, unique_database, expect_fail_on_conflict, - expected_error_msg): + def _test_kudu_txn_ctas(self, unique_database, expect_fail_on_conflict, + expected_error_msg): # Enable Kudu transactions self.execute_query("set ENABLE_KUDU_TRANSACTION=true") @@ -521,21 +528,18 @@ class TestKuduTransactionBase(CustomClusterTestSuite): table_name1 = "%s.test_kudu_txn_ctas1" % unique_database self.execute_query(self._create_kudu_table_query.format(table_name1)) self.execute_query(self._insert_3_rows_query.format(table_name1)) - cursor.execute(self._row_num_query.format(table_name1)) - assert cursor.fetchall() == [(3,)] + self.assert_num_row(table_name1, 3) # Run CTAS query without duplicate rows in source table. table_name2 = "%s.test_kudu_txn_ctas2" % unique_database self.execute_query(self._ctas_query.format(table_name2, table_name1)) - cursor.execute(self._row_num_query.format(table_name2)) - assert cursor.fetchall() == [(3,)] + self.assert_num_row(table_name2, 3) # Create source Parquet table without primary key and insert duplicate rows. table_name3 = "%s.test_kudu_txn_ctas3" % unique_database self.execute_query(self._create_parquet_table_query.format(table_name3)) self.execute_query(self._insert_dup_key_query.format(table_name3)) - cursor.execute(self._row_num_query.format(table_name3)) - assert cursor.fetchall() == [(3,)] + self.assert_num_row(table_name3, 3) # Run CTAS query with duplicate rows in source table. # Transaction should be aborted and no rows are inserted into Kudu table. 
@@ -543,20 +547,18 @@ class TestKuduTransactionBase(CustomClusterTestSuite): try: self.execute_query(self._ctas_query.format(table_name4, table_name3)) assert (not expect_fail_on_conflict), "query was expected to fail" - except ImpalaBeeswaxException as e: + except IMPALA_CONNECTION_EXCEPTION as e: assert expected_error_msg in str(e) - cursor.execute(self._row_num_query.format(table_name4)) - assert cursor.fetchall() == [(0 if expect_fail_on_conflict else 2,)] + self.assert_num_row(table_name4, 0 if expect_fail_on_conflict else 2) # Disable Kudu transactions and run the same CTAS query. Part of rows are inserted # into Kudu table. self.execute_query("set ENABLE_KUDU_TRANSACTION=false") table_name5 = "%s.test_kudu_txn_ctas5" % unique_database self.execute_query(self._ctas_query.format(table_name5, table_name3)) - cursor.execute(self._row_num_query.format(table_name5)) - assert cursor.fetchall() == [(2,)] + self.assert_num_row(table_name5, 2) - def _test_kudu_txn_abort_row_batch(self, cursor, unique_database): + def _test_kudu_txn_abort_row_batch(self, unique_database): # Create Kudu table. table_name = "%s.test_kudu_txn_abort_row_batch" % unique_database self.execute_query(self._create_kudu_table_query.format(table_name)) @@ -569,12 +571,11 @@ class TestKuduTransactionBase(CustomClusterTestSuite): try: self.execute_query(self._insert_3_rows_query.format(table_name), query_options) assert False, "query was expected to fail" - except ImpalaBeeswaxException as e: + except IMPALA_CONNECTION_EXCEPTION as e: assert "FIS_KUDU_TABLE_SINK_WRITE_BATCH" in str(e) - cursor.execute(self._row_num_query.format(table_name)) - assert cursor.fetchall() == [(0,)] + self.assert_num_row(table_name, 0) - def _test_kudu_txn_abort_partial_rows(self, cursor, unique_database): + def _test_kudu_txn_abort_partial_rows(self, unique_database): # Create Kudu table. 
table_name = "%s.test_kudu_txn_abort_partial_rows" % unique_database self.execute_query(self._create_kudu_table_query.format(table_name)) @@ -587,12 +588,11 @@ class TestKuduTransactionBase(CustomClusterTestSuite): try: self.execute_query(self._insert_3_rows_query.format(table_name), query_options) assert False, "query was expected to fail" - except ImpalaBeeswaxException as e: + except IMPALA_CONNECTION_EXCEPTION as e: assert "FIS_KUDU_TABLE_SINK_WRITE_PARTIAL_ROW" in str(e) - cursor.execute(self._row_num_query.format(table_name)) - assert cursor.fetchall() == [(0,)] + self.assert_num_row(table_name, 0) - def _test_kudu_txn_abort_partition_lock(self, cursor, unique_database): + def _test_kudu_txn_abort_partition_lock(self, unique_database): # Running two separate queries that are inserting to the same Kudu partitions. # Verify that one of the queries should fail, given Kudu's current implementation # of partition locking. @@ -617,7 +617,7 @@ class TestKuduTransactionBase(CustomClusterTestSuite): try: self.execute_query(query) assert False, "query was expected to fail" - except ImpalaBeeswaxException as e: + except IMPALA_CONNECTION_EXCEPTION as e: assert "aborted since it tries to acquire the partition lock that is held by " \ "another transaction" in str(e) @@ -632,42 +632,42 @@ class TestKuduTransaction(TestKuduTransactionBase): @pytest.mark.execute_serially @SkipIfKudu.no_hybrid_clock() - def test_kudu_txn_succeed(self, cursor, unique_database): - self._test_kudu_txn_succeed(cursor, unique_database) + def test_kudu_txn_succeed(self, unique_database): + self._test_kudu_txn_succeed(unique_database) @pytest.mark.execute_serially @SkipIfKudu.no_hybrid_clock() - def test_kudu_txn_not_implemented(self, cursor, unique_database): - self._test_kudu_txn_not_implemented(cursor, unique_database) + def test_kudu_txn_not_implemented(self, unique_database): + self._test_kudu_txn_not_implemented(unique_database) @pytest.mark.execute_serially @SkipIfKudu.no_hybrid_clock() - def 
test_kudu_txn_abort_dup_key(self, cursor, unique_database): - self._test_kudu_txn_abort_dup_key(cursor, unique_database, True, + def test_kudu_txn_abort_dup_key(self, unique_database): + self._test_kudu_txn_abort_dup_key(unique_database, True, self._duplicate_key_error) @pytest.mark.execute_serially @SkipIfKudu.no_hybrid_clock() - def test_kudu_txn_ctas(self, cursor, unique_database): - self._test_kudu_txn_ctas(cursor, unique_database, True, self._duplicate_key_error) + def test_kudu_txn_ctas(self, unique_database): + self._test_kudu_txn_ctas(unique_database, True, self._duplicate_key_error) @pytest.mark.execute_serially @SkipIfKudu.no_hybrid_clock() @SkipIfBuildType.not_dev_build - def test_kudu_txn_abort_row_batch(self, cursor, unique_database): - self._test_kudu_txn_abort_row_batch(cursor, unique_database) + def test_kudu_txn_abort_row_batch(self, unique_database): + self._test_kudu_txn_abort_row_batch(unique_database) @pytest.mark.execute_serially @SkipIfKudu.no_hybrid_clock() @SkipIfBuildType.not_dev_build - def test_kudu_txn_abort_partial_rows(self, cursor, unique_database): - self._test_kudu_txn_abort_partial_rows(cursor, unique_database) + def test_kudu_txn_abort_partial_rows(self, unique_database): + self._test_kudu_txn_abort_partial_rows(unique_database) @pytest.mark.execute_serially @SkipIfKudu.no_hybrid_clock() @SkipIfBuildType.not_dev_build - def test_kudu_txn_abort_partition_lock(self, cursor, unique_database): - self._test_kudu_txn_abort_partition_lock(cursor, unique_database) + def test_kudu_txn_abort_partition_lock(self, unique_database): + self._test_kudu_txn_abort_partition_lock(unique_database) class TestKuduTransactionNoIgnore(TestKuduTransactionBase): @@ -679,53 +679,53 @@ class TestKuduTransactionNoIgnore(TestKuduTransactionBase): # impalad args to start the cluster. _impalad_args = "--kudu_ignore_conflicts=false" # expected error message from kudu on duplicated key. 
- _duplicate_key_error = "Key already present in Kudu table" + _duplicate_key_error = "Kudu reported error: Already present: key already present" @pytest.mark.execute_serially @SkipIfKudu.no_hybrid_clock() @CustomClusterTestSuite.with_args(impalad_args=_impalad_args) - def test_kudu_txn_succeed(self, cursor, unique_database): - self._test_kudu_txn_succeed(cursor, unique_database) + def test_kudu_txn_succeed(self, unique_database): + self._test_kudu_txn_succeed(unique_database) @pytest.mark.execute_serially @SkipIfKudu.no_hybrid_clock() @CustomClusterTestSuite.with_args(impalad_args=_impalad_args) - def test_kudu_txn_not_implemented(self, cursor, unique_database): - self._test_kudu_txn_not_implemented(cursor, unique_database) + def test_kudu_txn_not_implemented(self, unique_database): + self._test_kudu_txn_not_implemented(unique_database) @pytest.mark.execute_serially @SkipIfKudu.no_hybrid_clock() @CustomClusterTestSuite.with_args(impalad_args=_impalad_args) - def test_kudu_txn_abort_dup_key(self, cursor, unique_database): - self._test_kudu_txn_abort_dup_key(cursor, unique_database, True, + def test_kudu_txn_abort_dup_key(self, unique_database): + self._test_kudu_txn_abort_dup_key(unique_database, True, self._duplicate_key_error) @pytest.mark.execute_serially @SkipIfKudu.no_hybrid_clock() @CustomClusterTestSuite.with_args(impalad_args=_impalad_args) - def test_kudu_txn_ctas(self, cursor, unique_database): - self._test_kudu_txn_ctas(cursor, unique_database, True, self._duplicate_key_error) + def test_kudu_txn_ctas(self, unique_database): + self._test_kudu_txn_ctas(unique_database, True, self._duplicate_key_error) @pytest.mark.execute_serially @SkipIfKudu.no_hybrid_clock() @SkipIfBuildType.not_dev_build @CustomClusterTestSuite.with_args(impalad_args=_impalad_args) - def test_kudu_txn_abort_row_batch(self, cursor, unique_database): - self._test_kudu_txn_abort_row_batch(cursor, unique_database) + def test_kudu_txn_abort_row_batch(self, unique_database): + 
self._test_kudu_txn_abort_row_batch(unique_database) @pytest.mark.execute_serially @SkipIfKudu.no_hybrid_clock() @SkipIfBuildType.not_dev_build @CustomClusterTestSuite.with_args(impalad_args=_impalad_args) - def test_kudu_txn_abort_partial_rows(self, cursor, unique_database): - self._test_kudu_txn_abort_partial_rows(cursor, unique_database) + def test_kudu_txn_abort_partial_rows(self, unique_database): + self._test_kudu_txn_abort_partial_rows(unique_database) @pytest.mark.execute_serially @SkipIfKudu.no_hybrid_clock() @SkipIfBuildType.not_dev_build @CustomClusterTestSuite.with_args(impalad_args=_impalad_args) - def test_kudu_txn_abort_partition_lock(self, cursor, unique_database): - self._test_kudu_txn_abort_partition_lock(cursor, unique_database) + def test_kudu_txn_abort_partition_lock(self, unique_database): + self._test_kudu_txn_abort_partition_lock(unique_database) class TestKuduTransactionIgnoreConflict(TestKuduTransactionBase): @@ -741,14 +741,14 @@ class TestKuduTransactionIgnoreConflict(TestKuduTransactionBase): @pytest.mark.execute_serially @SkipIfKudu.no_hybrid_clock() @CustomClusterTestSuite.with_args(impalad_args=_impalad_args) - def test_kudu_txn_dup_key(self, cursor, unique_database): - self._test_kudu_txn_abort_dup_key(cursor, unique_database, False, "no error") + def test_kudu_txn_dup_key(self, unique_database): + self._test_kudu_txn_abort_dup_key(unique_database, False, "no error") @pytest.mark.execute_serially @SkipIfKudu.no_hybrid_clock() @CustomClusterTestSuite.with_args(impalad_args=_impalad_args) - def test_kudu_txn_ctas(self, cursor, unique_database): - self._test_kudu_txn_ctas(cursor, unique_database, False, "no error") + def test_kudu_txn_ctas(self, unique_database): + self._test_kudu_txn_ctas(unique_database, False, "no error") @SkipIf.is_test_jdk @@ -762,8 +762,6 @@ class TestKuduTxnKeepalive(CustomKuduTest): "partition by hash(a) partitions 8 stored as kudu" # queries to insert rows into Kudu table. 
_insert_3_rows_query = "insert into {0} values (0, 'a'), (1, 'b'), (2, 'c')" - # query to get number of rows. - _row_num_query = "select count(*) from {0}" @classmethod def get_workload(cls): @@ -785,7 +783,7 @@ class TestKuduTxnKeepalive(CustomKuduTest): @pytest.mark.execute_serially @SkipIfKudu.no_hybrid_clock() @SkipIfBuildType.not_dev_build - def test_kudu_txn_heartbeat(self, cursor, unique_database): + def test_kudu_txn_heartbeat(self, unique_database): # Create Kudu table. table_name = "%s.test_kudu_txn_heartbeat" % unique_database self.execute_query(self._create_kudu_table_query.format(table_name)) @@ -796,11 +794,10 @@ class TestKuduTxnKeepalive(CustomKuduTest): self.execute_query("set ENABLE_KUDU_TRANSACTION=true") query_options = {'debug_action': 'FIS_KUDU_TABLE_SINK_CREATE_SESSION:SLEEP@10000'} self.execute_query(self._insert_3_rows_query.format(table_name), query_options) - cursor.execute(self._row_num_query.format(table_name)) - assert cursor.fetchall() == [(3,)] + self.assert_num_row(table_name, 3) -class TestKuduDmlConflictBase(CustomClusterTestSuite): +class TestKuduDmlConflictBase(CustomKuduTest): """ This is a base class of other TestKuduDml classes. """ @@ -823,8 +820,6 @@ class TestKuduDmlConflictBase(CustomClusterTestSuite): _delete_by_key_query = "delete from {0} where a = {1}" # query to drop all rows from Kudu table. _delete_all_query = "delete from {0}" - # query to get number of rows. - _row_num_query = "select count(*) from {0}" @classmethod def get_workload(cls): @@ -862,7 +857,7 @@ class TestKuduDmlConflictBase(CustomClusterTestSuite): self.client.close_query(fast_handle) self.client.close_query(slow_handle) - def _test_insert_update_delete(self, cursor, unique_database, + def _test_insert_update_delete(self, unique_database, expect_log_on_conflict): """ Do sequence of insert, update, and delete query with conflicting primary keys. 
@@ -877,8 +872,7 @@ class TestKuduDmlConflictBase(CustomClusterTestSuite): result = self.execute_query(self._insert_dup_key_query.format(table_name)) self._check_errors(result.runtime_profile, expect_log_on_conflict, "Key already present in Kudu table", 3) - cursor.execute(self._row_num_query.format(table_name)) - assert cursor.fetchall() == [(3,)] + self.assert_num_row(table_name, 3) # Update rows with some constraint violation. # Error message should exist in profile regardless of kudu_ignore_conflicts value. @@ -892,22 +886,19 @@ class TestKuduDmlConflictBase(CustomClusterTestSuite): update_query = self._update_by_key_query.format(table_name, 1) self._race_queries(delete_query, update_query, expect_log_on_conflict, "Not found in Kudu table", 1) - cursor.execute(self._row_num_query.format(table_name)) - assert cursor.fetchall() == [(2,)] + self.assert_num_row(table_name, 2) # Delete row with non-existent primary key by racing it against another concurrent # delete. delete_query = self._delete_by_key_query.format(table_name, 2) self._race_queries(delete_query, delete_query, expect_log_on_conflict, "Not found in Kudu table", 1) - cursor.execute(self._row_num_query.format(table_name)) - assert cursor.fetchall() == [(1,)] + self.assert_num_row(table_name, 1) # Delete all rows. Expect no errors. 
result = self.execute_query(self._delete_all_query.format(table_name)) self._check_errors(result.runtime_profile, True, "\n", 0) - cursor.execute(self._row_num_query.format(table_name)) - assert cursor.fetchall() == [(0,)] + self.assert_num_row(table_name, 0) class TestKuduDmlConflictNoError(TestKuduDmlConflictBase): @@ -917,8 +908,8 @@ class TestKuduDmlConflictNoError(TestKuduDmlConflictBase): @pytest.mark.execute_serially @SkipIfKudu.no_hybrid_clock() - def test_insert_update_delete(self, cursor, unique_database): - self._test_insert_update_delete(cursor, unique_database, False) + def test_insert_update_delete(self, unique_database): + self._test_insert_update_delete(unique_database, False) class TestKuduDmlConflictLogError(TestKuduDmlConflictBase): @@ -932,5 +923,5 @@ class TestKuduDmlConflictLogError(TestKuduDmlConflictBase): @pytest.mark.execute_serially @SkipIfKudu.no_hybrid_clock() @CustomClusterTestSuite.with_args(impalad_args=_impalad_args) - def test_insert_update_delete(self, cursor, unique_database): - self._test_insert_update_delete(cursor, unique_database, True) + def test_insert_update_delete(self, unique_database): + self._test_insert_update_delete(unique_database, True)
