This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 68c831176 IMPALA-12460: Add lag and histogram of event processing in the log
68c831176 is described below

commit 68c831176b82612f0146887e571554f20ce29c1e
Author: stiga-huang <[email protected]>
AuthorDate: Mon Sep 25 19:44:41 2023 +0800

    IMPALA-12460: Add lag and histogram of event processing in the log
    
    This patch logs the lag of event processing, i.e.
    (latestEventTime - lastSyncedEventTime), at the end of processing an
    event batch. If the batch is slow to process, it also logs the top-10
    most expensive events and the top-10 targets that contributed most to
    the processing time, so that admins can decide whether to disable event
    processing on some tables.
    
    For table- and partition-level events, the target name is the fully
    qualified table name (db.tbl). For db-level events, the target name is
    the db name. For events that don't have db/table names, e.g. COMMIT_TXN,
    the target name is the constant string "CLUSTER_WIDE".
    
    Log parsing example for expensive events:
    -- In shell:
    $ grep 'expensive events' catalogd.INFO > expensive-events.log
    -- In Python:
    import re
    EVENT_PATTERN = re.compile(r"\(type=(\w+), id=(\d+), target=([\w\.]+), duration_ms=(\d+)\)")
    re.findall(EVENT_PATTERN, log_line)
    
    Log parsing example for top targets in event processing:
    -- In shell:
    $ grep 'targets in event processing' catalogd.INFO > slow-targets.log
    -- In Python:
    import re
    re.findall(r"\(target=([\w\.]+), duration_ms=(\d+)\)", log_line)
    
    This patch also adds a thread-name annotation to the event-processor
    thread so that jstack output shows which event is being processed.
    
    Event times are in seconds. This patch renames some variables and
    metrics whose names wrongly suggested milliseconds (e.g.
    "latest-event-time-ms" is renamed to "latest-event-time").
    
    Tests:
     - Manually add some delay in event processing, verify the logs and
       jstacks.
    
    Change-Id: Ib9421b5e26bfa2324217ec9695fbd81636727d22
    Reviewed-on: http://gerrit.cloudera.org:8080/20507
    Reviewed-by: Daniel Becker <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 .../impala/catalog/events/MetastoreEvents.java     |   6 ++
 .../catalog/events/MetastoreEventsProcessor.java   | 102 ++++++++++++++++-----
 2 files changed, 87 insertions(+), 21 deletions(-)

