This is an automated email from the ASF dual-hosted git repository.

dbecker pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit effc9df933b46eb5b0acf55a858606415425505f
Author: stiga-huang <[email protected]>
AuthorDate: Fri Feb 2 20:22:29 2024 +0800

    IMPALA-12782: Show info of the event processing in /events webUI
    
    The /events page of catalogd shows the metrics and status of the
    event-processor. This patch adds more info in this page, including
     - lag info
     - current event batch that's being processed
    See the screenshot attached in the JIRA for what it looks like.
    
    Also moves the error message to the top to highlight the error status.
    Fixes the issue of not updating the latest event id when the event
    processor is stopped. Also fixes the issue of the error message not
    being cleared after a global INVALIDATE METADATA.
    
    Adds a debug action, catalogd_event_processing_delay, to inject a sleep
    while processing an event, so the web page can be captured more easily.
    
    Also adds a missing test for showing the error message of
    event-processing in the /events page.
    
    Tests:
     - Add e2e test to verify the content of the page.
    
    Change-Id: I2e7d4952c7fd04ae89b6751204499bf9dd99f57c
    Reviewed-on: http://gerrit.cloudera.org:8080/20986
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/catalog/catalog-server.cc                   | 79 +++++++++++++++++++---
 be/src/util/json-util.h                            | 21 ++++++
 common/thrift/JniCatalog.thrift                    | 25 +++++++
 .../impala/catalog/events/MetastoreEvents.java     |  2 +
 .../catalog/events/MetastoreEventsProcessor.java   | 70 ++++++++++++++++---
 .../java/org/apache/impala/util/DebugUtils.java    |  3 +
 tests/custom_cluster/test_web_pages.py             | 66 ++++++++++++++++++
 www/events.tmpl                                    | 65 +++++++++++++++++-
 8 files changed, 311 insertions(+), 20 deletions(-)

diff --git a/be/src/catalog/catalog-server.cc b/be/src/catalog/catalog-server.cc
index 5f74a772c..9f4d2213e 100644
--- a/be/src/catalog/catalog-server.cc
+++ b/be/src/catalog/catalog-server.cc
@@ -30,6 +30,7 @@
 #include "util/collection-metrics.h"
 #include "util/debug-util.h"
 #include "util/event-metrics.h"
+#include "util/json-util.h"
 #include "util/logging-support.h"
 #include "util/metrics.h"
 #include "util/pretty-printer.h"
