This is an automated email from the ASF dual-hosted git repository. dbecker pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 68e61c1aabacee5da37b1f66842d8a2354d0e215
Author: Zoltan Borok-Nagy <[email protected]>
AuthorDate: Mon Feb 5 14:07:54 2024 +0100

IMPALA-12787: Concurrent DELETE and UPDATE operations on Iceberg tables can be problematic

If an UPDATE operation runs concurrently with a DELETE operation, and the DELETE commits first, then the UPDATE can revive deleted rows. This is because only RowDelta.validateNoConflictingDataFiles() is called, but RowDelta.validateNoConflictingDeleteFiles() is not. Therefore, the UPDATE operation ignores the concurrently written delete files.

This patch adds RowDelta.validateNoConflictingDeleteFiles() to UPDATE operations.

Testing:
 * added a stress test to validate concurrent DELETE and UPDATE operations

Change-Id: I9e581ea17fa8f6ccd9c87aaad1281bb694079f6e
Reviewed-on: http://gerrit.cloudera.org:8080/20999
Reviewed-by: Impala Public Jenkins <[email protected]>
Tested-by: Impala Public Jenkins <[email protected]>
---
 .../impala/service/IcebergCatalogOpExecutor.java | 24 +++--
 tests/stress/test_update_stress.py | 103 +++++++++++++++++++++
 2 files changed, 120 insertions(+), 7 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java index 27a62dea6..cc4d68248 100644 --- a/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java +++ b/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java @@ -367,7 +367,16 @@ public class IcebergCatalogOpExecutor { DeleteFile deleteFile = createDeleteFile(feIcebergTable, buf); rowDelta.addDeletes(deleteFile); } - validateAndCommitRowDelta(rowDelta, icebergOp.getInitial_snapshot_id()); + try { + // Validate that there are no conflicting data files, because if data files are + // added in the meantime, they potentially contain records that should have been + // affected by this DELETE operation.
+ rowDelta.validateFromSnapshot(icebergOp.getInitial_snapshot_id()); + rowDelta.validateNoConflictingDataFiles(); + rowDelta.commit(); + } catch (ValidationException e) { + throw new ImpalaRuntimeException(e.getMessage(), e); + } } private static void updateRows(FeIcebergTable feIcebergTable, Transaction txn, @@ -384,14 +393,15 @@ public class IcebergCatalogOpExecutor { DataFile dataFile = createDataFile(feIcebergTable, buf); rowDelta.addRows(dataFile); } - validateAndCommitRowDelta(rowDelta, icebergOp.getInitial_snapshot_id()); - } - - private static void validateAndCommitRowDelta(RowDelta rowDelta, - long initialSnapshotId) throws ImpalaRuntimeException { try { - rowDelta.validateFromSnapshot(initialSnapshotId); + // Validate that there are no conflicting data files, because if data files are + // added in the meantime, they potentially contain records that should have been + // affected by this UPDATE operation. Also validate that there are no conflicting + // delete files, because we don't want to revive records that have been deleted + // in the meantime. 
+ rowDelta.validateFromSnapshot(icebergOp.getInitial_snapshot_id()); rowDelta.validateNoConflictingDataFiles(); + rowDelta.validateNoConflictingDeleteFiles(); rowDelta.commit(); } catch (ValidationException e) { throw new ImpalaRuntimeException(e.getMessage(), e); diff --git a/tests/stress/test_update_stress.py b/tests/stress/test_update_stress.py index 01c295e9d..dec84ddfc 100644 --- a/tests/stress/test_update_stress.py +++ b/tests/stress/test_update_stress.py @@ -20,6 +20,7 @@ from builtins import map, range import pytest import random import time +from multiprocessing import Value from tests.common.impala_test_suite import ImpalaTestSuite from tests.common.parametrize import UniqueDatabase @@ -146,3 +147,105 @@ class TestIcebergConcurrentUpdateStress(ImpalaTestSuite): checkers = [Task(self._impala_role_concurrent_checker, tbl_name, target_total) for i in range(0, num_checkers)] run_tasks([updater_a, updater_b, updater_c] + checkers) + + +class TestIcebergConcurrentDeletesAndUpdates(ImpalaTestSuite): + """This test checks that concurrent DELETE and UPDATE operations leave the table + in a consistent state.""" + + @classmethod + def get_workload(self): + return 'targeted-stress' + + @classmethod + def add_test_dimensions(cls): + super(TestIcebergConcurrentDeletesAndUpdates, cls).add_test_dimensions() + cls.ImpalaTestMatrix.add_constraint( + lambda v: (v.get_value('table_format').file_format == 'parquet' + and v.get_value('table_format').compression_codec == 'snappy')) + + def _impala_role_concurrent_deleter(self, tbl_name, flag, num_rows): + """Deletes every row from the table one by one.""" + target_impalad = random.randint(0, ImpalaTestSuite.get_impalad_cluster_size() - 1) + impalad_client = ImpalaTestSuite.create_client_for_nth_impalad(target_impalad) + i = 0 + while i < num_rows: + try: + impalad_client.execute( + "delete from {0} WHERE id = {1}".format(tbl_name, i)) + i += 1 + # Sleep after a succesful operation. 
+ time.sleep(random.random()) + except Exception: + # Exceptions are expected due to concurrent operations. + pass + flag.value = 1 + impalad_client.close() + + def _impala_role_concurrent_writer(self, tbl_name, flag): + """Updates every row in the table in a loop.""" + target_impalad = random.randint(0, ImpalaTestSuite.get_impalad_cluster_size() - 1) + impalad_client = ImpalaTestSuite.create_client_for_nth_impalad(target_impalad) + while flag.value != 1: + try: + impalad_client.execute( + "update {0} set j = j + 1".format(tbl_name)) + # Sleep after a succesful operation. + time.sleep(random.random()) + except Exception: + # Exceptions are expected due to concurrent operations. + pass + impalad_client.close() + + def _impala_role_concurrent_checker(self, tbl_name, flag, num_rows): + """Checks if the table's invariant is true. The invariant is that we have a + consecutive range of 'id's starting from N to num_rows - 1. And 'j's are equal.""" + def verify_result_set(result): + if len(result.data) == 0: return + line = result.data[0] + [prev_id, prev_j] = list(map(int, (line.split('\t')))) + for line in result.data[1:]: + [id, j] = list(map(int, (line.split('\t')))) + assert id - prev_id == 1 + assert j == prev_j + prev_id = id + prev_j = j + assert prev_id == num_rows - 1 + + target_impalad = random.randint(0, ImpalaTestSuite.get_impalad_cluster_size() - 1) + impalad_client = ImpalaTestSuite.create_client_for_nth_impalad(target_impalad) + while flag.value != 1: + result = impalad_client.execute("select * from %s order by id" % tbl_name) + verify_result_set(result) + time.sleep(random.random()) + impalad_client.close() + + @pytest.mark.stress + @UniqueDatabase.parametrize(sync_ddl=True) + def test_iceberg_deletes_and_updates(self, unique_database): + """Issues DELETE and UPDATE statements in parallel in a way that some + invariants must be true when a spectator process inspects the table.""" + + tbl_name = "%s.test_concurrent_deletes_and_updates" % unique_database + 
self.client.set_configuration_option("SYNC_DDL", "true") + self.client.execute("""create table {0} (id int, j bigint) + stored as iceberg + tblproperties('format-version'='2')""".format(tbl_name,)) + + num_rows = 20 + values_str = "" + for i in range(num_rows): + values_str += "({}, 0)".format(i) + if i != num_rows - 1: + values_str += ", " + self.client.execute("insert into {} values {}".format(tbl_name, values_str)) + + flag = Value('i', 0) + deleter = Task(self._impala_role_concurrent_deleter, tbl_name, flag, num_rows) + updater = Task(self._impala_role_concurrent_writer, tbl_name, flag) + checker = Task(self._impala_role_concurrent_checker, tbl_name, flag, num_rows) + run_tasks([deleter, updater, checker]) + + self.client.execute("refresh {}".format(tbl_name)) + result = self.client.execute("select count(*) from {}".format(tbl_name)) + assert result.data == ['0']
