This is an automated email from the ASF dual-hosted git repository. stigahuang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 547f96ec04ce2736fbf24d2a7253f549fcbbd167 Author: Sai Hemanth Gantasala <[email protected]> AuthorDate: Tue Aug 13 10:02:35 2024 -0700 IMPALA-12865: Fix wrong lastRefreshEventId set by firing RELOAD events When enable_reload_events is true, catalogd fires RELOAD events after REFRESH finishes reloading the table/partition. The RELOAD event id is also used to update lastRefreshEventId of the table/partition. This is problematic when enable_skipping_older_events is true: HMS events generated after the reload but before the RELOAD event are then incorrectly skipped. Solution: Fetch the current HMS notification event id before the table/partition is refreshed, and set it as lastRefreshEventId on the metadata object. Testing: - Manually verified the issue is addressed. - Added an end-to-end test that closely reproduces the real-world issue. Change-Id: I90039da77ec561c5aede44456f88c6650582815b Reviewed-on: http://gerrit.cloudera.org:8080/21665 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- .../org/apache/impala/catalog/HdfsPartition.java | 4 +- .../java/org/apache/impala/catalog/HdfsTable.java | 2 + .../impala/catalog/events/MetastoreEvents.java | 19 +++++++-- .../apache/impala/service/CatalogOpExecutor.java | 39 +++++++++++++++--- .../java/org/apache/impala/util/DebugUtils.java | 6 +++ tests/custom_cluster/test_events_custom_configs.py | 46 ++++++++++++++++++++++ 6 files changed, 106 insertions(+), 10 deletions(-) diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java index 6128e27f2..c50a94fbf 100644 --- a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java +++ b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java @@ -971,7 +971,9 @@ public class HdfsPartition extends CatalogObjectImpl implements FeFsPartition { } public Builder setLastRefreshEventId(long eventId) { - lastRefreshEventId_ = eventId; + if (eventId > 
lastRefreshEventId_) { + lastRefreshEventId_ = eventId; + } return this; } diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java index 9c0e0d03f..ba9e66ec2 100644 --- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java +++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java @@ -1045,6 +1045,8 @@ public class HdfsTable extends Table implements FeFsTable { boolean partitionNotChanged = partBuilder.equalsToOriginal(oldPartition); LOG.trace("Partition {} {}", oldPartition.getName(), partitionNotChanged ? "changed" : "unchanged"); + // for partitioned refresh, partition should be updated whether the partition is + // changed or not. if (partitionNotChanged) return false; HdfsPartition newPartition = partBuilder.build(); // Partition is reloaded and hence cache directives are not dropped. diff --git a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java index 23b78500b..2a938c251 100644 --- a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java +++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java @@ -1534,6 +1534,8 @@ public class MetastoreEvents { .getCount()); return true; } + DebugUtils.executeDebugAction(BackendConfig.INSTANCE.debugActions(), + DebugUtils.IS_OLDER_EVENT_CHECK_DELAY); // Always check the lastRefreshEventId on the table first for table level refresh boolean canSkip = tbl.getLastRefreshEventId() >= getEventId(); try { @@ -3280,6 +3282,7 @@ public class MetastoreEvents { List<Long> eventIds = new ArrayList<>(); // We treat insert event as a special case since the self-event context for an // insert event is generated differently using the eventIds. 
+ boolean isReloadEvent = baseEvent_ instanceof ReloadEvent; boolean isInsertEvent = baseEvent_ instanceof InsertEvent; for (T event : batchedEvents_) { partitionKeyValues.add( @@ -3288,7 +3291,8 @@ public class MetastoreEvents { eventIds.add(event.getEventId()); } return new SelfEventContext(dbName_, tblName_, partitionKeyValues, - baseEvent_.getPartitionForBatching().getParameters(), + isReloadEvent ? msTbl_.getParameters() : + baseEvent_.getPartitionForBatching().getParameters(), isInsertEvent ? eventIds : null); } } @@ -3483,12 +3487,21 @@ public class MetastoreEvents { @Override public SelfEventContext getSelfEventContext() { - throw new UnsupportedOperationException("Self-event evaluation is unnecessary for" - + " this event type"); + if (reloadPartition_ == null) { + return new SelfEventContext(msTbl_.getDbName(), msTbl_.getTableName(), + msTbl_.getParameters()); + } + return new SelfEventContext(msTbl_.getDbName(), msTbl_.getTableName(), + Arrays.asList(getTPartitionSpecFromHmsPartition(msTbl_, reloadPartition_)), + msTbl_.getParameters()); } @Override public void processTableEvent() throws MetastoreNotificationException { + if (isSelfEvent()) { + infoLog("Not processing the event as it is a self-event"); + return; + } if (isOlderEvent()) { metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC) .inc(getNumberOfEvents()); diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java index 7c72531a7..6c4319136 100644 --- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java +++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java @@ -7452,7 +7452,7 @@ public class CatalogOpExecutor { } Preconditions.checkNotNull(tbl, "tbl is null in " + cmdString); // fire event for refresh event and update the last refresh event id - fireReloadEventAndUpdateRefreshEventId(req, tblName, tbl); + fireReloadEventAndUpdateRefreshEventId(req, tblName, tbl, 
eventId); catalogTimeline.markEvent("Fired reload events in Metastore"); } @@ -7515,7 +7515,7 @@ public class CatalogOpExecutor { * and update the last refresh event id in the cache */ private void fireReloadEventAndUpdateRefreshEventId( - TResetMetadataRequest req, TableName tblName, Table tbl) { + TResetMetadataRequest req, TableName tblName, Table tbl, long currentHMSEventId) { // Partition spec (List<TPartitionKeyValue>) for each partition List<List<TPartitionKeyValue>> partSpecList = null; // Partition values (List<String>) for each partition @@ -7532,10 +7532,18 @@ public class CatalogOpExecutor { .collect(Collectors.toList())) .collect(Collectors.toList()); } + DebugUtils.executeDebugAction( + BackendConfig.INSTANCE.debugActions(), DebugUtils.FIRE_RELOAD_EVENT_DELAY); + long newCatalogVersion = catalog_.incrementAndGetCatalogVersion(); try { + Map<String, String> selfEventProps = new HashMap<>(); + selfEventProps.put(MetastoreEventPropertyKey.CATALOG_SERVICE_ID.getKey(), + catalog_.getCatalogServiceId()); + selfEventProps.put(MetastoreEventPropertyKey.CATALOG_VERSION.getKey(), + String.valueOf(newCatalogVersion)); List<Long> eventIds = MetastoreShim.fireReloadEventHelper( catalog_.getMetaStoreClient(), req.isIs_refresh(), partValsList, - tblName.getDb(), tblName.getTbl(), Collections.emptyMap()); + tblName.getDb(), tblName.getTbl(), selfEventProps); LOG.info("Fired {} RELOAD events for table {}: {}", eventIds.size(), tbl.getFullName(), StringUtils.join(",", eventIds)); // Update the lastRefreshEventId accordingly @@ -7548,6 +7556,10 @@ public class CatalogOpExecutor { } // tbl lock is held at this point. 
+ // It is possible that some operations might have modified the metadata externally + // while refresh operation is still in-progress, so it is safe to set the latest + // HMS notification event id before refresh operation, on the metadata object as + // lastRefreshEventId if (partSpecList != null) { Preconditions.checkNotNull(partValsList); boolean partitionChanged = false; @@ -7555,11 +7567,20 @@ public class CatalogOpExecutor { HdfsTable hdfsTbl = (HdfsTable) tbl; HdfsPartition partition = hdfsTbl .getPartitionFromThriftPartitionSpec(partSpecList.get(i)); + if (currentHMSEventId + 1 == eventIds.get(i)) { + currentHMSEventId = eventIds.get(i); + } if (partition != null) { HdfsPartition.Builder partBuilder = new HdfsPartition.Builder(partition); // use last event id, so that batch partition events will not reloaded again - partBuilder.setLastRefreshEventId(eventIds.get(eventIds.size() - 1)); - partitionChanged |= hdfsTbl.updatePartition(partBuilder); + partBuilder.setLastRefreshEventId(currentHMSEventId); + if (hdfsTbl.updatePartition(partBuilder)) { + partitionChanged = true; + partition = hdfsTbl.getPartitionFromThriftPartitionSpec( + partSpecList.get(i)); + Preconditions.checkNotNull(partition, "Partition is null after update"); + } + partition.addToVersionsForInflightEvents(false, newCatalogVersion); } else { LOG.warn("Partition {} no longer exists in table {}. 
It might be " + "dropped by a concurrent operation.", @@ -7572,8 +7593,14 @@ public class CatalogOpExecutor { tbl.setCatalogVersion(catalog_.incrementAndGetCatalogVersion()); } } else { - tbl.setLastRefreshEventId(eventIds.get(0)); + if (currentHMSEventId + 1 == eventIds.get(0)) { + currentHMSEventId = eventIds.get(0); + } + tbl.setLastRefreshEventId(currentHMSEventId); + // Add inflight event at table level + catalog_.addVersionsForInflightEvents(false, tbl, newCatalogVersion); } + } catch (TException | CatalogException e) { LOG.error(String.format(HMS_RPC_ERROR_FORMAT_STR, "fireReloadEvent") + e.getMessage()); diff --git a/fe/src/main/java/org/apache/impala/util/DebugUtils.java b/fe/src/main/java/org/apache/impala/util/DebugUtils.java index 5754263d9..b5d402dba 100644 --- a/fe/src/main/java/org/apache/impala/util/DebugUtils.java +++ b/fe/src/main/java/org/apache/impala/util/DebugUtils.java @@ -131,6 +131,12 @@ public class DebugUtils { public static final String RESET_METADATA_LOOP_UNLOCKED = "reset_metadata_loop_unlocked"; + // debug action label to inject a delay when firing reload events + public static final String FIRE_RELOAD_EVENT_DELAY = "fire_reload_event_delay"; + + // debug action label to inject a delay when checking for older event + public static final String IS_OLDER_EVENT_CHECK_DELAY = "older_event_check_delay"; + /** * Returns true if the label of action is set in the debugActions */ diff --git a/tests/custom_cluster/test_events_custom_configs.py b/tests/custom_cluster/test_events_custom_configs.py index d8ced8838..1c2209209 100644 --- a/tests/custom_cluster/test_events_custom_configs.py +++ b/tests/custom_cluster/test_events_custom_configs.py @@ -1298,6 +1298,52 @@ class TestEventProcessingCustomConfigs(TestEventProcessingCustomConfigsBase): unique_database, hive_tbl))) assert data == 0 + @CustomClusterTestSuite.with_args( + catalogd_args="--hms_event_polling_interval_s=1" + " --enable_reload_events=true" + " 
--debug_actions=fire_reload_event_delay:SLEEP@3000|" + "older_event_check_delay:SLEEP@3000" + " --enable_skipping_older_events=true") + def test_verify_last_refresh_event_id(self, unique_database): + """Test to verify IMPALA-12865 to not skip the events older but not processed by + event processor. Also, the test verifies self-events of reload event.""" + tbl = unique_database + ".partitioned_tbl" + self.client.execute( + "create external table {} (i int) partitioned by (year int)".format(tbl)) + self.client.execute( + "alter table {} add partition(year=2024)".format(tbl)) + EventProcessorUtils.wait_for_event_processing(self) + + def __verify_refresh(verify_self_event=False): + prev_events_skipped = EventProcessorUtils.get_int_metric('events-skipped', 0) + if not verify_self_event: + handle = self.client.execute_async( + "refresh {} partition(year=2024)".format(tbl)) + self.run_stmt_in_hive( + "alter table {} partition(year=2024) set fileformat ORC".format(tbl)) + else: + handle = self.client.execute_async( + "refresh {} partition(year=2024) partition(year=2025)".format(tbl)) + self.run_stmt_in_hive( + "create table {} (i int)".format(unique_database + ".tbl2")) + parts_refreshed_before = EventProcessorUtils.get_int_metric("partitions-refreshed") + self.client.wait_for_impala_state(handle, FINISHED, timeout=10) + assert self.client.is_finished(handle) + EventProcessorUtils.wait_for_event_processing(self, timeout=10) # avoid flakiness + current_events_skipped = EventProcessorUtils.get_int_metric('events-skipped', 0) + parts_refreshed_after = EventProcessorUtils.get_int_metric("partitions-refreshed") + if verify_self_event: + assert parts_refreshed_after == parts_refreshed_before + else: + assert parts_refreshed_after > parts_refreshed_before + assert current_events_skipped > prev_events_skipped + + __verify_refresh(False) + self.client.execute( + "alter table {} add partition(year=2025)".format(tbl)) + EventProcessorUtils.wait_for_event_processing(self, timeout=10) 
+ __verify_refresh(True) + @SkipIf.is_test_jdk @CustomClusterTestSuite.with_args( catalogd_args="--hms_event_polling_interval_s=100",