@@ -957,24 +958,84 @@ void CatalogServer::GetCatalogUsage(Document* document) {
 
 void CatalogServer::EventMetricsUrlCallback(
     const Webserver::WebRequest& req, Document* document) {
+  auto& allocator = document->GetAllocator();
   TEventProcessorMetricsSummaryResponse event_processor_summary_response;
   Status status = 
catalog_->GetEventProcessorSummary(&event_processor_summary_response);
   if (!status.ok()) {
-    Value error(status.GetDetail().c_str(), document->GetAllocator());
-    document->AddMember("error", error, document->GetAllocator());
+    Value error(status.GetDetail().c_str(), allocator);
+    document->AddMember("error", error, allocator);
     return;
   }
 
   Value event_processor_summary(
-      event_processor_summary_response.summary.c_str(), 
document->GetAllocator());
-  document->AddMember(
-      "event_processor_metrics", event_processor_summary, 
document->GetAllocator());
+      event_processor_summary_response.summary.c_str(), allocator);
+  document->AddMember("event_processor_metrics", event_processor_summary, 
allocator);
   if (event_processor_summary_response.__isset.error_msg) {
-    Value error_msg(
-        event_processor_summary_response.error_msg.c_str(), 
document->GetAllocator());
-    document->AddMember(
-        "event_processor_error_msg", error_msg, document->GetAllocator());
+    Value error_msg(event_processor_summary_response.error_msg.c_str(), 
allocator);
+    document->AddMember("event_processor_error_msg", error_msg, allocator);
+  }
+  const TEventBatchProgressInfo& progress_info =
+      event_processor_summary_response.progress;
+  JsonObjWrapper progress_info_obj(allocator);
+  // Add lag info
+  progress_info_obj.AddMember("last_synced_event_id", 
progress_info.last_synced_event_id);
+  progress_info_obj.AddMember("last_synced_event_time_s",
+      progress_info.last_synced_event_time_s);
+  progress_info_obj.AddMember("latest_event_id", 
progress_info.latest_event_id);
+  progress_info_obj.AddMember("latest_event_time_s", 
progress_info.latest_event_time_s);
+  int64_t lag_time = max(0L,
+      progress_info.latest_event_time_s - 
progress_info.last_synced_event_time_s);
+  progress_info_obj.AddMember("lag_time",
+      PrettyPrinter::Print(lag_time, TUnit::TIME_S));
+  progress_info_obj.AddMember("last_synced_event_time",
+      ToStringFromUnix(progress_info.last_synced_event_time_s));
+  progress_info_obj.AddMember("latest_event_time",
+      ToStringFromUnix(progress_info.latest_event_time_s));
+  // Add current batch info
+  if (progress_info.num_hms_events > 0) {
+    int progress = 0;
+    if (progress_info.num_filtered_events > 0) {
+      progress =
+          100 * progress_info.current_event_index / 
progress_info.num_filtered_events;
+    }
+    int64_t now_ms = UnixMillis();
+    int64_t elapsed_ms = max(0L, now_ms - 
progress_info.current_batch_start_time_ms);
+    progress_info_obj.AddMember("num_hms_events", 
progress_info.num_hms_events);
+    progress_info_obj.AddMember("num_filtered_events", 
progress_info.num_filtered_events);
+    progress_info_obj.AddMember("num_synced_events", 
progress_info.current_event_index);
+    progress_info_obj.AddMember("synced_percent", progress);
+    progress_info_obj.AddMember("min_event_id", progress_info.min_event_id);
+    progress_info_obj.AddMember("max_event_id", progress_info.max_event_id);
+    progress_info_obj.AddMember("min_event_time",
+        ToStringFromUnix(progress_info.min_event_time_s));
+    progress_info_obj.AddMember("max_event_time",
+        ToStringFromUnix(progress_info.max_event_time_s));
+    progress_info_obj.AddMember("start_time",
+        ToStringFromUnixMillis(progress_info.current_batch_start_time_ms));
+    progress_info_obj.AddMember("elapsed_time",
+        PrettyPrinter::Print(elapsed_ms, TUnit::TIME_MS));
+    progress_info_obj.AddMember("start_time_of_event",
+        ToStringFromUnixMillis(progress_info.current_event_start_time_ms));
+    progress_info_obj.AddMember("elapsed_time_current_event",
+        PrettyPrinter::Print(max(0L,
+            now_ms - progress_info.current_event_start_time_ms), 
TUnit::TIME_MS));
+    if (progress_info.__isset.current_event) {
+      JsonObjWrapper current_event(allocator);
+      current_event.AddMember("event_id", progress_info.current_event.eventId);
+      current_event.AddMember("event_time",
+          ToStringFromUnix(progress_info.current_event.eventTime));
+      current_event.AddMember("event_type", 
progress_info.current_event.eventType);
+      current_event.AddMember("cat_name", progress_info.current_event.catName);
+      current_event.AddMember("db_name", progress_info.current_event.dbName);
+      current_event.AddMember("tbl_name", 
progress_info.current_event.tableName);
+      progress_info_obj.value.AddMember("current_event", current_event.value, 
allocator);
+    }
+    if (progress_info.current_event_batch_size > 1) {
+      progress_info_obj.AddMember("current_event_batch_size",
+          progress_info.current_event_batch_size);
+    }
   }
+  document->AddMember("progress-info", progress_info_obj.value, allocator);
 }
 
 void CatalogServer::CatalogObjectsUrlCallback(const Webserver::WebRequest& req,
diff --git a/be/src/util/json-util.h b/be/src/util/json-util.h
index deabe97c0..cbccb7b04 100644
--- a/be/src/util/json-util.h
+++ b/be/src/util/json-util.h
@@ -74,4 +74,25 @@ void ProtobufToJson(const google::protobuf::Message& pb, 
rapidjson::Document* do
     rapidjson::Value* obj);
 } // namespace impala
 
