This is an automated email from the ASF dual-hosted git repository. dbecker pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 46115e9a8ec1c6016cad7a1f3af7f7c260c170e3 Author: Sai Hemanth Gantasala <[email protected]> AuthorDate: Mon Aug 12 14:40:39 2024 -0700 IMPALA-13126: Obtain table read lock in EP to process partitioned event For a partition-level event, isOlderEvent() in catalogD needs to check whether the corresponding partition is reloaded after the event. This should be done after holding the table read lock. Otherwise, EventProcessor could hit ConcurrentModificationException error when there are concurrent DDLs/DMLs modifying the partition list. note: Created IMPALA-13650 for a cleaner solution to clear the inflight events list for partitioned table events. Testing: - Added an end-to-end stress test to verify the above scenario Change-Id: I26933f98556736f66df986f9440ebb64be395bc1 Reviewed-on: http://gerrit.cloudera.org:8080/21663 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- .../impala/catalog/events/MetastoreEvents.java | 78 +++++++++++++--------- tests/custom_cluster/test_events_custom_configs.py | 25 +++++++ 2 files changed, 72 insertions(+), 31 deletions(-) diff --git a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java index c2612a878..43ec31549 100644 --- a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java +++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java @@ -1297,19 +1297,29 @@ public class MetastoreEvents { return true; } // Always check the lastRefreshEventId on the table first for table level refresh - if (tbl.getLastRefreshEventId() > getEventId() || (partitionEventObj != null && - catalog_.isPartitionLoadedAfterEvent(dbName_, tblName_, - partitionEventObj, getEventId()))) { - metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC) - .inc(getNumberOfEvents()); - String messageStr = partitionEventObj == null ? 
"Skipping the event since the" + - " table " + dbName_+ "." + tblName_ + " has last refresh id as " + - tbl.getLastRefreshEventId() + ". Comparing it with current event " + - getEventId() + ". " : ""; - infoLog("{}Incremented events skipped counter to {}", messageStr, - metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC) - .getCount()); - return true; + boolean canSkip = tbl.getLastRefreshEventId() >= getEventId(); + try { + if (!canSkip && partitionEventObj != null) { + tbl.takeReadLock(); + canSkip = catalog_.isPartitionLoadedAfterEvent(dbName_, tblName_, + partitionEventObj, getEventId()); + } + if (canSkip) { + metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC) + .inc(getNumberOfEvents()); + String messageStr = partitionEventObj == null ? "Skipping the event since " + + "the table " + dbName_ + "." + tblName_ + " has last refresh id as " + + tbl.getLastRefreshEventId() + ". Comparing it with current event " + + getEventId() + ". " : ""; + infoLog("{}Incremented events skipped counter to {}", messageStr, + metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC) + .getCount()); + return true; + } + } finally { + if (tbl.isReadLockedByCurrentThread()) { + tbl.releaseReadLock(); + } } } catch (CatalogException e) { debugLog("ignoring exception while checking if it is an older event " @@ -1805,12 +1815,6 @@ public class MetastoreEvents { return; } - if (isOlderEvent(null)) { - infoLog("Not processing the alter table event {} as it is an older event", - getEventId()); - return; - } - // Determine whether this is an event which we have already seen or if it is a new // event if (isSelfEvent()) { @@ -1825,6 +1829,12 @@ public class MetastoreEvents { + "which can be ignored."); return; } + + if (isOlderEvent(null)) { + infoLog("Not processing the alter table event {} as it is an older event", + getEventId()); + return; + } skipFileMetadataReload_ = !isTruncateOp_ && canSkipFileMetadataReload(tableBefore_, tableAfter_); long 
startNs = System.nanoTime(); @@ -2640,12 +2650,6 @@ public class MetastoreEvents { return; } - if (isOlderEvent(partitionBefore_)) { - infoLog("Not processing the alter partition event {} as it is an older event", - getEventId()); - return; - } - // Ignore the event if this is a trivial event. See javadoc for // isTrivialAlterPartitionEvent() for examples. if (canBeSkipped()) { @@ -2654,6 +2658,12 @@ public class MetastoreEvents { + "parameters which can be ignored."); return; } + + if (isOlderEvent(partitionBefore_)) { + infoLog("Not processing the alter partition event {} as it is an older event", + getEventId()); + return; + } // Reload the whole table if it's a transactional table or materialized view. // Materialized views are treated as a special case because it's possible to // receive partition event on MVs, but they are regular views in Impala. That @@ -3133,13 +3143,19 @@ public class MetastoreEvents { org.apache.impala.catalog.Table tbl = catalog_.getTableNoThrow(dbName_, tblName_); if (tbl == null || tbl instanceof IncompleteTable) { return false; } // Always check the lastRefreshEventId on the table first for table level refresh - if (tbl.getLastRefreshEventId() >= getEventId() - || (reloadPartition_ != null - && catalog_.isPartitionLoadedAfterEvent( - dbName_, tblName_, reloadPartition_, getEventId()))) { - return true; + boolean canSkip = tbl.getLastRefreshEventId() >= getEventId(); + try { + if (!canSkip && reloadPartition_ != null) { + tbl.takeReadLock(); + canSkip = catalog_.isPartitionLoadedAfterEvent(dbName_, tblName_, + reloadPartition_, getEventId()); + } + return canSkip; + } finally { + if (tbl.isReadLockedByCurrentThread()) { + tbl.releaseReadLock(); + } } - return false; } /** diff --git a/tests/custom_cluster/test_events_custom_configs.py b/tests/custom_cluster/test_events_custom_configs.py index b23861622..1deaf24c8 100644 --- a/tests/custom_cluster/test_events_custom_configs.py +++ b/tests/custom_cluster/test_events_custom_configs.py @@ 
-1357,6 +1357,31 @@ class TestEventProcessingCustomConfigs(TestEventProcessingCustomConfigsBase): EventProcessorUtils.wait_for_event_processing(self) assert EventProcessorUtils.get_event_processor_status() == "ACTIVE" + @pytest.mark.execute_serially + @CustomClusterTestSuite.with_args( + catalogd_args="--enable_reload_events=true " + "--invalidate_metadata_on_event_processing_failure=false") + def test_reload_events_modified_concurrently(self): + """IMPALA-13126: This test verifies that the event processor successfully consumes or + ignores the RELOAD event triggered by refresh operation on a partitioned table if the + partitions are modified concurrently on the table.""" + tbl = "scale_db.num_partitions_1234_blocks_per_partition_1" + refresh_stmt = "refresh {} partition(j=0)".format(tbl) + for _ in range(32): + self.client.execute_async(refresh_stmt) + for _ in range(100): + self.client.execute( + "alter table {} add if not exists partition(j=-1)".format(tbl)) + self.client.execute( + "alter table {} drop partition(j=-1)".format(tbl)) + + try: + EventProcessorUtils.wait_for_event_processing(self, 1000) # bigger timeout required + assert EventProcessorUtils.get_event_processor_status() == "ACTIVE" + finally: + # Make sure the table doesn't change after this test + self.execute_query("alter table {} drop if exists partition(j=-1)".format(tbl)) + @SkipIfFS.hive class TestEventProcessingWithImpala(TestEventProcessingCustomConfigsBase):
