This is an automated email from the ASF dual-hosted git repository. stigahuang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 1cf8f5065acb79e4377492c6e67c312249e42a78 Author: stiga-huang <[email protected]> AuthorDate: Tue May 23 16:50:58 2023 +0800 IMPALA-12053: Expose event-processor error message in WebUI When the event-processor goes into the ERROR/NEEDS_INVALIDATE state, the only way to get detailed information is to check the logs. This is inconvenient when triaging failures. This patch exposes the error message in the /events WebUI. The message includes a timestamp string and the stacktrace of the exception. This patch also makes the /events page visible. It also modifies the test code of EventProcessorUtils.wait_for_synced_event_id() to print the error message if the event processor is down. A trivial bug where lastProcessedEvent is not updated (IMPALA-11588) is also fixed in this patch: the variable is refactored into a member of the class so that internal methods can update it before processing each event. Some existing metrics were not shown on the /events page, e.g. latest-event-id, latest-event-time-ms and last-synced-event-time-ms. This patch adds them and also adds a metric, event-processing-delay-ms, which is latest-event-time-ms minus last-synced-event-time-ms. Tests: - Manually injected code to fail the event processor and verified the WebUI. - Ran metadata/test_event_processing.py while the event processor was in the ERROR state and verified that the error message shows up in the test output. 
Change-Id: I077375422bc3d24eed57c95c6b05ac408228f083 Reviewed-on: http://gerrit.cloudera.org:8080/19916 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- be/src/catalog/catalog-server.cc | 8 ++- common/thrift/JniCatalog.thrift | 4 +- .../impala/catalog/events/MetastoreEvents.java | 2 +- .../catalog/events/MetastoreEventsProcessor.java | 58 +++++++++++++++++----- tests/util/event_processor_utils.py | 15 +++++- www/events.tmpl | 5 ++ 6 files changed, 75 insertions(+), 17 deletions(-) diff --git a/be/src/catalog/catalog-server.cc b/be/src/catalog/catalog-server.cc index f2a5ffc30..cb31bc0d2 100644 --- a/be/src/catalog/catalog-server.cc +++ b/be/src/catalog/catalog-server.cc @@ -366,7 +366,7 @@ void CatalogServer::RegisterWebpages(Webserver* webserver, bool metrics_only) { false); webserver->RegisterUrlCallback(EVENT_WEB_PAGE, EVENT_METRICS_TEMPLATE, [this](const auto& args, auto* doc) { this->EventMetricsUrlCallback(args, doc); }, - false); + true); webserver->RegisterUrlCallback(CATALOG_OPERATIONS_WEB_PAGE, CATALOG_OPERATIONS_TEMPLATE, [this](const auto& args, auto* doc) { this->OperationUsageUrlCallback(args, doc); }, true); @@ -677,6 +677,12 @@ void CatalogServer::EventMetricsUrlCallback( event_processor_summary_response.summary.c_str(), document->GetAllocator()); document->AddMember( "event_processor_metrics", event_processor_summary, document->GetAllocator()); + if (event_processor_summary_response.__isset.error_msg) { + Value error_msg( + event_processor_summary_response.error_msg.c_str(), document->GetAllocator()); + document->AddMember( + "event_processor_error_msg", error_msg, document->GetAllocator()); + } } void CatalogServer::CatalogObjectsUrlCallback(const Webserver::WebRequest& req, diff --git a/common/thrift/JniCatalog.thrift b/common/thrift/JniCatalog.thrift index 56398eaf0..d8699ce0a 100755 --- a/common/thrift/JniCatalog.thrift +++ b/common/thrift/JniCatalog.thrift @@ -1025,4 +1025,6 @@ 
struct TEventProcessorMetricsSummaryResponse { // summary view of the events processor which can include status, // metrics and other details 1: required string summary -} \ No newline at end of file + // Error messages if the events processor goes into ERROR/NEEDS_INVALIDATE states + 2: optional string error_msg +} diff --git a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java index fabdfc37d..aae2122c4 100644 --- a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java +++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java @@ -1156,7 +1156,7 @@ public class MetastoreEvents { // determined that the event needs to be processed instead of skipped, or we // somehow missed the previous create database event. throw new MetastoreNotificationException( - debugString("Unable to process event", e)); + debugString("Unable to process event"), e); } } diff --git a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java index 7c2d6d368..19b7ddc36 100644 --- a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java +++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java @@ -23,6 +23,7 @@ import com.codahale.metrics.Timer; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.time.LocalDateTime; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -33,6 +34,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import javax.annotation.Nullable; +import org.apache.commons.lang.exception.ExceptionUtils; import 
org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.IMetaStoreClient.NotificationFilter; import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId; @@ -232,6 +234,14 @@ public class MetastoreEventsProcessor implements ExternalEventsProcessor { public static final String STATUS_METRIC = "status"; // last synced event id public static final String LAST_SYNCED_ID_METRIC = "last-synced-event-id"; + // last synced event time + public static final String LAST_SYNCED_EVENT_TIME = "last-synced-event-time-ms"; + // latest event id in Hive metastore + public static final String LATEST_EVENT_ID = "latest-event-id"; + // event time of the latest event in Hive metastore + public static final String LATEST_EVENT_TIME = "latest-event-time-ms"; + // delay(ms) in events processing + public static final String EVENT_PROCESSING_DELAY = "event-processing-delay-ms"; // metric name for number of tables which are refreshed by event processor so far public static final String NUMBER_OF_TABLE_REFRESHES = "tables-refreshed"; // number of times events processor refreshed a partition @@ -489,9 +499,15 @@ public class MetastoreEventsProcessor implements ExternalEventsProcessor { // current status of this event processor private EventProcessorStatus eventProcessorStatus_ = EventProcessorStatus.STOPPED; + // error message when event processor comes into ERROR/NEEDS_INVALIDATE states + private String eventProcessorErrorMsg_ = null; + // event factory which is used to get or create MetastoreEvents private final MetastoreEventFactory metastoreEventFactory_; + // keeps track of the current event that we are processing + private NotificationEvent currentEvent_; + // keeps track of the last event id which we have synced to private final AtomicLong lastSyncedEventId_ = new AtomicLong(-1); private final AtomicLong lastSyncedEventTimeMs_ = new AtomicLong(0); @@ -588,10 +604,13 @@ public class MetastoreEventsProcessor implements ExternalEventsProcessor { 
metrics_.addTimer(EVENTS_PROCESS_DURATION_METRIC); metrics_.addMeter(EVENTS_RECEIVED_METRIC); metrics_.addCounter(EVENTS_SKIPPED_METRIC); - metrics_.addGauge(STATUS_METRIC, - (Gauge<String>) () -> getStatus().toString()); - metrics_.addGauge(LAST_SYNCED_ID_METRIC, - (Gauge<Long>) () -> lastSyncedEventId_.get()); + metrics_.addGauge(STATUS_METRIC, (Gauge<String>) () -> getStatus().toString()); + metrics_.addGauge(LAST_SYNCED_ID_METRIC, (Gauge<Long>) lastSyncedEventId_::get); + metrics_.addGauge(LAST_SYNCED_EVENT_TIME, (Gauge<Long>) lastSyncedEventTimeMs_::get); + metrics_.addGauge(LATEST_EVENT_ID, (Gauge<Long>) latestEventId_::get); + metrics_.addGauge(LATEST_EVENT_TIME, (Gauge<Long>) latestEventTimeMs_::get); + metrics_.addGauge(EVENT_PROCESSING_DELAY, + (Gauge<Long>) () -> latestEventTimeMs_.get() - lastSyncedEventTimeMs_.get()); metrics_.addCounter(NUMBER_OF_TABLE_REFRESHES); metrics_.addCounter(NUMBER_OF_PARTITION_REFRESHES); metrics_.addCounter(NUMBER_OF_TABLES_ADDED); @@ -837,7 +856,7 @@ public class MetastoreEventsProcessor implements ExternalEventsProcessor { */ @Override public void processEvents() { - NotificationEvent lastProcessedEvent = null; + currentEvent_ = null; try { EventProcessorStatus currentStatus = eventProcessorStatus_; if (currentStatus != EventProcessorStatus.ACTIVE) { @@ -856,14 +875,20 @@ public class MetastoreEventsProcessor implements ExternalEventsProcessor { "may be unavailable. 
Will retry.", ex); } catch(MetastoreNotificationNeedsInvalidateException ex) { updateStatus(EventProcessorStatus.NEEDS_INVALIDATE); - LOG.error("Event processing needs a invalidate command to resolve the state", ex); + String msg = "Event processing needs a invalidate command to resolve the state"; + LOG.error(msg, ex); + eventProcessorErrorMsg_ = LocalDateTime.now().toString() + '\n' + msg + '\n' + + ExceptionUtils.getFullStackTrace(ex); } catch (Exception ex) { // There are lot of Preconditions which can throw RuntimeExceptions when we // process events this catch all exception block is needed so that the scheduler // thread does not die silently updateStatus(EventProcessorStatus.ERROR); - LOG.error("Unexpected exception received while processing event", ex); - dumpEventInfoToLog(lastProcessedEvent); + String msg = "Unexpected exception received while processing event"; + LOG.error(msg, ex); + eventProcessorErrorMsg_ = LocalDateTime.now().toString() + '\n' + msg + '\n' + + ExceptionUtils.getFullStackTrace(ex); + dumpEventInfoToLog(currentEvent_); } } @@ -973,6 +998,9 @@ public class MetastoreEventsProcessor implements ExternalEventsProcessor { TEventProcessorMetricsSummaryResponse summaryResponse = new TEventProcessorMetricsSummaryResponse(); summaryResponse.setSummary(metrics_.toString()); + if (eventProcessorErrorMsg_ != null) { + summaryResponse.setError_msg(eventProcessorErrorMsg_); + } return summaryResponse; } @@ -988,7 +1016,7 @@ public class MetastoreEventsProcessor implements ExternalEventsProcessor { @VisibleForTesting protected void processEvents(List<NotificationEvent> events) throws MetastoreNotificationException { - NotificationEvent lastProcessedEvent = null; + currentEvent_ = null; // update the events received metric before returning metrics_.getMeter(EVENTS_RECEIVED_METRIC).mark(events.size()); if (events.isEmpty()) return; @@ -1009,7 +1037,7 @@ public class MetastoreEventsProcessor implements ExternalEventsProcessor { if (eventProcessorStatus_ 
!= EventProcessorStatus.ACTIVE) { break; } - lastProcessedEvent = event.metastoreNotificationEvent_; + currentEvent_ = event.metastoreNotificationEvent_; event.processIfEnabled(); deleteEventLog_.garbageCollect(event.getEventId()); lastSyncedEventId_.set(event.getEventId()); @@ -1019,7 +1047,7 @@ public class MetastoreEventsProcessor implements ExternalEventsProcessor { } catch (CatalogException e) { throw new MetastoreNotificationException(String.format( "Unable to process event %d of type %s. Event processing will be stopped.", - lastProcessedEvent.getEventId(), lastProcessedEvent.getEventType()), e); + currentEvent_.getEventId(), currentEvent_.getEventType()), e); } finally { long elapsed_ns = context.stop(); lastEventProcessDurationNs_.set(elapsed_ns); @@ -1037,7 +1065,9 @@ public class MetastoreEventsProcessor implements ExternalEventsProcessor { private void dumpEventInfoToLog(NotificationEvent event) { if (event == null) { - LOG.error("Notification event is null"); + String error = "Notification event is null"; + LOG.error(error); + eventProcessorErrorMsg_ += '\n' + error; return; } StringBuilder msg = @@ -1049,7 +1079,9 @@ public class MetastoreEventsProcessor implements ExternalEventsProcessor { msg.append("Table name: ").append(event.getTableName()).append("\n"); } msg.append("Event message: ").append(event.getMessage()).append("\n"); - LOG.error(msg.toString()); + String msgStr = msg.toString(); + LOG.error(msgStr); + eventProcessorErrorMsg_ += '\n' + msgStr; } /** diff --git a/tests/util/event_processor_utils.py b/tests/util/event_processor_utils.py index 08223f9e8..d485a0432 100644 --- a/tests/util/event_processor_utils.py +++ b/tests/util/event_processor_utils.py @@ -60,7 +60,9 @@ class EventProcessorUtils(object): break status = EventProcessorUtils.get_event_processor_status() if status not in ["ACTIVE", "PAUSED"]: - raise Exception("Event processor is not working. 
Status: {0}".format(status)) + error_msg = EventProcessorUtils.get_event_processor_error_msg() + raise Exception("Event processor is not working. Status: {0}. Error msg: {1}" + .format(status, error_msg)) made_progress = current_synced_id > last_synced_id if t >= end_time: raise Exception( @@ -115,6 +117,17 @@ class EventProcessorUtils(object): pairs = [strip_pair(kv.split(':')) for kv in metrics if kv] return dict(pairs) + @staticmethod + def get_event_processor_error_msg(): + """Scrapes the catalog's /events webpage and return the error message (if exists) of + the event processor""" + response = requests.get("%s/events?json" % EventProcessorUtils.DEFAULT_CATALOG_URL) + assert response.status_code == requests.codes.ok + res_json = json.loads(response.text) + if "event_processor_error_msg" in res_json: + return res_json["event_processor_error_msg"].strip() + return None + @staticmethod def get_int_metric(metric_key, default_val=None): """Returns the int value of event processor metric from the /events catalogd debug diff --git a/www/events.tmpl b/www/events.tmpl index 484cfeae2..cdc16cae3 100644 --- a/www/events.tmpl +++ b/www/events.tmpl @@ -22,4 +22,9 @@ under the License. <h3>Event Processor Summary</h3> <pre>{{event_processor_metrics}}</pre> +{{?event_processor_error_msg}} +<h3>Error Message</h3> +<pre>{{event_processor_error_msg}}</pre> +{{/event_processor_error_msg}} + {{> www/common-footer.tmpl }}