+/// A wrapper for creating a rapidjson::Value with new fields.
+struct JsonObjWrapper {
+  JsonObjWrapper(rapidjson::MemoryPoolAllocator<>& alloc):
+    value(rapidjson::kObjectType), allocator(alloc) {}
+
+  template<typename T>
+  void AddMember(const char* name, const T& val) {
+    rapidjson::Value field(val);
+    value.AddMember(rapidjson::StringRef(name), field, allocator);
+  }
+
+  rapidjson::Value value;
+  rapidjson::MemoryPoolAllocator<>& allocator;
+};
+
+template<>
+inline void JsonObjWrapper::AddMember(const char* name, const std::string& 
val) {
+  rapidjson::Value field(val.c_str(), allocator);
+  value.AddMember(rapidjson::StringRef(name), field, allocator);
+}
+
 #endif
diff --git a/common/thrift/JniCatalog.thrift b/common/thrift/JniCatalog.thrift
index 9f2e8139e..369de115a 100755
--- a/common/thrift/JniCatalog.thrift
+++ b/common/thrift/JniCatalog.thrift
@@ -1065,10 +1065,35 @@ struct TCopyTestCaseReq {
   1: required string input_path
 }
 
+struct TEventBatchProgressInfo {
+  // Number of original HMS events received in the current batch.
+  1: required i32 num_hms_events
+  // Number of filtered MetastoreEvents generated from the original HMS events.
+  2: required i32 num_filtered_events
+  3: required i32 current_event_index
+  // Number of HMS events represented by this filtered event. For most events 
this is 1.
+  // In case of BatchPartitionEvent this could be more than 1.
+  4: required i32 current_event_batch_size
+  5: required i64 min_event_id
+  6: required i64 min_event_time_s
+  7: required i64 max_event_id
+  8: required i64 max_event_time_s
+  // Timestamp when we start to process the current event batch
+  9: required i64 current_batch_start_time_ms
+  // Timestamp when we start to process the current event
+  10: required i64 current_event_start_time_ms
+  11: required i64 last_synced_event_id
+  12: required i64 last_synced_event_time_s
+  13: required i64 latest_event_id
+  14: required i64 latest_event_time_s
+  15: optional hive_metastore.NotificationEvent current_event
+}
+
 struct TEventProcessorMetricsSummaryResponse {
   // summary view of the events processor which can include status,
   // metrics and other details
   1: required string summary
   // Error messages if the events processor goes into ERROR/NEEDS_INVALIDATE 
states
   2: optional string error_msg
+  3: optional TEventBatchProgressInfo progress
 }
diff --git 
a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java 
b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
index 2bb6d8682..bf861957f 100644
--- a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
+++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
@@ -693,6 +693,8 @@ public class MetastoreEvents {
           return;
         }
       }
+      DebugUtils.executeDebugAction(
+          BackendConfig.INSTANCE.debugActions(), 
DebugUtils.EVENT_PROCESSING_DELAY);
       process();
       injectErrorIfNeeded();
     }
diff --git 
a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
 
b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
index 09b931326..e6a683a3c 100644
--- 
a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
+++ 
b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
@@ -61,6 +61,7 @@ import org.apache.impala.common.PrintUtils;
 import org.apache.impala.compat.MetastoreShim;
 import org.apache.impala.service.BackendConfig;
 import org.apache.impala.service.CatalogOpExecutor;
+import org.apache.impala.thrift.TEventBatchProgressInfo;
 import org.apache.impala.thrift.TEventProcessorMetrics;
 import org.apache.impala.thrift.TEventProcessorMetricsSummaryResponse;
 import org.apache.impala.util.MetaStoreUtil;
