This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 1cc9ae42a8970b03a774d9e8e97ed9b4718bf1bf
Author: Venu Reddy <[email protected]>
AuthorDate: Wed Dec 17 01:25:26 2025 +0530

    IMPALA-14628: Fix mismatch between greatest synced event time and last synced event time
    
    The greatest synced event id and its corresponding event time
    must match the last synced event id and time once all outstanding
    events have been fully processed.
    
    Previously, for batch events, the timestamp of the first event in
    the batch was recorded when transitioning the batch event from
    the in-progress log to the processed log in EventExecutorService.
    As a result, the greatest synced event time incorrectly reflected
    the first event’s timestamp, even though the greatest synced
    event id corresponded to the last event in the batch, leading to
    a mismatch.
    
    This fix ensures that, for batch events, the timestamp of the
    last event in the batch is recorded when the batch is marked as
    processed. Consequently, the greatest synced event time now
    correctly aligns with the last synced event id after all
    outstanding events are processed.
    
    Testing:
        - Added an FE test and ran the existing tests.
    
    Change-Id: I26550441bc0d5e7736c3b900c3e5178103157446
    Reviewed-on: http://gerrit.cloudera.org:8080/23797
    Reviewed-by: Quanlong Huang <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 .../catalog/events/EventExecutorService.java       |  6 ++---
 .../events/MetastoreEventsProcessorTest.java       | 31 ++++++++++++++++++++++
 2 files changed, 34 insertions(+), 3 deletions(-)

diff --git 
a/fe/src/main/java/org/apache/impala/catalog/events/EventExecutorService.java 
b/fe/src/main/java/org/apache/impala/catalog/events/EventExecutorService.java
index e0e07a323..734ec6f6a 100644
--- 
a/fe/src/main/java/org/apache/impala/catalog/events/EventExecutorService.java
+++ 
b/fe/src/main/java/org/apache/impala/catalog/events/EventExecutorService.java
@@ -482,7 +482,7 @@ public class EventExecutorService {
     if (inProgressLog_.isEmpty() && isDelimiter) {
       // Event is considered implicitly processed and is directly recorded in 
the
       // processed log
-      addToProcessedLog(event.getEventId(), event.getEvent().getEventTime());
+      addToProcessedLog(event.getEventId(), event.getEventTime());
       return;
     }
     // Initialize the event dispatch time. It is used to calculate the time 
taken to
@@ -513,7 +513,7 @@ public class EventExecutorService {
     // dispatched for processing
     event.debugLog("Complete process time: {}",
         PrintUtils.printTimeMs(currentTime - event.getDispatchTime()));
-    processedLog_.put(eventId, (long) event.getEvent().getEventTime());
+    processedLog_.put(eventId, event.getEventTime());
     Preconditions.checkState(!event.isDelimiter());
     // Remove all the subsequent delimiter events until a non-delimiter event 
is
     // encountered
@@ -523,7 +523,7 @@ public class EventExecutorService {
       Map.Entry<Long, MetastoreEvent> entry = it.next();
       event = entry.getValue();
       if (!event.isDelimiter()) break;
-      processedLog_.put(entry.getKey(), (long) 
event.getEvent().getEventTime());
+      processedLog_.put(entry.getKey(), event.getEventTime());
       it.remove();
     }
     LOG.debug("Current count of dispatched events that are being tracked: {} ",
diff --git 
a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
 
b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
index 1043806c5..eaed60b45 100644
--- 
a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
+++ 
b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
@@ -101,6 +101,7 @@ import 
org.apache.impala.catalog.events.MetastoreEventsProcessor.MetaDataFilter;
 import org.apache.impala.catalog.FileMetadataLoadOpts;
 import org.apache.impala.common.FileSystemUtil;
 import org.apache.impala.common.ImpalaException;
+import org.apache.impala.common.Metrics;
 import org.apache.impala.common.Pair;
 import org.apache.impala.common.Reference;
 import org.apache.impala.common.TransactionException;
@@ -4365,6 +4366,36 @@ public class MetastoreEventsProcessorTest {
     }
   }
 
+  @Test
+  public void testLastSyncAndGreatestSyncEventMetrics() throws Exception {
+    String tblName = "test_last_sync_and_greatest_sync_event";
+    createDatabase(TEST_DB_NAME, null);
+    createTable(tblName, true);
+    List<List<String>> partVals =
+        Arrays.asList(
+            Collections.singletonList("1"),
+            Collections.singletonList("2"));
+    addPartitions(TEST_DB_NAME, tblName, partVals);
+    partVals  = Collections.singletonList(Collections.singletonList("1"));
+    alterPartitionsParams(TEST_DB_NAME, tblName, "key", "val", partVals);
+    // HMS notification event time has second-level timestamp granularity. Add 
a delay to
+    // ensure consecutive alter partition events have distinct eventTime 
values.
+    Thread.sleep(1000);
+    partVals  = Collections.singletonList(Collections.singletonList("2"));
+    alterPartitionsParams(TEST_DB_NAME, tblName, "key", "val", partVals);
+    // Both alter partition events are batched and processed together with
+    // BatchPartitionEvent. First alter partition event and the second alter 
partition in
+    // the batch have different notification event time.
+    eventsProcessor_.processEvents();
+    Metrics metrics = eventsProcessor_.getMetrics();
+    assertEquals(
+        
metrics.getGauge(MetastoreEventsProcessor.LAST_SYNCED_ID_METRIC).getValue(),
+        
metrics.getGauge(MetastoreEventsProcessor.GREATEST_SYNCED_EVENT_ID).getValue());
+    assertEquals(
+        
metrics.getGauge(MetastoreEventsProcessor.LAST_SYNCED_EVENT_TIME).getValue(),
+        
metrics.getGauge(MetastoreEventsProcessor.GREATEST_SYNCED_EVENT_TIME).getValue());
+  }
+
   private void createDatabase(String catName, String dbName,
       Map<String, String> params) throws TException {
     try(MetaStoreClient msClient = catalog_.getMetaStoreClient()) {

Reply via email to