This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new dc785d110 [GOBBLIN-1937] Quantify Missed Work Completed by Reminders 
(#3808)
dc785d110 is described below

commit dc785d1101a01d8b4c55ac913ac5cd63b71ede6b
Author: umustafi <[email protected]>
AuthorDate: Fri Oct 27 13:04:16 2023 -0700

    [GOBBLIN-1937] Quantify Missed Work Completed by Reminders (#3808)
    
    * Quantify Missed Work Completed by Reminders
       Also fix bug to filter out heartbeat events before extracting field
    
    * Refactor changeMonitorUtils & add delimiter to metrics prefix
    
    * Re-order params to group similar ones
    
    ---------
    
    Co-authored-by: Urmi Mustafi <[email protected]>
---
 .../apache/gobblin/metrics/ServiceMetricNames.java | 48 ++++++++--------
 .../gobblin/runtime/metrics/RuntimeMetrics.java    | 65 +++++++++++-----------
 .../service/modules/orchestration/DagManager.java  |  2 +-
 .../modules/orchestration/FlowTriggerHandler.java  |  5 ++
 .../service/monitoring/ChangeMonitorUtils.java     | 16 ++++--
 .../monitoring/DagActionStoreChangeMonitor.java    | 31 ++++++-----
 .../service/monitoring/SpecStoreChangeMonitor.java |  8 ++-
 7 files changed, 98 insertions(+), 77 deletions(-)

diff --git 
a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
 
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
index aff097bbd..5550513c4 100644
--- 
a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
+++ 
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
@@ -20,37 +20,39 @@ public class ServiceMetricNames {
   // These prefixes can be used to distinguish metrics reported by 
GobblinService from other metrics reported by Gobblin
   // This can be used in conjunction with MetricNameRegexFilter to filter out 
metrics in any MetricReporter
   public static final String GOBBLIN_SERVICE_PREFIX = "GobblinService";
+  public static final String GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER = 
GOBBLIN_SERVICE_PREFIX + ".";
   public static final String GOBBLIN_JOB_METRICS_PREFIX = "JobMetrics";
 
   // Flow Compilation Meters and Timer
-  public static final String FLOW_COMPILATION_SUCCESSFUL_METER = 
GOBBLIN_SERVICE_PREFIX + "flowCompilation.successful";
-  public static final String FLOW_COMPILATION_FAILED_METER = 
GOBBLIN_SERVICE_PREFIX + "flowCompilation.failed";
-  public static final String FLOW_COMPILATION_TIMER = GOBBLIN_SERVICE_PREFIX + 
"flowCompilation.time";
-  public static final String DATA_AUTHORIZATION_TIMER = GOBBLIN_SERVICE_PREFIX 
+ "flowCompilation.dataAuthorization.time";
+  public static final String FLOW_COMPILATION_SUCCESSFUL_METER = 
GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + "flowCompilation.successful";
+  public static final String FLOW_COMPILATION_FAILED_METER = 
GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + "flowCompilation.failed";
+  public static final String FLOW_COMPILATION_TIMER = 
GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + "flowCompilation.time";
+  public static final String DATA_AUTHORIZATION_TIMER = 
GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + 
"flowCompilation.dataAuthorization.time";
 
   // Flow Orchestration Meters and Timer
-  public static final String FLOW_ORCHESTRATION_SUCCESSFUL_METER = 
GOBBLIN_SERVICE_PREFIX + ".flowOrchestration.successful";
-  public static final String FLOW_ORCHESTRATION_FAILED_METER = 
GOBBLIN_SERVICE_PREFIX + ".flowOrchestration.failed";
-  public static final String FLOW_ORCHESTRATION_TIMER = GOBBLIN_SERVICE_PREFIX 
+ ".flowOrchestration.time";
-  public static final String FLOW_ORCHESTRATION_DELAY = GOBBLIN_SERVICE_PREFIX 
+ ".flowOrchestration.delay";
+  public static final String FLOW_ORCHESTRATION_SUCCESSFUL_METER = 
GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + "flowOrchestration.successful";
+  public static final String FLOW_ORCHESTRATION_FAILED_METER = 
GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + "flowOrchestration.failed";
+  public static final String FLOW_ORCHESTRATION_TIMER = 
GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + "flowOrchestration.time";
+  public static final String FLOW_ORCHESTRATION_DELAY = 
GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + "flowOrchestration.delay";
 
   // Flow Trigger Handler
-  public static final String FLOW_TRIGGER_HANDLER_PREFIX = 
"flowTriggerHandler";
-  public static final String GOBBLIN_FLOW_TRIGGER_HANDLER_NUM_FLOWS_SUBMITTED 
= ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "." + FLOW_TRIGGER_HANDLER_PREFIX 
+ ".numFlowsSubmitted";
-  public static final String FLOW_TRIGGER_HANDLER_LEASE_OBTAINED_COUNT = 
GOBBLIN_SERVICE_PREFIX + "." + FLOW_TRIGGER_HANDLER_PREFIX + ".leaseObtained";
-  public static final String FLOW_TRIGGER_HANDLER_LEASED_TO_ANOTHER_COUNT = 
GOBBLIN_SERVICE_PREFIX + "." + FLOW_TRIGGER_HANDLER_PREFIX + ".leasedToAnother";
-  public static final String FLOW_TRIGGER_HANDLER_NO_LONGER_LEASING_COUNT = 
GOBBLIN_SERVICE_PREFIX + "." + FLOW_TRIGGER_HANDLER_PREFIX + ".noLongerLeasing";
-  public static final String FLOW_TRIGGER_HANDLER_JOB_DOES_NOT_EXIST_COUNT = 
GOBBLIN_SERVICE_PREFIX + "." + FLOW_TRIGGER_HANDLER_PREFIX + 
".jobDoesNotExistInScheduler";
-  public static final String FLOW_TRIGGER_HANDLER_FAILED_TO_SET_REMINDER_COUNT 
= GOBBLIN_SERVICE_PREFIX + "." + FLOW_TRIGGER_HANDLER_PREFIX + 
".failedToSetReminderCount";
+  public static final String FLOW_TRIGGER_HANDLER_PREFIX = 
GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + "flowTriggerHandler";
+  public static final String GOBBLIN_FLOW_TRIGGER_HANDLER_NUM_FLOWS_SUBMITTED 
= FLOW_TRIGGER_HANDLER_PREFIX + ".numFlowsSubmitted";
+  public static final String FLOW_TRIGGER_HANDLER_LEASE_OBTAINED_COUNT = 
FLOW_TRIGGER_HANDLER_PREFIX + ".leaseObtained";
+  public static final String FLOW_TRIGGER_HANDLER_LEASED_TO_ANOTHER_COUNT = 
FLOW_TRIGGER_HANDLER_PREFIX + ".leasedToAnother";
+  public static final String FLOW_TRIGGER_HANDLER_NO_LONGER_LEASING_COUNT = 
FLOW_TRIGGER_HANDLER_PREFIX + ".noLongerLeasing";
+  public static final String FLOW_TRIGGER_HANDLER_JOB_DOES_NOT_EXIST_COUNT = 
FLOW_TRIGGER_HANDLER_PREFIX + ".jobDoesNotExistInScheduler";
+  public static final String FLOW_TRIGGER_HANDLER_FAILED_TO_SET_REMINDER_COUNT 
= FLOW_TRIGGER_HANDLER_PREFIX + ".failedToSetReminderCount";
+  public static final String 
FLOW_TRIGGER_HANDLER_LEASES_OBTAINED_DUE_TO_REMINDER_COUNT = 
FLOW_TRIGGER_HANDLER_PREFIX + ".leasesObtainedDueToReminderCount";
 
   // DagManager Related Metrics
-  public static final String DAG_MANAGER_PREFIX = GOBBLIN_SERVICE_PREFIX + 
".dagManager";
+  public static final String DAG_MANAGER_PREFIX = 
GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + "dagManager";
   public static final String
       DAG_MANAGER_FAILED_LAUNCH_EVENTS_ON_STARTUP_COUNT = DAG_MANAGER_PREFIX + 
".failedLaunchEventsOnStartupCount";
   public static final String FLOW_FAILED_FORWARD_TO_DAG_MANAGER_COUNT = 
DAG_MANAGER_PREFIX + ".flowFailedForwardToDagManagerCount";
 
   //Job status poll timer
-  public static final String JOB_STATUS_POLLED_TIMER = GOBBLIN_SERVICE_PREFIX 
+ ".jobStatusPoll.time";
+  public static final String JOB_STATUS_POLLED_TIMER = 
GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + "jobStatusPoll.time";
 
   public static final String CREATE_FLOW_METER = "CreateFlow";
   public static final String DELETE_FLOW_METER = "DeleteFlow";
@@ -59,9 +61,9 @@ public class ServiceMetricNames {
   public static final String START_SLA_EXCEEDED_FLOWS_METER = 
"StartSLAExceededFlows";
   public static final String SLA_EXCEEDED_FLOWS_METER = "SlaExceededFlows";
   public static final String FAILED_FLOW_METER = "FailedFlows";
-  public static final String SCHEDULED_FLOW_METER = GOBBLIN_SERVICE_PREFIX + 
".ScheduledFlows";
-  public static final String NON_SCHEDULED_FLOW_METER = GOBBLIN_SERVICE_PREFIX 
+ ".NonScheduledFlows";
-  public static final String SKIPPED_FLOWS = GOBBLIN_SERVICE_PREFIX + 
".SkippedFlows";
+  public static final String SCHEDULED_FLOW_METER = 
GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + "ScheduledFlows";
+  public static final String NON_SCHEDULED_FLOW_METER = 
GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + "NonScheduledFlows";
+  public static final String SKIPPED_FLOWS = 
GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + "SkippedFlows";
   public static final String RUNNING_FLOWS_COUNTER = "RunningFlows";
   public static final String SERVICE_USERS = "ServiceUsers";
   public static final String COMPILED = "Compiled";
@@ -70,9 +72,9 @@ public class ServiceMetricNames {
 
   public static final String HELIX_LEADER_STATE = "HelixLeaderState";
 
-  public static final String FLOWGRAPH_UPDATE_FAILED_METER = 
GOBBLIN_SERVICE_PREFIX + ".FlowgraphUpdateFailed";
+  public static final String FLOWGRAPH_UPDATE_FAILED_METER = 
GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + "FlowgraphUpdateFailed";
 
-  public static final String DAG_COUNT_MYSQL_DAG_STATE_COUNT = 
GOBBLIN_SERVICE_PREFIX + ".MysqlDagStateStore" + ".totalDagCount";
+  public static final String DAG_COUNT_MYSQL_DAG_STATE_COUNT = 
GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + "MysqlDagStateStore" + ".totalDagCount";
 
-  public static final String DAG_COUNT_FS_DAG_STATE_COUNT = 
GOBBLIN_SERVICE_PREFIX + ".FsDagStateStore" + ".totalDagCount";
+  public static final String DAG_COUNT_FS_DAG_STATE_COUNT = 
GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + "FsDagStateStore" + ".totalDagCount";
 }
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java
index 86adb4671..de5cb2e99 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java
@@ -37,47 +37,50 @@ public class RuntimeMetrics {
   public static final String GOBBLIN_JOB_MONITOR_SLAEVENT_REJECTEDEVENTS = 
"gobblin.jobMonitor.slaevent.rejectedevents";
   public static final String GOBBLIN_JOB_MONITOR_KAFKA_MESSAGE_PARSE_FAILURES =
       "gobblin.jobMonitor.kafka.messageParseFailures";
-  public static final String 
GOBBLIN_SPEC_STORE_MONITOR_SUCCESSFULLY_ADDED_SPECS = 
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + 
".specStoreMonitor.successful.added.specs";
-  public static final String GOBBLIN_SPEC_STORE_MONITOR_FAILED_ADDED_SPECS = 
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + 
".specStoreMonitor.failed.added.specs";
-  public static final String GOBBLIN_SPEC_STORE_MONITOR_DELETED_SPECS = 
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".specStoreMonitor.deleted.specs";
-  public static final String GOBBLIN_SPEC_STORE_MONITOR_UNEXPECTED_ERRORS = 
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + 
".specStoreMonitor.unexpected.errors";
-  public static final String GOBBLIN_SPEC_STORE_MESSAGE_PROCESSED= 
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + 
".specStoreMonitor.message.processed";
-  public static final String 
GOBBLIN_SPEC_STORE_PRODUCE_TO_CONSUME_DELAY_MILLIS =
-      ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + 
".specstoreMonitor.produce.to.consume.delay";
-  public static final String DAG_ACTION_STORE_MONITOR_PREFIX = 
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".dagActionStoreMonitor";
-  public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_KILLS_INVOKED = 
DAG_ACTION_STORE_MONITOR_PREFIX + ".kills.invoked";
-  public static final String 
GOBBLIN_DAG_ACTION_STORE_MONITOR_MESSAGE_PROCESSED = 
DAG_ACTION_STORE_MONITOR_PREFIX + ".message.processed";
-  public static final String 
GOBBLIN_DAG_ACTION_STORE_MONITOR_MESSAGES_FILTERED_OUT = 
DAG_ACTION_STORE_MONITOR_PREFIX + ".messagesFilteredOut";
-  public static final String 
GOBBLIN_DAG_ACTION_STORE_MONITOR_MALFORMED_MESSAGES_SKIPPED = 
DAG_ACTION_STORE_MONITOR_PREFIX + ".malformedMessagedSkipped";
-  public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_RESUMES_INVOKED 
= DAG_ACTION_STORE_MONITOR_PREFIX + ".resumes.invoked";
-  public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_FLOWS_LAUNCHED = 
DAG_ACTION_STORE_MONITOR_PREFIX + ".flows.launched";
+  public static final String SPEC_STORE_MONITOR_PREFIX = 
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + "specStoreMonitor.";
+  public static final String 
GOBBLIN_SPEC_STORE_MONITOR_SUCCESSFULLY_ADDED_SPECS = SPEC_STORE_MONITOR_PREFIX 
+ "successful.added.specs";
+  public static final String GOBBLIN_SPEC_STORE_MONITOR_FAILED_ADDED_SPECS = 
SPEC_STORE_MONITOR_PREFIX + "failed.added.specs";
+  public static final String GOBBLIN_SPEC_STORE_MONITOR_DELETED_SPECS = 
SPEC_STORE_MONITOR_PREFIX + "deleted.specs";
+  public static final String GOBBLIN_SPEC_STORE_MONITOR_UNEXPECTED_ERRORS = 
SPEC_STORE_MONITOR_PREFIX + "unexpected.errors";
+  public static final String GOBBLIN_SPEC_STORE_MESSAGE_PROCESSED = 
SPEC_STORE_MONITOR_PREFIX + "message.processed";
+  public static final String GOBBLIN_SPEC_STORE_DUPLICATE_MESSAGES = 
SPEC_STORE_MONITOR_PREFIX + "duplicateMessages";
+  public static final String GOBBLIN_SPEC_STORE_HEARTBEAT_MESSAGES = 
SPEC_STORE_MONITOR_PREFIX + "heartbeatMessages";
+  public static final String 
GOBBLIN_SPEC_STORE_PRODUCE_TO_CONSUME_DELAY_MILLIS = SPEC_STORE_MONITOR_PREFIX 
+ "produce.to.consume.delay";
+  public static final String DAG_ACTION_STORE_MONITOR_PREFIX = 
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + 
"dagActionStoreMonitor.";
+  public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_KILLS_INVOKED = 
DAG_ACTION_STORE_MONITOR_PREFIX + "kills.invoked";
+  public static final String 
GOBBLIN_DAG_ACTION_STORE_MONITOR_MESSAGE_PROCESSED = 
DAG_ACTION_STORE_MONITOR_PREFIX + "message.processed";
+  public static final String 
GOBBLIN_DAG_ACTION_STORE_MONITOR_DUPLICATE_MESSAGES = 
DAG_ACTION_STORE_MONITOR_PREFIX + "duplicateMessages";
+  public static final String 
GOBBLIN_DAG_ACTION_STORE_MONITOR_HEARTBEAT_MESSAGES = 
DAG_ACTION_STORE_MONITOR_PREFIX + "heartbeatMessages";
+  public static final String 
GOBBLIN_DAG_ACTION_STORE_MONITOR_NULL_DAG_ACTION_TYPE_MESSAGES = 
DAG_ACTION_STORE_MONITOR_PREFIX + "nullDagActionTypeMessages";
+  public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_RESUMES_INVOKED 
= DAG_ACTION_STORE_MONITOR_PREFIX + "resumes.invoked";
+  public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_FLOWS_LAUNCHED = 
DAG_ACTION_STORE_MONITOR_PREFIX + "flows.launched";
 
-  public static final String 
GOBBLIN_DAG_ACTION_STORE_FAILED_FLOW_LAUNCHED_SUBMISSIONS = 
DAG_ACTION_STORE_MONITOR_PREFIX + ".failedFlowLaunchSubmissions";
-  public static final String 
GOBBLIN_DAG_ACTION_STORE_MONITOR_UNEXPECTED_ERRORS = 
DAG_ACTION_STORE_MONITOR_PREFIX + ".unexpected.errors";
+  public static final String 
GOBBLIN_DAG_ACTION_STORE_FAILED_FLOW_LAUNCHED_SUBMISSIONS = 
DAG_ACTION_STORE_MONITOR_PREFIX + "failedFlowLaunchSubmissions";
+  public static final String 
GOBBLIN_DAG_ACTION_STORE_MONITOR_UNEXPECTED_ERRORS = 
DAG_ACTION_STORE_MONITOR_PREFIX + "unexpected.errors";
   public static final String
-      GOBBLIN_DAG_ACTION_STORE_PRODUCE_TO_CONSUME_DELAY_MILLIS = 
DAG_ACTION_STORE_MONITOR_PREFIX + ".produce.to.consume.delay";
-  public static final String GOBBLIN_MYSQL_QUOTA_MANAGER_UNEXPECTED_ERRORS = 
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + 
"gobblin.mysql.quota.manager.unexpected.errors";
-  public static final String 
GOBBLIN_MYSQL_QUOTA_MANAGER_QUOTA_REQUESTS_EXCEEDED = 
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + 
"gobblin.mysql.quota.manager.quotaRequests.exceeded";
-  public static final String GOBBLIN_MYSQL_QUOTA_MANAGER_TIME_TO_CHECK_QUOTA = 
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + 
"gobblin.mysql.quota.manager.time.to.check.quota";
+      GOBBLIN_DAG_ACTION_STORE_PRODUCE_TO_CONSUME_DELAY_MILLIS = 
DAG_ACTION_STORE_MONITOR_PREFIX + "produce.to.consume.delay";
+  public static final String GOBBLIN_MYSQL_QUOTA_MANAGER_UNEXPECTED_ERRORS = 
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + 
"gobblin.mysql.quota.manager.unexpected.errors";
+  public static final String 
GOBBLIN_MYSQL_QUOTA_MANAGER_QUOTA_REQUESTS_EXCEEDED = 
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + 
"gobblin.mysql.quota.manager.quotaRequests.exceeded";
+  public static final String GOBBLIN_MYSQL_QUOTA_MANAGER_TIME_TO_CHECK_QUOTA = 
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + 
"gobblin.mysql.quota.manager.time.to.check.quota";
 
   // The following metrics are used to identify the bottlenecks for 
initializing the job scheduler
   public static final String
-      GOBBLIN_JOB_SCHEDULER_GET_SPECS_DURING_STARTUP_PER_SPEC_RATE_NANOS = 
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + 
".jobScheduler.getSpecsDuringStartupPerSpecRateNanos";
-  public static final String GOBBLIN_JOB_SCHEDULER_LOAD_SPECS_BATCH_SIZE = 
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".jobScheduler.loadSpecBatchSize";
+      GOBBLIN_JOB_SCHEDULER_GET_SPECS_DURING_STARTUP_PER_SPEC_RATE_NANOS = 
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + 
"jobScheduler.getSpecsDuringStartupPerSpecRateNanos";
+  public static final String GOBBLIN_JOB_SCHEDULER_LOAD_SPECS_BATCH_SIZE = 
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + 
"jobScheduler.loadSpecBatchSize";
   public static final String
-      GOBBLIN_JOB_SCHEDULER_TIME_TO_INITIALIZE_SCHEDULER_NANOS = 
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + 
".jobScheduler.timeToInitializeSchedulerNanos";
+      GOBBLIN_JOB_SCHEDULER_TIME_TO_INITIALIZE_SCHEDULER_NANOS = 
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + 
"jobScheduler.timeToInitializeSchedulerNanos";
   public static final String
-      GOBBLIN_JOB_SCHEDULER_TIME_TO_OBTAIN_SPEC_URIS_NANOS = 
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + 
".jobScheduler.timeToObtainSpecUrisNanos";
+      GOBBLIN_JOB_SCHEDULER_TIME_TO_OBTAIN_SPEC_URIS_NANOS = 
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + 
"jobScheduler.timeToObtainSpecUrisNanos";
   public static final String
-      GOBBLIN_JOB_SCHEDULER_INDIVIDUAL_GET_SPEC_SPEED_NANOS = 
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + 
".jobScheduler.individualGetSpecSpeedNanos";
+      GOBBLIN_JOB_SCHEDULER_INDIVIDUAL_GET_SPEC_SPEED_NANOS = 
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + 
"jobScheduler.individualGetSpecSpeedNanos";
   public static final String
-      GOBBLIN_JOB_SCHEDULER_EACH_COMPLETE_ADD_SPEC_NANOS = 
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + 
".jobScheduler.eachCompleteAddSpecNanos";
+      GOBBLIN_JOB_SCHEDULER_EACH_COMPLETE_ADD_SPEC_NANOS = 
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + 
"jobScheduler.eachCompleteAddSpecNanos";
   public static final String
-      GOBBLIN_JOB_SCHEDULER_EACH_SPEC_COMPILATION_NANOS = 
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + 
".jobScheduler.eachSpecCompilationNanos";
-  public static final String GOBBLIN_JOB_SCHEDULER_EACH_SCHEDULE_JOB_NANOS = 
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + 
".jobScheduler.eachScheduleJobNanos";
-  public static final String GOBBLIN_JOB_SCHEDULER_TOTAL_GET_SPEC_TIME_NANOS = 
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + 
".jobScheduler.totalGetSpecTimeNanos";
-  public static final String GOBBLIN_JOB_SCHEDULER_TOTAL_ADD_SPEC_TIME_NANOS = 
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + 
".jobScheduler.totalAddSpecTimeNanos";
-  public static final String 
GOBBLIN_JOB_SCHEDULER_NUM_JOBS_SCHEDULED_DURING_STARTUP = 
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + 
".jobScheduler.numJobsScheduledDuringStartup";
+      GOBBLIN_JOB_SCHEDULER_EACH_SPEC_COMPILATION_NANOS = 
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + 
"jobScheduler.eachSpecCompilationNanos";
+  public static final String GOBBLIN_JOB_SCHEDULER_EACH_SCHEDULE_JOB_NANOS = 
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + 
"jobScheduler.eachScheduleJobNanos";
+  public static final String GOBBLIN_JOB_SCHEDULER_TOTAL_GET_SPEC_TIME_NANOS = 
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + 
"jobScheduler.totalGetSpecTimeNanos";
+  public static final String GOBBLIN_JOB_SCHEDULER_TOTAL_ADD_SPEC_TIME_NANOS = 
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + 
"jobScheduler.totalAddSpecTimeNanos";
+  public static final String 
GOBBLIN_JOB_SCHEDULER_NUM_JOBS_SCHEDULED_DURING_STARTUP = 
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + 
"jobScheduler.numJobsScheduledDuringStartup";
 
   // Metadata keys
   public static final String TOPIC = "topic";
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index b4ec9c0ce..c1553da39 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -138,7 +138,7 @@ public class DagManager extends AbstractIdleService {
   private static final long DAG_FLOW_STATUS_TOLERANCE_TIME_MILLIS = 
TimeUnit.MINUTES.toMillis(5);
   public static final String FAILED_DAG_POLLING_INTERVAL = 
FAILED_DAG_STATESTORE_PREFIX + ".retention.pollingIntervalMinutes";
   public static final Integer DEFAULT_FAILED_DAG_POLLING_INTERVAL = 60;
-  public static final String DAG_MANAGER_HEARTBEAT = 
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".dagManager.heartbeat-%s";
+  public static final String DAG_MANAGER_HEARTBEAT = 
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + 
"dagManager.heartbeat-%s";
   // Default job start SLA time if configured, measured in minutes. Default is 
10 minutes
   private static final String JOB_START_SLA_TIME = DAG_MANAGER_PREFIX + 
ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME;
   private static final String JOB_START_SLA_UNITS = DAG_MANAGER_PREFIX + 
ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME_UNIT;
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
index c5a5bb8e0..af65390ec 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
@@ -79,6 +79,7 @@ public class FlowTriggerHandler {
   private ContextAwareCounter noLongerLeasingStatusCount;
   private ContextAwareCounter jobDoesNotExistInSchedulerCount;
   private ContextAwareCounter failedToSetEventReminderCount;
+  private ContextAwareMeter leasesObtainedDueToReminderCount;
 
   @Inject
   public FlowTriggerHandler(Config config, Optional<MultiActiveLeaseArbiter> 
leaseDeterminationStore,
@@ -96,6 +97,7 @@ public class FlowTriggerHandler {
     this.noLongerLeasingStatusCount = 
this.metricContext.contextAwareCounter(ServiceMetricNames.FLOW_TRIGGER_HANDLER_NO_LONGER_LEASING_COUNT);
     this.jobDoesNotExistInSchedulerCount = 
this.metricContext.contextAwareCounter(ServiceMetricNames.FLOW_TRIGGER_HANDLER_JOB_DOES_NOT_EXIST_COUNT);
     this.failedToSetEventReminderCount = 
this.metricContext.contextAwareCounter(ServiceMetricNames.FLOW_TRIGGER_HANDLER_FAILED_TO_SET_REMINDER_COUNT);
+    this.leasesObtainedDueToReminderCount = 
this.metricContext.contextAwareMeter(ServiceMetricNames.FLOW_TRIGGER_HANDLER_LEASES_OBTAINED_DUE_TO_REMINDER_COUNT);
   }
 
   /**
@@ -116,6 +118,9 @@ public class FlowTriggerHandler {
       // id. From this point onwards, always use the newer version of the flow 
action to easily track the action through
       // orchestration and execution.
       if (leaseAttemptStatus instanceof 
MultiActiveLeaseArbiter.LeaseObtainedStatus) {
+        if (isReminderEvent) {
+          this.leasesObtainedDueToReminderCount.mark();
+        }
         MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatus = 
(MultiActiveLeaseArbiter.LeaseObtainedStatus)
             leaseAttemptStatus;
         this.leaseObtainedCount.inc();
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/ChangeMonitorUtils.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/ChangeMonitorUtils.java
index a2d68fbc0..9e121c8b7 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/ChangeMonitorUtils.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/ChangeMonitorUtils.java
@@ -20,6 +20,8 @@ package org.apache.gobblin.service.monitoring;
 import com.google.common.cache.LoadingCache;
 
 import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.metrics.ContextAwareMeter;
+
 
 @Slf4j
 public final class ChangeMonitorUtils {
@@ -28,20 +30,24 @@ public final class ChangeMonitorUtils {
   }
 
   /**
-   * Performs checks for duplicate messages and heartbeat operation prior to 
processing a message. Returns true if
-   * the pre-conditions above don't apply and we should proceed processing the 
change event
+   * Performs checks for duplicate messages and heartbeat message types, 
neither of which should be processed. Null
+   * dag action types are not checked here; callers that need that check 
(e.g. DagActionStoreChangeMonitor) do it
+   * themselves. Returns true if the pre-conditions above don't apply, and we 
should proceed processing the change event
    */
-  public static boolean shouldProcessMessage(String changeIdentifier, 
LoadingCache<String, String> cache,
-      String operation, String timestamp) {
+  public static boolean isValidAndUniqueMessage(String changeIdentifier, 
String operation, String timestamp,
+      LoadingCache<String, String> cache, ContextAwareMeter 
duplicateMessagesMeter,
+      ContextAwareMeter heartbeatMessagesMeter) {
     // If we've already processed a message with this timestamp and key before 
then skip duplicate message
     if (cache.getIfPresent(changeIdentifier) != null) {
-      log.info("Duplicate change event with identifier {}", changeIdentifier);
+      log.debug("Duplicate change event with identifier {}", changeIdentifier);
+      duplicateMessagesMeter.mark();
       return false;
     }
 
     // If event is a heartbeat type then log it and skip processing
     if (operation.equals("HEARTBEAT")) {
       log.debug("Received heartbeat message from time {}", timestamp);
+      heartbeatMessagesMeter.mark();
       return false;
     }
 
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
index b05e73105..1190a1e46 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
@@ -61,8 +61,9 @@ public class DagActionStoreChangeMonitor extends 
HighLevelConsumer {
   private ContextAwareMeter failedFlowLaunchSubmissions;
   private ContextAwareMeter unexpectedErrors;
   private ContextAwareMeter messageProcessedMeter;
-  private ContextAwareMeter messageFilteredOutMeter;
-  private ContextAwareMeter malformedMessagesSkippedMeter;
+  private ContextAwareMeter duplicateMessagesMeter;
+  private ContextAwareMeter heartbeatMessagesMeter;
+  private ContextAwareMeter nullDagActionTypeMessagesMeter;
   private ContextAwareGauge produceToConsumeDelayMillis; // Reports delay from 
all partitions in one gauge
 
   private volatile Long produceToConsumeDelayValue = -1L;
@@ -124,24 +125,23 @@ public class DagActionStoreChangeMonitor extends 
HighLevelConsumer {
     String flowName = value.getFlowName();
     String flowExecutionId = value.getFlowExecutionId();
 
-    if (value.getDagAction() == null) {
-      log.warn("Skipping null dag action type received for flow group: {} 
name: {} executionId: {} tid: {} operation: "
-          + "{}", flowGroup, flowName, flowExecutionId, tid, operation);
-      this.malformedMessagesSkippedMeter.mark();
-      return;
-    }
-    DagActionStore.FlowActionType dagActionType = 
DagActionStore.FlowActionType.valueOf(value.getDagAction().toString());
-
     produceToConsumeDelayValue = calcMillisSince(produceTimestamp);
     log.debug("Processing Dag Action message for flow group: {} name: {} 
executionId: {} tid: {} operation: {} lag: {}",
         flowGroup, flowName, flowExecutionId, tid, operation, 
produceToConsumeDelayValue);
 
     String changeIdentifier = tid + key;
-    if (!ChangeMonitorUtils.shouldProcessMessage(changeIdentifier, 
dagActionsSeenCache, operation,
-        produceTimestamp.toString())) {
-      this.messageFilteredOutMeter.mark();
+    if (!ChangeMonitorUtils.isValidAndUniqueMessage(changeIdentifier, 
operation, produceTimestamp.toString(),
+        dagActionsSeenCache, duplicateMessagesMeter, heartbeatMessagesMeter)) {
       return;
     }
+    // check after filtering out heartbeat messages expected to have 
`dagActionValue == null`
+    if (value.getDagAction() == null) {
+      log.warn("Skipping null dag action type received for identifier {} ", 
changeIdentifier);
+      nullDagActionTypeMessagesMeter.mark();
+      return;
+    }
+
+    DagActionStore.FlowActionType dagActionType = 
DagActionStore.FlowActionType.valueOf(value.getDagAction().toString());
 
     // Used to easily log information to identify the dag action
     DagActionStore.DagAction dagAction = new 
DagActionStore.DagAction(flowGroup, flowName, flowExecutionId,
@@ -231,8 +231,9 @@ public class DagActionStoreChangeMonitor extends 
HighLevelConsumer {
     this.failedFlowLaunchSubmissions = 
this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_FAILED_FLOW_LAUNCHED_SUBMISSIONS);
     this.unexpectedErrors = 
this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_UNEXPECTED_ERRORS);
     this.messageProcessedMeter = 
this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_MESSAGE_PROCESSED);
-    this.messageFilteredOutMeter = 
this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_MESSAGES_FILTERED_OUT);
-    this.malformedMessagesSkippedMeter = 
this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_MALFORMED_MESSAGES_SKIPPED);
+    this.duplicateMessagesMeter = 
this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_DUPLICATE_MESSAGES);
+    this.heartbeatMessagesMeter = 
this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_HEARTBEAT_MESSAGES);
+    this.nullDagActionTypeMessagesMeter = 
this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_NULL_DAG_ACTION_TYPE_MESSAGES);
     this.produceToConsumeDelayMillis = 
this.getMetricContext().newContextAwareGauge(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_PRODUCE_TO_CONSUME_DELAY_MILLIS,
 () -> produceToConsumeDelayValue);
     this.getMetricContext().register(this.produceToConsumeDelayMillis);
   }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java
index f029c08c6..8b197a352 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java
@@ -59,6 +59,8 @@ public class SpecStoreChangeMonitor extends HighLevelConsumer 
{
   private ContextAwareMeter failedAddedSpecs;
   private ContextAwareMeter deletedSpecs;
   private ContextAwareMeter unexpectedErrors;
+  private ContextAwareMeter duplicateMessagesMeter;
+  private ContextAwareMeter heartbeatMessagesMeter;
   private ContextAwareGauge produceToConsumeDelayMillis; // Reports delay from 
all partitions in one gauge
 
   private volatile Long produceToConsumeDelayValue = -1L;
@@ -115,8 +117,8 @@ public class SpecStoreChangeMonitor extends 
HighLevelConsumer {
         produceToConsumeDelayValue);
 
     String changeIdentifier = tid + key;
-    if (!ChangeMonitorUtils.shouldProcessMessage(changeIdentifier, 
specChangesSeenCache, operation,
-        produceTimestamp.toString())) {
+    if (!ChangeMonitorUtils.isValidAndUniqueMessage(changeIdentifier, 
operation, produceTimestamp.toString(),
+        specChangesSeenCache, duplicateMessagesMeter, heartbeatMessagesMeter)) 
{
       return;
     }
 
@@ -179,6 +181,8 @@ public class SpecStoreChangeMonitor extends 
HighLevelConsumer {
     this.deletedSpecs = 
this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_SPEC_STORE_MONITOR_DELETED_SPECS);
     this.unexpectedErrors = 
this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_SPEC_STORE_MONITOR_UNEXPECTED_ERRORS);
     this.messageProcessedMeter = 
this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_SPEC_STORE_MESSAGE_PROCESSED);
+    this.duplicateMessagesMeter = 
this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_SPEC_STORE_DUPLICATE_MESSAGES);
+    this.heartbeatMessagesMeter = 
this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_SPEC_STORE_HEARTBEAT_MESSAGES);
     this.produceToConsumeDelayMillis = 
this.getMetricContext().newContextAwareGauge(RuntimeMetrics.GOBBLIN_SPEC_STORE_PRODUCE_TO_CONSUME_DELAY_MILLIS,
 () -> produceToConsumeDelayValue);
     this.getMetricContext().register(this.produceToConsumeDelayMillis);
   }

Reply via email to