@@ -525,6 +526,12 @@ public class MetastoreEventsProcessor implements 
ExternalEventsProcessor {
 
   // keeps track of the current event that we are processing
   private NotificationEvent currentEvent_;
+  private List<NotificationEvent> currentEventBatch_;
+  private MetastoreEvent currentFilteredEvent_;
+  private List<MetastoreEvent> currentFilteredEvents_;
+  private long currentBatchStartTimeMs_ = 0;
+  private long currentEventStartTimeMs_ = 0;
+  private int currentEventIndex_ = 0;
 
   // keeps track of the last event id which we have synced to
   private final AtomicLong lastSyncedEventId_ = new AtomicLong(-1);
@@ -667,6 +674,7 @@ public class MetastoreEventsProcessor implements 
ExternalEventsProcessor {
   @Override
   public synchronized void start() {
     Preconditions.checkState(eventProcessorStatus_ != 
EventProcessorStatus.ACTIVE);
+    resetProgressInfo();
     startScheduler();
     updateStatus(EventProcessorStatus.ACTIVE);
     LOG.info(String.format("Successfully started metastore event processing."
@@ -822,6 +830,9 @@ public class MetastoreEventsProcessor implements 
ExternalEventsProcessor {
         return;
       }
     }
+    // Clear the error message of the last failure and reset the progress info
+    eventProcessorErrorMsg_ = null;
+    resetProgressInfo();
     lastSyncedEventId_.set(fromEventId);
     lastSyncedEventTimeSecs_.set(getEventTimeFromHMS(fromEventId));
     updateStatus(EventProcessorStatus.ACTIVE);
@@ -955,7 +966,6 @@ public class MetastoreEventsProcessor implements 
ExternalEventsProcessor {
    */
   @Override
   public void processEvents() {
-    currentEvent_ = null;
     try {
       EventProcessorStatus currentStatus = eventProcessorStatus_;
       if (currentStatus != EventProcessorStatus.ACTIVE) {
@@ -1025,7 +1035,7 @@ public class MetastoreEventsProcessor implements 
ExternalEventsProcessor {
   @VisibleForTesting
   public void updateLatestEventId() {
     EventProcessorStatus currentStatus = eventProcessorStatus_;
-    if (currentStatus != EventProcessorStatus.ACTIVE) {
+    if (currentStatus == EventProcessorStatus.DISABLED) {
       return;
     }
     try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
@@ -1130,6 +1140,35 @@ public class MetastoreEventsProcessor implements 
ExternalEventsProcessor {
     if (eventProcessorErrorMsg_ != null) {
       summaryResponse.setError_msg(eventProcessorErrorMsg_);
     }
+    TEventBatchProgressInfo progressInfo = new TEventBatchProgressInfo();
+    progressInfo.last_synced_event_id = lastSyncedEventId_.get();
+    progressInfo.last_synced_event_time_s = lastSyncedEventTimeSecs_.get();
+    progressInfo.latest_event_id = latestEventId_.get();
+    progressInfo.latest_event_time_s = latestEventTimeSecs_.get();
+    // Assign these lists to local variables in case they are replaced 
concurrently.
+    // It's best effort to make the members in 'progressInfo' consistent but 
we can't
+    // guarantee it.
+    List<NotificationEvent> eventBatch = currentEventBatch_;
+    List<MetastoreEvent> filteredEvents = currentFilteredEvents_;
+    if (eventBatch != null && !eventBatch.isEmpty()) {
+      int numHmsEvents = eventBatch.size();
+      progressInfo.num_hms_events = numHmsEvents;
+      progressInfo.min_event_id = eventBatch.get(0).getEventId();
+      progressInfo.min_event_time_s = eventBatch.get(0).getEventTime();
+      NotificationEvent lastEvent = eventBatch.get(numHmsEvents - 1);
+      progressInfo.max_event_id = lastEvent.getEventId();
+      progressInfo.max_event_time_s = lastEvent.getEventTime();
+      progressInfo.current_batch_start_time_ms = currentBatchStartTimeMs_;
+      progressInfo.current_event_start_time_ms = currentEventStartTimeMs_;
+      if (filteredEvents != null) {
+        progressInfo.num_filtered_events = filteredEvents.size();
+      }
+      progressInfo.current_event_index = currentEventIndex_;
+      progressInfo.current_event_batch_size = currentFilteredEvent_ != null ?
+          currentFilteredEvent_.getNumberOfEvents() : 0;
+      progressInfo.current_event = currentEvent_;
+    }
+    summaryResponse.setProgress(progressInfo);
     return summaryResponse;
   }
 
@@ -1138,6 +1177,16 @@ public class MetastoreEventsProcessor implements 
ExternalEventsProcessor {
     return metrics_;
   }
 
+  private void resetProgressInfo() {
+    currentEvent_ = null;
+    currentEventBatch_ = null;
+    currentFilteredEvent_ = null;
+    currentFilteredEvents_ = null;
+    currentBatchStartTimeMs_ = 0;
+    currentEventStartTimeMs_ = 0;
+    currentEventIndex_ = 0;
+  }
+
   /**
    * Process the given list of notification events. Useful for tests which 
provide a list
    * of events
@@ -1148,7 +1197,7 @@ public class MetastoreEventsProcessor implements 
ExternalEventsProcessor {
   @VisibleForTesting
   protected void processEvents(long currentEventId, List<NotificationEvent> 
events)
       throws MetastoreNotificationException {
-    currentEvent_ = null;
+    currentEventBatch_ = events;
     // update the events received metric before returning
     metrics_.getMeter(EVENTS_RECEIVED_METRIC).mark(events.size());
     if (events.isEmpty()) {
@@ -1162,17 +1211,19 @@ public class MetastoreEventsProcessor implements 
ExternalEventsProcessor {
     }
     final Timer.Context context =
         metrics_.getTimer(EVENTS_PROCESS_DURATION_METRIC).time();
+    currentBatchStartTimeMs_ = System.currentTimeMillis();
     Map<MetastoreEvent, Long> eventProcessingTime = new HashMap<>();
     try {
-      List<MetastoreEvent> filteredEvents =
+      currentFilteredEvents_ =
           metastoreEventFactory_.getFilteredEvents(events, metrics_);
-      if (filteredEvents.isEmpty()) {
+      if (currentFilteredEvents_.isEmpty()) {
         NotificationEvent e = events.get(events.size() - 1);
         lastSyncedEventId_.set(e.getEventId());
         lastSyncedEventTimeSecs_.set(e.getEventTime());
+        resetProgressInfo();
         return;
       }
-      for (MetastoreEvent event : filteredEvents) {
+      for (MetastoreEvent event : currentFilteredEvents_) {
         // synchronizing each event processing reduces the scope of the lock 
so the a
         // potential reset() during event processing is not blocked for longer 
than
         // necessary
@@ -1181,13 +1232,14 @@ public class MetastoreEventsProcessor implements 
ExternalEventsProcessor {
             break;
           }
           currentEvent_ = event.metastoreNotificationEvent_;
+          currentFilteredEvent_ = event;
           String targetName = event.getTargetName();
           String desc = String.format("Processing %s on %s, eventId=%d",
               event.getEventType(), targetName, event.getEventId());
           try (ThreadNameAnnotator tna = new ThreadNameAnnotator(desc)) {
-            long startMs = System.currentTimeMillis();
+            currentEventStartTimeMs_ = System.currentTimeMillis();
             event.processIfEnabled();
-            long elapsedTimeMs = System.currentTimeMillis() - startMs;
+            long elapsedTimeMs = System.currentTimeMillis() - 
currentEventStartTimeMs_;
             eventProcessingTime.put(event, elapsedTimeMs);
           } catch (Exception processingEx) {
             try {
@@ -1200,6 +1252,7 @@ public class MetastoreEventsProcessor implements 
ExternalEventsProcessor {
               throw processingEx;
             }
           }
+          currentEventIndex_++;
           deleteEventLog_.garbageCollect(event.getEventId());
           lastSyncedEventId_.set(event.getEventId());
           lastSyncedEventTimeSecs_.set(event.getEventTime());
@@ -1208,6 +1261,7 @@ public class MetastoreEventsProcessor implements 
ExternalEventsProcessor {
                   TimeUnit.SECONDS);
         }
       }
+      resetProgressInfo();
     } catch (CatalogException e) {
       throw new MetastoreNotificationException(String.format(
           "Unable to process event %d of type %s. Event processing will be 
stopped.",
diff --git a/fe/src/main/java/org/apache/impala/util/DebugUtils.java 
b/fe/src/main/java/org/apache/impala/util/DebugUtils.java
index fa85399ac..9bb77f54b 100644
--- a/fe/src/main/java/org/apache/impala/util/DebugUtils.java
+++ b/fe/src/main/java/org/apache/impala/util/DebugUtils.java
@@ -71,6 +71,9 @@ public class DebugUtils {
   public static final String GET_FILTERED_EVENTS_DELAY =
       "catalogd_get_filtered_events_delay";
 
+  // debug action label to inject a delay in processing each HMS event
+  public static final String EVENT_PROCESSING_DELAY = 
"catalogd_event_processing_delay";
+
   // debug action label for introducing delay in loading table metadata.
   public static final String LOAD_TABLES_DELAY = "impalad_load_tables_delay";
 
diff --git a/tests/custom_cluster/test_web_pages.py 
b/tests/custom_cluster/test_web_pages.py
index 9e521226e..c53bfbe09 100644
--- a/tests/custom_cluster/test_web_pages.py
+++ b/tests/custom_cluster/test_web_pages.py
@@ -28,6 +28,7 @@ from tests.beeswax.impala_beeswax import 
ImpalaBeeswaxException
 from tests.common.custom_cluster_test_suite import (
   DEFAULT_CLUSTER_SIZE,
   CustomClusterTestSuite)
+from tests.common.skip import SkipIfFS
 from tests.shell.util import run_impala_shell_cmd
 
 SMALL_QUERY_LOG_SIZE_IN_BYTES = 40 * 1024
@@ -436,3 +437,68 @@ class TestWebPage(CustomClusterTestSuite):
     
self.cluster.statestored.service.wait_for_live_subscribers(DEFAULT_CLUSTER_SIZE)
     # Verify the topic metrics again
     self._verify_topic_size_metrics()
+
+  @SkipIfFS.hive
+  @CustomClusterTestSuite.with_args(
+    catalogd_args="--hms_event_polling_interval_s=1 "
+                  "--debug_actions=catalogd_event_processing_delay:SLEEP@2000")
+  def test_event_processor_status(self, unique_database):
+    """Verify the /events page by using a long delay in event processing."""
+    self.execute_query("create table {}.part (i int) partitioned by (p 
int)".format(
+        unique_database))
+    insert_stmt = "insert into {}.part partition(p) select id, month from "\
+        "functional.alltypes".format(unique_database)
+    self.execute_query(insert_stmt)
+    # Run the same INSERT statement in Hive to get non-self events.
+    self.run_stmt_in_hive("set hive.exec.dynamic.partition.mode=nonstrict;" + 
insert_stmt)
+    page = requests.get("http://localhost:25020/events").text
+    # Wait until the batched events are being processed
+    while "a batch of" not in page:
+      time.sleep(1)
+      page = requests.get("http://localhost:25020/events").text
+    expected_lines = [
+      "Lag Info", "Lag time:", "Current Event Batch", "Metastore Event Batch:",
+      "Event ID starts from", "Event time starts from",
+      "Started processing the current batch at",
+      "Started processing the current event at",
+      "Current Metastore event being processed",
+      "(a batch of ", " events on the same table)",
+    ]
+    for expected in expected_lines:
+      assert expected in page, "Missing '%s' in events page:\n%s" % (expected, 
page)
+
+  @SkipIfFS.hive
+  @CustomClusterTestSuite.with_args(
+    catalogd_args="--hms_event_polling_interval_s=1 "
+                  "--invalidate_metadata_on_event_processing_failure=false "
+                  "--inject_process_event_failure_event_types=CREATE_TABLE")
+  def test_event_processor_error_message(self, unique_database):
+    """Verify the /events page show the error of event processing"""
+    self.run_stmt_in_hive("create table {}.tbl(i int)".format(unique_database))
+    # Wait enough time for the event to be processed
+    time.sleep(2)
+    page = requests.get("http://localhost:25020/events").text
+    expected_lines = [
+      "Unexpected exception received while processing event",
+      "Event id:", "Event Type: CREATE_TABLE", "Event message:",
+    ]
+    for expected in expected_lines:
+      assert expected in page, "Missing '%s' in events page:\n%s" % (expected, 
page)
+
+    # Verify the latest event id still get updated
+    json_res = 
json.loads(requests.get("http://localhost:25020/events?json").text)
+    old_latest_event_id = json_res["progress-info"]["latest_event_id"]
+    # Generate new events
+    self.run_stmt_in_hive("create table {}.tbl2(i 
int)".format(unique_database))
+    # Wait enough time for the event to be polled
+    time.sleep(2)
+    json_res = 
json.loads(requests.get("http://localhost:25020/events?json").text)
+    new_latest_event_id = json_res["progress-info"]["latest_event_id"]
+    assert new_latest_event_id > old_latest_event_id
+    # Current event (the failed one) should not be cleared
+    assert "current_event" in json_res["progress-info"]
+
+    # Verify the error message disappears after a global INVALIDATE METADATA
+    self.execute_query("invalidate metadata")
+    page = requests.get("http://localhost:25020/events").text
+    assert "Unexpected exception" not in page, "Still see error message:\n" + 
page
diff --git a/www/events.tmpl b/www/events.tmpl
index cdc16cae3..8a807185d 100644
--- a/www/events.tmpl
+++ b/www/events.tmpl
@@ -19,12 +19,71 @@ under the License.
 {{> www/common-header.tmpl }}
 {{^error}}
 
-<h3>Event Processor Summary</h3>
-<pre>{{event_processor_metrics}}</pre>
-
 {{?event_processor_error_msg}}
 <h3>Error Message</h3>
 <pre>{{event_processor_error_msg}}</pre>
 {{/event_processor_error_msg}}
 
+<h3>Lag Info:</h3>
+Lag time: {{progress-info.lag_time}}
+<table class="table table-hover table-bordered">
+  <tr>
+    <th>
+    <th>Event ID</th>
+    <th>Event Timestamp (s)</th>
+    <th>Event Time</th>
+  </tr>
+  <tr>
+    <td>Last Synced Event</td>
+    <td>{{progress-info.last_synced_event_id}}</td>
+    <td>{{progress-info.last_synced_event_time_s}}</td>
+    <td>{{progress-info.last_synced_event_time}}</td>
+  </tr>
+  <tr>
+    <td>Latest Event in Metastore</td>
+    <td>{{progress-info.latest_event_id}}</td>
+    <td>{{progress-info.latest_event_time_s}}</td>
+    <td>{{progress-info.latest_event_time}}</td>
+  </tr>
+</table>
+
+{{?progress-info.num_hms_events}}
+<h3>Current Event Batch</h3>
+<p>
+Metastore Event Batch: {{progress-info.num_hms_events}} events.</br>
+Event ID starts from {{progress-info.min_event_id}} to 
{{progress-info.max_event_id}}.</br>
+Event time starts from {{progress-info.min_event_time}} to 
{{progress-info.max_event_time}}.</br>
+After batching and filtering, there are {{progress-info.num_filtered_events}} 
events to be processed.
+{{progress-info.num_synced_events}} ({{progress-info.synced_percent}}%) have 
been processed.</br>
+Started processing the current batch at {{progress-info.start_time}} 
({{progress-info.elapsed_time}} ago).</br>
+Started processing the current event at {{progress-info.start_time_of_event}} 
({{progress-info.elapsed_time_current_event}} ago).
+</p>
+<p>Current Metastore event being processed
+{{?progress-info.current_event_batch_size}}
+(a batch of {{progress-info.current_event_batch_size}} events on the same 
table)
+{{/progress-info.current_event_batch_size}}
+:</p>
+<table class="table table-hover table-bordered">
+  <tr>
+    <th>Event ID</th>
+    <th>Event Time</th>
+    <th>Event Type</th>
+    <th>Catalog Name</th>
+    <th>Database Name</th>
+    <th>Table Name</th>
+  </tr>
+  <tr>
+    <td>{{progress-info.current_event.event_id}}</td>
+    <td>{{progress-info.current_event.event_time}}</td>
+    <td>{{progress-info.current_event.event_type}}</td>
+    <td>{{progress-info.current_event.cat_name}}</td>
+    <td>{{progress-info.current_event.db_name}}</td>
+    <td>{{progress-info.current_event.tbl_name}}</td>
+  </tr>
+</table>
+{{/progress-info.num_hms_events}}
+
+<h3>Event Processor Summary</h3>
+<pre>{{event_processor_metrics}}</pre>
+
 {{> www/common-footer.tmpl }}

Reply via email to