This is an automated email from the ASF dual-hosted git repository.

csringhofer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
commit be6d45bc40b5fce96879216381e1673a966fe1fb
Author: Peter Rozsa <[email protected]>
AuthorDate: Fri Jan 3 14:46:01 2025 +0100

    IMPALA-13611: Add interop tests for Iceberg tables

    This change adds new e2e tests where an Iceberg table is modified from
    an external system. These tests validate that externally modified but
    HMS-tracked tables remain synchronized, and that both actors see the
    same results.

    Change-Id: Ic2ee6d3354c3b11264c5e3ded9826831e3962a98
    Reviewed-on: http://gerrit.cloudera.org:8080/22459
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 tests/stress/test_update_stress.py | 93 +++++++++++++++++++++++++++++++++++++-
 1 file changed, 91 insertions(+), 2 deletions(-)

diff --git a/tests/stress/test_update_stress.py b/tests/stress/test_update_stress.py
index bb966c864..7dc38376f 100644
--- a/tests/stress/test_update_stress.py
+++ b/tests/stress/test_update_stress.py
@@ -28,6 +28,7 @@
 from tests.common.parametrize import UniqueDatabase
 from tests.common.test_dimensions import create_exec_option_dimension
 from tests.stress.stress_util import run_tasks, Task
 from tests.util.filesystem_utils import FILESYSTEM_PREFIX, IS_HDFS
+from tests.conftest import DEFAULT_HIVE_SERVER2
 
 # Longer-running UPDATE tests are executed here
@@ -210,6 +211,36 @@ class TestIcebergConcurrentOperations(ImpalaTestSuite):
       time.sleep(random.random())
     impalad_client.close()
 
+  def _hive_role_concurrent_writer(self, tbl_name, all_rows_deleted):
+    """Increments 'j' on every row with each iteration until all rows are deleted."""
+    hive_client = ImpalaTestSuite.create_impala_client(
+        host_port=DEFAULT_HIVE_SERVER2, protocol='hs2', is_hive=True)
+    while all_rows_deleted.value != 1:
+      try:
+        hive_client.execute(
+            "update {0} set j = j + 1".format(tbl_name))
+      except Exception as e:
+        # Exceptions are expected due to concurrent operations.
+        print(str(e))
+      time.sleep(random.random() * 7)
+    hive_client.close()
+
+  def _hive_role_concurrent_deleter(self, tbl_name, all_rows_deleted, num_rows):
+    hive_client = ImpalaTestSuite.create_impala_client(
+        host_port=DEFAULT_HIVE_SERVER2, protocol='hs2', is_hive=True)
+    i = 0
+    while i < num_rows:
+      try:
+        hive_client.execute(
+            "delete from {0} where id = {1}".format(tbl_name, i))
+        i += 1
+      except Exception as e:
+        # Exceptions are expected due to concurrent operations.
+        print(str(e))
+      time.sleep(random.random())
+    all_rows_deleted.value = 1
+    hive_client.close()
+
   def _impala_role_concurrent_checker(self, tbl_name, all_rows_deleted, num_rows):
     """Checks if the table's invariant is true. The invariant is that we have a
     consecutive range of 'id's starting from N to num_rows - 1. And 'j's are equal."""
@@ -247,8 +278,8 @@ class TestIcebergConcurrentOperations(ImpalaTestSuite):
         tblproperties('format-version'='2')""".format(tbl_name,))
 
     num_rows = 10
-    for i in range(num_rows):
-      self.client.execute("insert into {} values ({}, 0)".format(tbl_name, i))
+    values = ', '.join("({}, 0)".format(i) for i in range(num_rows))
+    self.client.execute("insert into {} values ({})".format(tbl_name, values))
 
     all_rows_deleted = Value('i', 0)
     deleter = Task(self._impala_role_concurrent_deleter, tbl_name, all_rows_deleted,
@@ -320,3 +351,61 @@ class TestIcebergConcurrentOperations(ImpalaTestSuite):
     assert len(data_files_in_tbl) == len(data_files_on_fs_rows)
     assert set(data_files_on_fs) == set(data_files_in_tbl)
+
+  @pytest.mark.execute_serially
+  @UniqueDatabase.parametrize(sync_ddl=True)
+  def test_iceberg_impala_deletes_and_hive_updates(self, unique_database):
+    """Issues DELETE statements from Impala and UPDATE statements from Hive
+    in parallel in a way that some invariants must be true when a spectator process
+    inspects the table."""
+
+    tbl_name = "%s.test_concurrent_deletes_and_updates" % unique_database
+    self.client.set_configuration_option("SYNC_DDL", "true")
+    self.client.execute("""create table {0} (id int, j bigint)
+        stored as iceberg
+        tblproperties('format-version'='2')""".format(tbl_name,))
+
+    num_rows = 5
+    values = ', '.join("({}, 0)".format(i) for i in range(num_rows))
+    self.client.execute("insert into {} values ({})".format(tbl_name, values))
+
+    all_rows_deleted = Value('i', 0)
+    deleter = Task(self._impala_role_concurrent_deleter, tbl_name, all_rows_deleted,
+                   num_rows)
+    hive_updater = Task(self._hive_role_concurrent_writer, tbl_name, all_rows_deleted)
+    checker = Task(self._impala_role_concurrent_checker, tbl_name, all_rows_deleted,
+                   num_rows)
+    run_tasks([deleter, hive_updater, checker])
+
+    result = self.client.execute("select count(*) from {}".format(tbl_name))
+    assert result.data == ['0']
+
+  @pytest.mark.execute_serially
+  @UniqueDatabase.parametrize(sync_ddl=True)
+  def test_iceberg_impala_updates_and_hive_deletes(self, unique_database):
+    """Issues DELETE statements from Hive and UPDATE statements from Impala
+    in parallel in a way that some invariants must be true when a spectator
+    process inspects the table."""
+
+    tbl_name = "%s.test_concurrent_deletes_and_updates" % unique_database
+    self.client.set_configuration_option("SYNC_DDL", "true")
+    self.client.execute("""create table {0} (id int, j bigint)
+        stored as iceberg
+        tblproperties('format-version'='2')""".format(tbl_name))
+
+    num_rows = 5
+    values = ', '.join("({}, 0)".format(i) for i in range(num_rows))
+    self.client.execute("insert into {} values ({})".format(tbl_name, values))
+
+    all_rows_deleted = Value('i', 0)
+    impala_updater = Task(self._impala_role_concurrent_writer, tbl_name, all_rows_deleted)
+    hive_deleter = Task(self._hive_role_concurrent_deleter,
+                        tbl_name, all_rows_deleted, num_rows)
+    checker = Task(self._impala_role_concurrent_checker, tbl_name, all_rows_deleted,
+                   num_rows)
+    run_tasks([impala_updater, hive_deleter, checker])
+
+    self.client.execute("refresh %s" % tbl_name)
+
+    result = self.client.execute("select count(*) from {}".format(tbl_name))
+    assert result.data == ['0']
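
A note on the invariant behind these tests: deletes remove rows in ascending 'id'
order, while each update increments 'j' on every remaining row in a single atomic
Iceberg commit, so any consistent snapshot a spectator reads must contain a
consecutive suffix of ids that all share one 'j' value. The sketch below
illustrates that check in isolation; it is a hypothetical standalone example, not
code from this change, and 'check_invariant' and 'rows' (standing in for the
(id, j) tuples from a query like "select id, j from <tbl> order by id") are
invented names:

  # Hypothetical sketch of the invariant _impala_role_concurrent_checker relies on.
  def check_invariant(rows, num_rows):
    if not rows:
      return  # every row has been deleted; nothing left to verify
    ids = [r[0] for r in rows]
    js = set(r[1] for r in rows)
    # Deletes happen in ascending 'id' order, so the surviving ids must form
    # the consecutive suffix N, N + 1, ..., num_rows - 1.
    assert ids == list(range(ids[0], num_rows))
    # Each UPDATE bumps 'j' on all rows atomically, so all rows must agree on 'j'.
    assert len(js) == 1

The invariant is engine-agnostic, which is why both new tests can reuse the
pre-existing Impala-side checker regardless of whether Impala or Hive plays the
deleter role.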
