This is an automated email from the ASF dual-hosted git repository.

laszlog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 5add447a9c97c32cd0938c76813f1780c3308ef0
Author: stiga-huang <[email protected]>
AuthorDate: Mon Nov 27 20:37:03 2023 +0800

    IMPALA-12577: Correctly update last-synced-event-time
    
    There are two metrics tracking the progress of HMS event processing:
    last-synced-event-id and last-synced-event-time. Several code paths
    updated last-synced-event-id without also updating
    last-synced-event-time, which is used to calculate the event processing
    lag. Since users might set up alerts on this lag, both metrics should be
    kept up to date.
    
    Tests
     - Add code to verify the metrics in MetastoreEventsProcessorTest
     - Fix a potential NullPointerException in HiveJdbcClientPool.java
     - Ran CORE tests
    
    Change-Id: I033ace83221bc5e0c84c19aac78a41c1eeda31f2
    Reviewed-on: http://gerrit.cloudera.org:8080/20732
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 .../catalog/events/MetastoreEventsProcessor.java   | 53 ++++++++++++++++++----
 .../SynchronousHMSEventProcessorForTests.java      | 27 +++++++++--
 .../apache/impala/testutil/HiveJdbcClientPool.java |  6 ++-
 3 files changed, 71 insertions(+), 15 deletions(-)

diff --git 
a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
 
b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
index 87650d702..f066c7078 100644
--- 
a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
+++ 
b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
@@ -569,6 +569,7 @@ public class MetastoreEventsProcessor implements 
ExternalEventsProcessor {
     this.catalog_ = Preconditions.checkNotNull(catalogOpExecutor.getCatalog());
     validateConfigs();
     lastSyncedEventId_.set(startSyncFromId);
+    lastSyncedEventTimeSecs_.set(getEventTimeFromHMS(startSyncFromId));
     initMetrics();
     metastoreEventFactory_ = new MetastoreEventFactory(catalogOpExecutor);
     pollingFrequencyInSec_ = pollingFrequencyInSec;
@@ -757,6 +758,41 @@ public class MetastoreEventsProcessor implements 
ExternalEventsProcessor {
     return latestEventId;
   }
 
+  /**
+   * Fetch the event from HMS specified by 'eventId'
+   * @return null if the event has been cleaned up or any error occurs.
+   */
+  private NotificationEvent getEventFromHMS(MetaStoreClient msClient, long 
eventId) {
+    NotificationEventRequest eventRequest = new NotificationEventRequest();
+    eventRequest.setLastEvent(eventId - 1);
+    eventRequest.setMaxEvents(1);
+    try {
+      NotificationEventResponse response = MetastoreShim.getNextNotification(
+          msClient.getHiveClient(), eventRequest, false);
+      Iterator<NotificationEvent> eventIter = response.getEventsIterator();
+      if (!eventIter.hasNext()) {
+        LOG.warn("Unable to fetch event {}. It has been cleaned up", eventId);
+        return null;
+      }
+      return eventIter.next();
+    } catch (TException e) {
+      LOG.warn("Unable to fetch event {}", eventId, e);
+    }
+    return null;
+  }
+
+  /**
+   * Get the event time by fetching the specified event from HMS.
+   * @return 0 if the event has been cleaned up or any error occurs.
+   */
+  private int getEventTimeFromHMS(long eventId) {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      NotificationEvent event = getEventFromHMS(msClient, eventId);
+      if (event != null) return event.getEventTime();
+    }
+    return 0;
+  }
+
   /**
    * Starts the event processor from a given event id
    */
@@ -775,6 +811,7 @@ public class MetastoreEventsProcessor implements 
ExternalEventsProcessor {
       }
     }
     lastSyncedEventId_.set(fromEventId);
