This is an automated email from the ASF dual-hosted git repository. stigahuang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit e7c97439d12644af2a59bcd893aa7b8bdcb83b36 Author: Sai Hemanth Gantasala <[email protected]> AuthorDate: Mon Nov 18 22:02:12 2024 -0800 IMPALA-12141: EP shouldn't fail while releasing write lock if the lock is not held previously Without IMPALA-12832, the Event Processor (EP) goes into the error state when there is an issue while obtaining a table write lock, because releaseWriteLock() in the finally-clause is always invoked even if the lock is not held by the current thread. This patch addresses the problem by checking whether the table's write lock is held by the current thread before releasing it. Note: With IMPALA-12832, the EP invalidates the table when an error is encountered, which is still an overhead. With this patch, the EP will neither go into the error state nor invalidate the table when this issue is encountered. Testing: - Added an end-to-end test to verify the same. Change-Id: Ib2e4c965796dd515ab8549efa616f72510ca447f Reviewed-on: http://gerrit.cloudera.org:8080/22080 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- .../apache/impala/service/CatalogOpExecutor.java | 25 +++++++++++++++++----- .../java/org/apache/impala/util/DebugUtils.java | 5 +++++ tests/custom_cluster/test_events_custom_configs.py | 25 ++++++++++++++++++++++ 3 files changed, 50 insertions(+), 5 deletions(-) diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java index 20937c57b..923894cd9 100644 --- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java +++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java @@ -4958,7 +4958,9 @@ public class CatalogOpExecutor { table.setLastSyncedEventId(eventId); } UnlockWriteLockIfErronouslyLocked(); - table.releaseWriteLock(); + if (table.isWriteLockedByCurrentThread()) { + table.releaseWriteLock(); + } } } @@ -5063,7 +5065,14 @@ public class CatalogOpExecutor { boolean errorOccured = false; try { - tryWriteLock(table, 
reason, NoOpEventSequence.INSTANCE); + if (!DebugUtils.hasDebugAction(BackendConfig.INSTANCE.debugActions(), + DebugUtils.MOCK_WRITE_LOCK_FAILURE)) { + tryWriteLock(table, reason, NoOpEventSequence.INSTANCE); + } else { + // Mock the debug action that there is a failure in obtaining write lock. + // We don't want to throw InternalException to fail EP for test purpose. + return 0; + } InProgressTableModification modification = new InProgressTableModification(catalog_, table); catalog_.getLock().writeLock().unlock(); @@ -5100,7 +5109,9 @@ public class CatalogOpExecutor { table.setLastSyncedEventId(eventId); } UnlockWriteLockIfErronouslyLocked(); - table.releaseWriteLock(); + if (table.isWriteLockedByCurrentThread()) { + table.releaseWriteLock(); + } } return 0; } @@ -5173,7 +5184,9 @@ public class CatalogOpExecutor { "Could not acquire lock on the table " + table.getFullName(), e); } finally { UnlockWriteLockIfErronouslyLocked(); - table.releaseWriteLock(); + if (table.isWriteLockedByCurrentThread()) { + table.releaseWriteLock(); + } } } @@ -5296,7 +5309,9 @@ public class CatalogOpExecutor { throw e; } finally { UnlockWriteLockIfErronouslyLocked(); - table.releaseWriteLock(); + if (table.isWriteLockedByCurrentThread()) { + table.releaseWriteLock(); + } } } diff --git a/fe/src/main/java/org/apache/impala/util/DebugUtils.java b/fe/src/main/java/org/apache/impala/util/DebugUtils.java index 28502d871..781acd766 100644 --- a/fe/src/main/java/org/apache/impala/util/DebugUtils.java +++ b/fe/src/main/java/org/apache/impala/util/DebugUtils.java @@ -95,6 +95,11 @@ public class DebugUtils { // behavior of metastore returning partitions with empty values public static final String MOCK_EMPTY_PARTITION_VALUES = "mock_empty_partition_values"; + // debug action label to mock catalogD to mimick that there was failure while + // obtaining write lock while reloading partitions. This action is required for repro + // test failure for IMPALA-13126. 
+ public static final String MOCK_WRITE_LOCK_FAILURE = "mock_write_lock_failure"; + /** * Returns true if the label of action is set in the debugActions */ diff --git a/tests/custom_cluster/test_events_custom_configs.py b/tests/custom_cluster/test_events_custom_configs.py index adf30b18c..b23861622 100644 --- a/tests/custom_cluster/test_events_custom_configs.py +++ b/tests/custom_cluster/test_events_custom_configs.py @@ -1332,6 +1332,31 @@ class TestEventProcessingCustomConfigs(TestEventProcessingCustomConfigsBase): self.assert_impalad_log_contains('INFO', log_regex % 2, expected_count=1, timeout_s=20) + @pytest.mark.execute_serially + @CustomClusterTestSuite.with_args( + impalad_args="--use_local_catalog=true", + catalogd_args="--catalog_topic_mode=minimal " + "--hms_event_polling_interval_s=1 " + "--invalidate_metadata_on_event_processing_failure=false " + "--debug_actions=mock_write_lock_failure:true", + disable_log_buffering=True, cluster_size=1) + def test_write_lock_on_partitioned_events(self, unique_database): + """IMPALA-12277: This test verifies that CommitCompactionEvent on a partitioned table + succeeds if the write lock is not held by the table while processing the event by the + event processor. 'mock_write_lock_failure' mocks that there is a failure while + acquiring write lock for CommitCompactionEvent""" + test_tbl = unique_database + ".test_invalidate_table" + acid_props = self._get_transactional_tblproperties(True) + self.client.execute("create table {} (id int) partitioned by (p int) {}" + .format(test_tbl, acid_props)) + for _ in range(10): + self.client.execute( + "insert into {} partition(p=0) values (1),(2),(3)".format(test_tbl)) + self.run_stmt_in_hive( + "alter table {} partition(p=0) compact 'major' and wait".format(test_tbl)) + EventProcessorUtils.wait_for_event_processing(self) + assert EventProcessorUtils.get_event_processor_status() == "ACTIVE" + @SkipIfFS.hive class TestEventProcessingWithImpala(TestEventProcessingCustomConfigsBase):
