This is an automated email from the ASF dual-hosted git repository.

csringhofer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit be6d45bc40b5fce96879216381e1673a966fe1fb
Author: Peter Rozsa <[email protected]>
AuthorDate: Fri Jan 3 14:46:01 2025 +0100

    IMPALA-13611: Add interop tests for Iceberg tables
    
    This change adds new e2e tests in which an Iceberg table is modified from
    an external system (Hive). These tests validate that tables which are
    modified externally but tracked in HMS remain synchronized, and that both
    actors see the same results.
    
    Change-Id: Ic2ee6d3354c3b11264c5e3ded9826831e3962a98
    Reviewed-on: http://gerrit.cloudera.org:8080/22459
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 tests/stress/test_update_stress.py | 93 +++++++++++++++++++++++++++++++++++++-
 1 file changed, 91 insertions(+), 2 deletions(-)

diff --git a/tests/stress/test_update_stress.py b/tests/stress/test_update_stress.py
index bb966c864..7dc38376f 100644
--- a/tests/stress/test_update_stress.py
+++ b/tests/stress/test_update_stress.py
@@ -28,6 +28,7 @@ from tests.common.parametrize import UniqueDatabase
 from tests.common.test_dimensions import create_exec_option_dimension
 from tests.stress.stress_util import run_tasks, Task
 from tests.util.filesystem_utils import FILESYSTEM_PREFIX, IS_HDFS
+from tests.conftest import DEFAULT_HIVE_SERVER2
 
 
 # Longer-running UPDATE tests are executed here
@@ -210,6 +211,36 @@ class TestIcebergConcurrentOperations(ImpalaTestSuite):
       time.sleep(random.random())
     impalad_client.close()
 