+    lastSyncedEventTimeSecs_.set(getEventTimeFromHMS(fromEventId));
     updateStatus(EventProcessorStatus.ACTIVE);
     LOG.info(String.format(
         "Metastore event processing restarted. Last synced event id was 
updated "
@@ -964,16 +1001,9 @@ public class MetastoreEventsProcessor implements 
ExternalEventsProcessor {
         return;
       }
       // Fetch the last event to get its eventTime.
-      NotificationEventRequest eventRequest = new NotificationEventRequest();
-      eventRequest.setLastEvent(currentEventId - 1);
-      eventRequest.setMaxEvents(1);
-      NotificationEventResponse response = MetastoreShim.getNextNotification(
-          msClient.getHiveClient(), eventRequest, false);
-      Iterator<NotificationEvent> eventIter = response.getEventsIterator();
+      NotificationEvent event = getEventFromHMS(msClient, currentEventId);
       // Events could be empty if they are just cleaned up.
-      if (!eventIter.hasNext()) return;
-      NotificationEvent event = eventIter.next();
-      Preconditions.checkState(event.getEventId() == currentEventId);
+      if (event == null) return;
       long lastSyncedEventId = lastSyncedEventId_.get();
       long lastSyncedEventTime = lastSyncedEventTimeSecs_.get();
       long currentEventTime = event.getEventTime();
@@ -1090,6 +1120,7 @@ public class MetastoreEventsProcessor implements 
ExternalEventsProcessor {
         // Possible to receive empty list due to event skip list in 
notification event
         // request. Update the last synced event id with current event id on 
metastore
         lastSyncedEventId_.set(currentEventId);
+        lastSyncedEventTimeSecs_.set(getEventTimeFromHMS(currentEventId));
       }
       return;
     }
@@ -1100,7 +1131,9 @@ public class MetastoreEventsProcessor implements 
ExternalEventsProcessor {
       List<MetastoreEvent> filteredEvents =
           metastoreEventFactory_.getFilteredEvents(events, metrics_);
       if (filteredEvents.isEmpty()) {
-        lastSyncedEventId_.set(events.get(events.size() - 1).getEventId());
+        NotificationEvent e = events.get(events.size() - 1);
+        lastSyncedEventId_.set(e.getEventId());
+        lastSyncedEventTimeSecs_.set(e.getEventTime());
         return;
       }
       for (MetastoreEvent event : filteredEvents) {
diff --git 
a/fe/src/test/java/org/apache/impala/catalog/events/SynchronousHMSEventProcessorForTests.java
 
b/fe/src/test/java/org/apache/impala/catalog/events/SynchronousHMSEventProcessorForTests.java
index a82398105..d121be01d 100644
--- 
a/fe/src/test/java/org/apache/impala/catalog/events/SynchronousHMSEventProcessorForTests.java
+++ 
b/fe/src/test/java/org/apache/impala/catalog/events/SynchronousHMSEventProcessorForTests.java
@@ -17,12 +17,10 @@
 
 package org.apache.impala.catalog.events;
 
-import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
-import org.apache.hadoop.hive.metastore.api.NotificationEvent;
-import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
 import org.apache.impala.catalog.CatalogException;
 import org.apache.impala.common.Metrics;
 import org.apache.impala.service.CatalogOpExecutor;
+import org.junit.Assert;
 
 /**
  * A test MetastoreEventProcessor which executes in the same thread. Useful 
for testing
@@ -45,4 +43,27 @@ public class SynchronousHMSEventProcessorForTests extends 
MetastoreEventsProcess
   public void startScheduler() {
     // nothing to do here; there is no background thread for this processor
   }
+
+  @Override
+  public void processEvents() {
+    super.processEvents();
+    super.updateLatestEventId();
+    verifyEventSyncedMetrics();
+  }
+
+  private void verifyEventSyncedMetrics() {
+    Metrics metrics = getMetrics();
+    long lastSyncedEventId = (Long) metrics.getGauge(
+        MetastoreEventsProcessor.LAST_SYNCED_ID_METRIC).getValue();
+    long latestEventId = (Long) metrics.getGauge(
+        MetastoreEventsProcessor.LATEST_EVENT_ID).getValue();
+    long lastSyncedEventTime = (Long) metrics.getGauge(
+        MetastoreEventsProcessor.LAST_SYNCED_EVENT_TIME).getValue();
+    long latestEventTime = (Long) metrics.getGauge(
+        MetastoreEventsProcessor.LATEST_EVENT_TIME).getValue();
+    if (lastSyncedEventId == latestEventId) {
+      Assert.assertEquals("Incorrect last synced event time for event " + 
latestEventId,
+          latestEventTime, lastSyncedEventTime);
+    }
+  }
 }
diff --git 
a/fe/src/test/java/org/apache/impala/testutil/HiveJdbcClientPool.java 
b/fe/src/test/java/org/apache/impala/testutil/HiveJdbcClientPool.java
index ff22a8887..da6386d58 100644
--- a/fe/src/test/java/org/apache/impala/testutil/HiveJdbcClientPool.java
+++ b/fe/src/test/java/org/apache/impala/testutil/HiveJdbcClientPool.java
@@ -152,8 +152,10 @@ public class HiveJdbcClientPool implements Closeable {
     while (closedCount > 0) {
       try {
         HiveJdbcClient client = freeClients_.poll(5 * 60, TimeUnit.SECONDS);
-        if (client.stmt_ != null) { client.stmt_.close(); }
-        if (client.conn_ != null) { client.conn_.close(); }
+        if (client != null) {
+          if (client.stmt_ != null) { client.stmt_.close(); }
+          if (client.conn_ != null) { client.conn_.close(); }
+        }
         closedCount--;
       } catch (Exception e) {
         throw new RuntimeException(e);

Reply via email to