diff --git 
a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java 
b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
index 593a8de79..391325bc9 100644
--- a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
+++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
@@ -498,6 +498,12 @@ public class MetastoreEvents {
 
     public String getTableName() { return tblName_; }
 
+    public String getTargetName() {
+      if (dbName_ == null && tblName_ == null) return "CLUSTER_WIDE";
+      if (tblName_ == null) return dbName_;
+      return dbName_ + "." + tblName_;
+    }
+
     /**
      * Process this event if it is enabled based on the flags on this object
      *
diff --git 
a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
 
b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
index d3debde25..866c264e6 100644
--- 
a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
+++ 
b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
@@ -27,8 +27,10 @@ import java.time.LocalDateTime;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -45,6 +47,7 @@ import 
org.apache.hadoop.hive.metastore.messaging.MessageDeserializer;
 import org.apache.impala.catalog.CatalogException;
 import org.apache.impala.catalog.CatalogServiceCatalog;
 import org.apache.impala.catalog.Db;
+import org.apache.impala.catalog.HdfsTable;
 import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
 import org.apache.impala.catalog.Table;
 import org.apache.impala.catalog.IncompleteTable;
@@ -235,13 +238,13 @@ public class MetastoreEventsProcessor implements 
ExternalEventsProcessor {
   // last synced event id
   public static final String LAST_SYNCED_ID_METRIC = "last-synced-event-id";
   // last synced event time
-  public static final String LAST_SYNCED_EVENT_TIME = 
"last-synced-event-time-ms";
+  public static final String LAST_SYNCED_EVENT_TIME = "last-synced-event-time";
   // latest event id in Hive metastore
   public static final String LATEST_EVENT_ID = "latest-event-id";
   // event time of the latest event in Hive metastore
-  public static final String LATEST_EVENT_TIME = "latest-event-time-ms";
-  // delay(ms) in events processing
-  public static final String EVENT_PROCESSING_DELAY = 
"event-processing-delay-ms";
+  public static final String LATEST_EVENT_TIME = "latest-event-time";
+  // delay(secs) in events processing
+  public static final String EVENT_PROCESSING_DELAY = "event-processing-delay";
   // metric name for number of tables which are refreshed by event processor 
so far
   public static final String NUMBER_OF_TABLE_REFRESHES = "tables-refreshed";
   // number of times events processor refreshed a partition
@@ -513,12 +516,12 @@ public class MetastoreEventsProcessor implements 
ExternalEventsProcessor {
 
   // keeps track of the last event id which we have synced to
   private final AtomicLong lastSyncedEventId_ = new AtomicLong(-1);
-  private final AtomicLong lastSyncedEventTimeMs_ = new AtomicLong(0);
+  private final AtomicLong lastSyncedEventTimeSecs_ = new AtomicLong(0);
 
   // The event id and eventTime of the latest event in HMS. Only used in 
metrics to show
   // how far we are lagging behind.
   private final AtomicLong latestEventId_ = new AtomicLong(0);
-  private final AtomicLong latestEventTimeMs_ = new AtomicLong(0);
+  private final AtomicLong latestEventTimeSecs_ = new AtomicLong(0);
 
   // The duration in nanoseconds of the processing of the last event batch.
   private final AtomicLong lastEventProcessDurationNs_ = new AtomicLong(0);
@@ -609,11 +612,12 @@ public class MetastoreEventsProcessor implements 
ExternalEventsProcessor {
     metrics_.addCounter(EVENTS_SKIPPED_METRIC);
     metrics_.addGauge(STATUS_METRIC, (Gauge<String>) () -> 
getStatus().toString());
     metrics_.addGauge(LAST_SYNCED_ID_METRIC, (Gauge<Long>) 
lastSyncedEventId_::get);
-    metrics_.addGauge(LAST_SYNCED_EVENT_TIME, (Gauge<Long>) 
lastSyncedEventTimeMs_::get);
+    metrics_.addGauge(LAST_SYNCED_EVENT_TIME,
+        (Gauge<Long>) lastSyncedEventTimeSecs_::get);
     metrics_.addGauge(LATEST_EVENT_ID, (Gauge<Long>) latestEventId_::get);
-    metrics_.addGauge(LATEST_EVENT_TIME, (Gauge<Long>) 
latestEventTimeMs_::get);
+    metrics_.addGauge(LATEST_EVENT_TIME, (Gauge<Long>) 
latestEventTimeSecs_::get);
     metrics_.addGauge(EVENT_PROCESSING_DELAY,
-        (Gauge<Long>) () -> latestEventTimeMs_.get() - 
lastSyncedEventTimeMs_.get());
+        (Gauge<Long>) () -> latestEventTimeSecs_.get() - 
lastSyncedEventTimeSecs_.get());
     metrics_.addCounter(NUMBER_OF_TABLE_REFRESHES);
     metrics_.addCounter(NUMBER_OF_PARTITION_REFRESHES);
     metrics_.addCounter(NUMBER_OF_TABLES_ADDED);
@@ -887,7 +891,7 @@ public class MetastoreEventsProcessor implements 
ExternalEventsProcessor {
       // EventProcessor to continue getting new events after HMS is back up.
       LOG.error("Unable to fetch the next batch of metastore events. Hive 
Metastore " +
         "may be unavailable. Will retry.", ex);
-    } catch(MetastoreNotificationNeedsInvalidateException ex) {
+    } catch (MetastoreNotificationNeedsInvalidateException ex) {
       updateStatus(EventProcessorStatus.NEEDS_INVALIDATE);
       String msg = "Event processing needs a invalidate command to resolve the 
state";
       LOG.error(msg, ex);
@@ -933,10 +937,18 @@ public class MetastoreEventsProcessor implements 
ExternalEventsProcessor {
       if (!eventIter.hasNext()) return;
       NotificationEvent event = eventIter.next();
       Preconditions.checkState(event.getEventId() == currentEventId);
-      LOG.info("Latest event in HMS: id={}, time={}", currentEventId,
-          event.getEventTime());
+      long lastSyncedEventId = lastSyncedEventId_.get();
+      long lastSyncedEventTime = lastSyncedEventTimeSecs_.get();
+      long currentEventTime = event.getEventTime();
       latestEventId_.set(currentEventId);
-      latestEventTimeMs_.set(event.getEventTime());
+      latestEventTimeSecs_.set(currentEventTime);
+      LOG.info("Latest event in HMS: id={}, time={}. Last synced event: id={}, 
time={}.",
+          currentEventId, currentEventTime, lastSyncedEventId, 
lastSyncedEventTime);
+      if (currentEventTime > lastSyncedEventTime) {
+        LOG.warn("Lag: {}. {} events pending to be processed.",
+            PrintUtils.printTimeMs((currentEventTime - lastSyncedEventTime) * 
1000),
+            currentEventId - lastSyncedEventId);
+      }
     } catch (Exception e) {
       LOG.error("Unable to update current notification event id. Last value: 
{}",
           latestEventId_, e);
@@ -955,9 +967,9 @@ public class MetastoreEventsProcessor implements 
ExternalEventsProcessor {
     eventProcessorMetrics.setLast_synced_event_id(getLastSyncedEventId());
     if (currentStatus != EventProcessorStatus.ACTIVE) return 
eventProcessorMetrics;
     // The following counters are only updated when event-processor is active.
-    
eventProcessorMetrics.setLast_synced_event_time(lastSyncedEventTimeMs_.get());
+    
eventProcessorMetrics.setLast_synced_event_time(lastSyncedEventTimeSecs_.get());
     eventProcessorMetrics.setLatest_event_id(latestEventId_.get());
-    eventProcessorMetrics.setLatest_event_time(latestEventTimeMs_.get());
+    eventProcessorMetrics.setLatest_event_time(latestEventTimeSecs_.get());
 
     long eventsReceived = metrics_.getMeter(EVENTS_RECEIVED_METRIC).getCount();
     long eventsSkipped = metrics_.getCounter(EVENTS_SKIPPED_METRIC).getCount();
@@ -1036,6 +1048,7 @@ public class MetastoreEventsProcessor implements 
ExternalEventsProcessor {
     if (events.isEmpty()) return;
     final Timer.Context context =
         metrics_.getTimer(EVENTS_PROCESS_DURATION_METRIC).time();
+    Map<MetastoreEvent, Long> eventProcessingTime = new HashMap<>();
     try {
       List<MetastoreEvent> filteredEvents =
           metastoreEventFactory_.getFilteredEvents(events, metrics_);
@@ -1052,10 +1065,18 @@ public class MetastoreEventsProcessor implements 
ExternalEventsProcessor {
             break;
           }
           currentEvent_ = event.metastoreNotificationEvent_;
-          event.processIfEnabled();
+          String targetName = event.getTargetName();
+          String desc = String.format("Processing %s on %s, eventId=%d",
+              event.getEventType(), targetName, event.getEventId());
+          try (ThreadNameAnnotator tna = new ThreadNameAnnotator(desc)) {
+            long startMs = System.currentTimeMillis();
+            event.processIfEnabled();
+            long elapsedTimeMs = System.currentTimeMillis() - startMs;
+            eventProcessingTime.put(event, elapsedTimeMs);
+          }
           deleteEventLog_.garbageCollect(event.getEventId());
           lastSyncedEventId_.set(event.getEventId());
-          lastSyncedEventTimeMs_.set(event.getEventTime());
+          lastSyncedEventTimeSecs_.set(event.getEventTime());
         }
       }
     } catch (CatalogException e) {
@@ -1063,11 +1084,50 @@ public class MetastoreEventsProcessor implements 
ExternalEventsProcessor {
           "Unable to process event %d of type %s. Event processing will be 
stopped.",
           currentEvent_.getEventId(), currentEvent_.getEventType()), e);
     } finally {
-      long elapsed_ns = context.stop();
-      lastEventProcessDurationNs_.set(elapsed_ns);
-      LOG.info("Time elapsed in processing event batch: {}",
-          PrintUtils.printTimeNs(elapsed_ns));
+      long elapsedNs = context.stop();
+      lastEventProcessDurationNs_.set(elapsedNs);
+      logEventMetrics(eventProcessingTime, elapsedNs);
+    }
+  }
+
+  private void logEventMetrics(Map<MetastoreEvent, Long> eventProcessingTime,
+      long elapsedNs) {
+    LOG.info("Time elapsed in processing event batch: {}",
+        PrintUtils.printTimeNs(elapsedNs));
+    // Only log the metrics when the processing on this batch is slow.
+    if (elapsedNs < HdfsTable.LOADING_WARNING_TIME_NS) return;
+    // Get the top-10 expensive events
+    List<Map.Entry<MetastoreEvent, Long>> eventList =
+        new ArrayList<>(eventProcessingTime.entrySet());
+    eventList.sort(Map.Entry.<MetastoreEvent, 
Long>comparingByValue().reversed());
+    int num = Math.min(10, eventList.size());
+    StringBuilder report = new StringBuilder("Top " + num + " expensive 
events: ");
+    for (Map.Entry<MetastoreEvent, Long> entry : eventList.subList(0, num)) {
+      MetastoreEvent event = entry.getKey();
+      long durationMs = entry.getValue();
+      report.append(String.format("(type=%s, id=%s, target=%s, duration_ms=%d) 
",
+          event.getEventType(), event.getEventId(), event.getTargetName(), 
durationMs));
+    }
+    // Get the top-10 expensive targets
+    Map<String, Long> durationPerTable = new HashMap<>();
+    for (MetastoreEvent event : eventProcessingTime.keySet()) {
+      String targetName = event.getTargetName();
+      long durationMs = durationPerTable.getOrDefault(targetName, 0L) +
+          eventProcessingTime.get(event);
+      durationPerTable.put(targetName, durationMs);
+    }
+    List<Map.Entry<String, Long>> targetList =
+        new ArrayList<>(durationPerTable.entrySet());
+    targetList.sort(Map.Entry.<String, Long>comparingByValue().reversed());
+    num = Math.min(10, targetList.size());
+    report.append("\nTop ").append(num).append(" targets in event processing: 
");
+    for (Map.Entry<String, Long> entry : targetList.subList(0, num)) {
+      String targetName = entry.getKey();
+      long durationMs = entry.getValue();
+      report.append(String.format("(target=%s, duration_ms=%d) ",
+          targetName, durationMs));
     }
+    LOG.warn(report.toString());
   }
 
   /**

Reply via email to