This is an automated email from the ASF dual-hosted git repository. joemcdonnell pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 1cc9ae42a8970b03a774d9e8e97ed9b4718bf1bf Author: Venu Reddy <[email protected]> AuthorDate: Wed Dec 17 01:25:26 2025 +0530 IMPALA-14628: Fix mismatch between greatest synced event time and last synced event time The greatest synced event id and its corresponding event time must match the last synced event id and time once all outstanding events have been fully processed. Previously, for batch events, the timestamp of the first event in the batch was recorded when transitioning the batch event from the in-progress log to the processed log in EventExecutorService. As a result, the greatest synced event time incorrectly reflected the first event’s timestamp, even though the greatest synced event id corresponded to the last event in the batch, leading to a mismatch. This fix ensures that, for batch events, the timestamp of the last event in the batch is recorded when the batch is marked as processed. Consequently, the greatest synced event time now correctly aligns with the last synced event id after all outstanding events are processed. Testing: - Added an FE test and ran the existing tests. 
Change-Id: I26550441bc0d5e7736c3b900c3e5178103157446 Reviewed-on: http://gerrit.cloudera.org:8080/23797 Reviewed-by: Quanlong Huang <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- .../catalog/events/EventExecutorService.java | 6 ++--- .../events/MetastoreEventsProcessorTest.java | 31 ++++++++++++++++++++++ 2 files changed, 34 insertions(+), 3 deletions(-) diff --git a/fe/src/main/java/org/apache/impala/catalog/events/EventExecutorService.java b/fe/src/main/java/org/apache/impala/catalog/events/EventExecutorService.java index e0e07a323..734ec6f6a 100644 --- a/fe/src/main/java/org/apache/impala/catalog/events/EventExecutorService.java +++ b/fe/src/main/java/org/apache/impala/catalog/events/EventExecutorService.java @@ -482,7 +482,7 @@ public class EventExecutorService { if (inProgressLog_.isEmpty() && isDelimiter) { // Event is considered implicitly processed and is directly recorded in the // processed log - addToProcessedLog(event.getEventId(), event.getEvent().getEventTime()); + addToProcessedLog(event.getEventId(), event.getEventTime()); return; } // Initialize the event dispatch time. 
It is used to calculate the time taken to @@ -513,7 +513,7 @@ public class EventExecutorService { // dispatched for processing event.debugLog("Complete process time: {}", PrintUtils.printTimeMs(currentTime - event.getDispatchTime())); - processedLog_.put(eventId, (long) event.getEvent().getEventTime()); + processedLog_.put(eventId, event.getEventTime()); Preconditions.checkState(!event.isDelimiter()); // Remove all the subsequent delimiter events until a non-delimiter event is // encountered @@ -523,7 +523,7 @@ public class EventExecutorService { Map.Entry<Long, MetastoreEvent> entry = it.next(); event = entry.getValue(); if (!event.isDelimiter()) break; - processedLog_.put(entry.getKey(), (long) event.getEvent().getEventTime()); + processedLog_.put(entry.getKey(), event.getEventTime()); it.remove(); } LOG.debug("Current count of dispatched events that are being tracked: {} ", diff --git a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java index 1043806c5..eaed60b45 100644 --- a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java +++ b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java @@ -101,6 +101,7 @@ import org.apache.impala.catalog.events.MetastoreEventsProcessor.MetaDataFilter; import org.apache.impala.catalog.FileMetadataLoadOpts; import org.apache.impala.common.FileSystemUtil; import org.apache.impala.common.ImpalaException; +import org.apache.impala.common.Metrics; import org.apache.impala.common.Pair; import org.apache.impala.common.Reference; import org.apache.impala.common.TransactionException; @@ -4365,6 +4366,36 @@ public class MetastoreEventsProcessorTest { } } + @Test + public void testLastSyncAndGreatestSyncEventMetrics() throws Exception { + String tblName = "test_last_sync_and_greatest_sync_event"; + createDatabase(TEST_DB_NAME, null); + createTable(tblName, true); 
+ List<List<String>> partVals = + Arrays.asList( + Collections.singletonList("1"), + Collections.singletonList("2")); + addPartitions(TEST_DB_NAME, tblName, partVals); + partVals = Collections.singletonList(Collections.singletonList("1")); + alterPartitionsParams(TEST_DB_NAME, tblName, "key", "val", partVals); + // HMS notification event time has second-level timestamp granularity. Add a delay to + // ensure consecutive alter partition events have distinct eventTime values. + Thread.sleep(1000); + partVals = Collections.singletonList(Collections.singletonList("2")); + alterPartitionsParams(TEST_DB_NAME, tblName, "key", "val", partVals); + // Both alter partition events are batched and processed together with + // BatchPartitionEvent. First alter partition event and the second alter partition in + // the batch have different notification event time. + eventsProcessor_.processEvents(); + Metrics metrics = eventsProcessor_.getMetrics(); + assertEquals( + metrics.getGauge(MetastoreEventsProcessor.LAST_SYNCED_ID_METRIC).getValue(), + metrics.getGauge(MetastoreEventsProcessor.GREATEST_SYNCED_EVENT_ID).getValue()); + assertEquals( + metrics.getGauge(MetastoreEventsProcessor.LAST_SYNCED_EVENT_TIME).getValue(), + metrics.getGauge(MetastoreEventsProcessor.GREATEST_SYNCED_EVENT_TIME).getValue()); + } + private void createDatabase(String catName, String dbName, Map<String, String> params) throws TException { try(MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