+  def _hive_role_concurrent_writer(self, tbl_name, all_rows_deleted):
+    """Repeatedly runs 'update ... set j = j + 1' on 'tbl_name' through Hive.
+
+    Keeps going until the shared flag 'all_rows_deleted' becomes 1. Failed
+    UPDATEs are printed and the loop continues, because errors are expected
+    while other tasks modify the same table. The Hive client is closed in a
+    'finally' block, so it is released even if the loop is aborted by an
+    exception that does not derive from Exception."""
+    hive_client = ImpalaTestSuite.create_impala_client(
+      host_port=DEFAULT_HIVE_SERVER2, protocol='hs2', is_hive=True)
+    try:
+      while all_rows_deleted.value != 1:
+        try:
+          hive_client.execute(
+              "update {0} set j = j + 1".format(tbl_name))
+        except Exception as e:
+          # Exceptions are expected due to concurrent operations.
+          print(str(e))
+        # Random pause of 0-7 seconds between attempts, so the interleaving with
+        # the other tasks varies from run to run.
+        time.sleep(random.random() * 7)
+    finally:
+      hive_client.close()
+
+  def _hive_role_concurrent_deleter(self, tbl_name, all_rows_deleted, num_rows):
+    """Deletes the rows with id 0 .. num_rows - 1 from 'tbl_name' through Hive.
+
+    Ids are deleted one at a time, in ascending order. A failed DELETE is printed
+    and retried for the same id, because errors are expected while other tasks
+    modify the same table. When this task finishes, 'all_rows_deleted' is set to
+    1, which stops the concurrent writer loops that poll it.
+
+    The flag is set in a 'finally' block. Without that, an uncaught exception here
+    (including a failure to open the Hive client) would leave the writer polling
+    the flag forever and hang the test."""
+    hive_client = None
+    try:
+      hive_client = ImpalaTestSuite.create_impala_client(
+        host_port=DEFAULT_HIVE_SERVER2, protocol='hs2', is_hive=True)
+      i = 0
+      while i < num_rows:
+        try:
+          hive_client.execute(
+              "delete from {0} WHERE id = {1}".format(tbl_name, i))
+          # Only move on to the next id once the current DELETE succeeded.
+          i += 1
+        except Exception as e:
+          # Exceptions are expected due to concurrent operations.
+          print(str(e))
+        time.sleep(random.random())
+    finally:
+      all_rows_deleted.value = 1
+      if hive_client is not None:
+        hive_client.close()
+
   def _impala_role_concurrent_checker(self, tbl_name, all_rows_deleted, 
num_rows):
     """Checks if the table's invariant is true. The invariant is that we have a
     consecutive range of 'id's starting from N to num_rows - 1. And 'j's are 
equal."""
@@ -247,8 +278,8 @@ class TestIcebergConcurrentOperations(ImpalaTestSuite):
         tblproperties('format-version'='2')""".format(tbl_name,))
 
     num_rows = 10
-    for i in range(num_rows):
-      self.client.execute("insert into {} values ({}, 0)".format(tbl_name, i))
+    values = ', '.join("({}, 0)".format(i) for i in range(num_rows))
+    self.client.execute("insert into {} values ({})".format(tbl_name, values))
 
     all_rows_deleted = Value('i', 0)
     deleter = Task(self._impala_role_concurrent_deleter, tbl_name, 
all_rows_deleted,
@@ -320,3 +351,61 @@ class TestIcebergConcurrentOperations(ImpalaTestSuite):
 
     assert len(data_files_in_tbl) == len(data_files_on_fs_rows)
     assert set(data_files_on_fs) == set(data_files_in_tbl)
+
+  @pytest.mark.execute_serially
+  @UniqueDatabase.parametrize(sync_ddl=True)
+  def test_iceberg_impala_deletes_and_hive_updates(self, unique_database):
+    """Issues DELETE statements from Impala and UPDATE statements from Hive
+    in parallel in a way that some invariants must be true when a spectator process
+    inspects the table. At the end the table must be empty."""
+
+    tbl_name = "%s.test_concurrent_deletes_and_updates" % unique_database
+    self.client.set_configuration_option("SYNC_DDL", "true")
+    self.client.execute("""create table {0} (id int, j bigint)
+        stored as iceberg
+        tblproperties('format-version'='2')""".format(tbl_name))
+
+    num_rows = 5
+    values = ', '.join("({}, 0)".format(i) for i in range(num_rows))
+    self.client.execute("insert into {} values ({})".format(tbl_name, values))
+
+    all_rows_deleted = Value('i', 0)
+    deleter = Task(self._impala_role_concurrent_deleter, tbl_name, all_rows_deleted,
+        num_rows)
+    hive_updater = Task(self._hive_role_concurrent_writer, tbl_name, all_rows_deleted)
+    checker = Task(self._impala_role_concurrent_checker, tbl_name, all_rows_deleted,
+        num_rows)
+    run_tasks([deleter, hive_updater, checker])
+
+    # The Hive writer only checks 'all_rows_deleted' between UPDATEs, so an UPDATE
+    # that was in flight when the last DELETE finished can commit afterwards.
+    # Refresh so the final count is computed on the latest table metadata.
+    self.client.execute("refresh %s" % tbl_name)
+
+    result = self.client.execute("select count(*) from {}".format(tbl_name))
+    assert result.data == ['0']
+
+  @pytest.mark.execute_serially
+  @UniqueDatabase.parametrize(sync_ddl=True)
+  def test_iceberg_impala_updates_and_hive_deletes(self, unique_database):
+    """Issues DELETE statements from Hive and UPDATE statements from Impala
+    in parallel in a way that some invariants must be true when a spectator
+    process inspects the table. At the end the table must be empty."""
+
+    tbl_name = "%s.test_concurrent_deletes_and_updates" % unique_database
+    self.client.set_configuration_option("SYNC_DDL", "true")
+    self.client.execute("""create table {0} (id int, j bigint)
+        stored as iceberg
+        tblproperties('format-version'='2')""".format(tbl_name))
+
+    num_rows = 5
+    values = ', '.join("({}, 0)".format(i) for i in range(num_rows))
+    self.client.execute("insert into {} values ({})".format(tbl_name, values))
+
+    all_rows_deleted = Value('i', 0)
+    impala_updater = Task(self._impala_role_concurrent_writer, tbl_name,
+        all_rows_deleted)
+    hive_deleter = Task(self._hive_role_concurrent_deleter,
+                        tbl_name, all_rows_deleted, num_rows)
+    checker = Task(self._impala_role_concurrent_checker, tbl_name, all_rows_deleted,
+        num_rows)
+    run_tasks([impala_updater, hive_deleter, checker])
+
+    # The rows were deleted through Hive, so Impala's cached table metadata may
+    # not include the latest snapshot yet; refresh before the final check.
+    self.client.execute("refresh %s" % tbl_name)
+
+    result = self.client.execute("select count(*) from {}".format(tbl_name))
+    assert result.data == ['0']

Reply via email to