This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new dc785d110 [GOBBLIN-1937] Quantify Missed Work Completed by Reminders
(#3808)
dc785d110 is described below
commit dc785d1101a01d8b4c55ac913ac5cd63b71ede6b
Author: umustafi <[email protected]>
AuthorDate: Fri Oct 27 13:04:16 2023 -0700
[GOBBLIN-1937] Quantify Missed Work Completed by Reminders (#3808)
* Quantify Missed Work Completed by Reminders
Also fix a bug by filtering out heartbeat events before extracting the dag action field
* Refactor ChangeMonitorUtils & add a delimiter to the metrics prefix
* Re-order params to group similar ones
---------
Co-authored-by: Urmi Mustafi <[email protected]>
---
.../apache/gobblin/metrics/ServiceMetricNames.java | 48 ++++++++--------
.../gobblin/runtime/metrics/RuntimeMetrics.java | 65 +++++++++++-----------
.../service/modules/orchestration/DagManager.java | 2 +-
.../modules/orchestration/FlowTriggerHandler.java | 5 ++
.../service/monitoring/ChangeMonitorUtils.java | 16 ++++--
.../monitoring/DagActionStoreChangeMonitor.java | 31 ++++++-----
.../service/monitoring/SpecStoreChangeMonitor.java | 8 ++-
7 files changed, 98 insertions(+), 77 deletions(-)
diff --git
a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
index aff097bbd..5550513c4 100644
---
a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
+++
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
@@ -20,37 +20,39 @@ public class ServiceMetricNames {
// These prefixes can be used to distinguish metrics reported by
GobblinService from other metrics reported by Gobblin
// This can be used in conjunction with MetricNameRegexFilter to filter out
metrics in any MetricReporter
public static final String GOBBLIN_SERVICE_PREFIX = "GobblinService";
+ public static final String GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER =
GOBBLIN_SERVICE_PREFIX + ".";
public static final String GOBBLIN_JOB_METRICS_PREFIX = "JobMetrics";
// Flow Compilation Meters and Timer
- public static final String FLOW_COMPILATION_SUCCESSFUL_METER =
GOBBLIN_SERVICE_PREFIX + "flowCompilation.successful";
- public static final String FLOW_COMPILATION_FAILED_METER =
GOBBLIN_SERVICE_PREFIX + "flowCompilation.failed";
- public static final String FLOW_COMPILATION_TIMER = GOBBLIN_SERVICE_PREFIX +
"flowCompilation.time";
- public static final String DATA_AUTHORIZATION_TIMER = GOBBLIN_SERVICE_PREFIX
+ "flowCompilation.dataAuthorization.time";
+ public static final String FLOW_COMPILATION_SUCCESSFUL_METER =
GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + "flowCompilation.successful";
+ public static final String FLOW_COMPILATION_FAILED_METER =
GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + "flowCompilation.failed";
+ public static final String FLOW_COMPILATION_TIMER =
GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + "flowCompilation.time";
+ public static final String DATA_AUTHORIZATION_TIMER =
GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER +
"flowCompilation.dataAuthorization.time";
// Flow Orchestration Meters and Timer
- public static final String FLOW_ORCHESTRATION_SUCCESSFUL_METER =
GOBBLIN_SERVICE_PREFIX + ".flowOrchestration.successful";
- public static final String FLOW_ORCHESTRATION_FAILED_METER =
GOBBLIN_SERVICE_PREFIX + ".flowOrchestration.failed";
- public static final String FLOW_ORCHESTRATION_TIMER = GOBBLIN_SERVICE_PREFIX
+ ".flowOrchestration.time";
- public static final String FLOW_ORCHESTRATION_DELAY = GOBBLIN_SERVICE_PREFIX
+ ".flowOrchestration.delay";
+ public static final String FLOW_ORCHESTRATION_SUCCESSFUL_METER =
GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + "flowOrchestration.successful";
+ public static final String FLOW_ORCHESTRATION_FAILED_METER =
GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + "flowOrchestration.failed";
+ public static final String FLOW_ORCHESTRATION_TIMER =
GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + "flowOrchestration.time";
+ public static final String FLOW_ORCHESTRATION_DELAY =
GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + "flowOrchestration.delay";
// Flow Trigger Handler
- public static final String FLOW_TRIGGER_HANDLER_PREFIX =
"flowTriggerHandler";
- public static final String GOBBLIN_FLOW_TRIGGER_HANDLER_NUM_FLOWS_SUBMITTED
= ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "." + FLOW_TRIGGER_HANDLER_PREFIX
+ ".numFlowsSubmitted";
- public static final String FLOW_TRIGGER_HANDLER_LEASE_OBTAINED_COUNT =
GOBBLIN_SERVICE_PREFIX + "." + FLOW_TRIGGER_HANDLER_PREFIX + ".leaseObtained";
- public static final String FLOW_TRIGGER_HANDLER_LEASED_TO_ANOTHER_COUNT =
GOBBLIN_SERVICE_PREFIX + "." + FLOW_TRIGGER_HANDLER_PREFIX + ".leasedToAnother";
- public static final String FLOW_TRIGGER_HANDLER_NO_LONGER_LEASING_COUNT =
GOBBLIN_SERVICE_PREFIX + "." + FLOW_TRIGGER_HANDLER_PREFIX + ".noLongerLeasing";
- public static final String FLOW_TRIGGER_HANDLER_JOB_DOES_NOT_EXIST_COUNT =
GOBBLIN_SERVICE_PREFIX + "." + FLOW_TRIGGER_HANDLER_PREFIX +
".jobDoesNotExistInScheduler";
- public static final String FLOW_TRIGGER_HANDLER_FAILED_TO_SET_REMINDER_COUNT
= GOBBLIN_SERVICE_PREFIX + "." + FLOW_TRIGGER_HANDLER_PREFIX +
".failedToSetReminderCount";
+ public static final String FLOW_TRIGGER_HANDLER_PREFIX =
GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + "flowTriggerHandler";
+ public static final String GOBBLIN_FLOW_TRIGGER_HANDLER_NUM_FLOWS_SUBMITTED
= FLOW_TRIGGER_HANDLER_PREFIX + ".numFlowsSubmitted";
+ public static final String FLOW_TRIGGER_HANDLER_LEASE_OBTAINED_COUNT =
FLOW_TRIGGER_HANDLER_PREFIX + ".leaseObtained";
+ public static final String FLOW_TRIGGER_HANDLER_LEASED_TO_ANOTHER_COUNT =
FLOW_TRIGGER_HANDLER_PREFIX + ".leasedToAnother";
+ public static final String FLOW_TRIGGER_HANDLER_NO_LONGER_LEASING_COUNT =
FLOW_TRIGGER_HANDLER_PREFIX + ".noLongerLeasing";
+ public static final String FLOW_TRIGGER_HANDLER_JOB_DOES_NOT_EXIST_COUNT =
FLOW_TRIGGER_HANDLER_PREFIX + ".jobDoesNotExistInScheduler";
+ public static final String FLOW_TRIGGER_HANDLER_FAILED_TO_SET_REMINDER_COUNT
= FLOW_TRIGGER_HANDLER_PREFIX + ".failedToSetReminderCount";
+ public static final String
FLOW_TRIGGER_HANDLER_LEASES_OBTAINED_DUE_TO_REMINDER_COUNT =
FLOW_TRIGGER_HANDLER_PREFIX + ".leasesObtainedDueToReminderCount";
// DagManager Related Metrics
- public static final String DAG_MANAGER_PREFIX = GOBBLIN_SERVICE_PREFIX +
".dagManager";
+ public static final String DAG_MANAGER_PREFIX =
GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + "dagManager";
public static final String
DAG_MANAGER_FAILED_LAUNCH_EVENTS_ON_STARTUP_COUNT = DAG_MANAGER_PREFIX +
".failedLaunchEventsOnStartupCount";
public static final String FLOW_FAILED_FORWARD_TO_DAG_MANAGER_COUNT =
DAG_MANAGER_PREFIX + ".flowFailedForwardToDagManagerCount";
//Job status poll timer
- public static final String JOB_STATUS_POLLED_TIMER = GOBBLIN_SERVICE_PREFIX
+ ".jobStatusPoll.time";
+ public static final String JOB_STATUS_POLLED_TIMER =
GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + "jobStatusPoll.time";
public static final String CREATE_FLOW_METER = "CreateFlow";
public static final String DELETE_FLOW_METER = "DeleteFlow";
@@ -59,9 +61,9 @@ public class ServiceMetricNames {
public static final String START_SLA_EXCEEDED_FLOWS_METER =
"StartSLAExceededFlows";
public static final String SLA_EXCEEDED_FLOWS_METER = "SlaExceededFlows";
public static final String FAILED_FLOW_METER = "FailedFlows";
- public static final String SCHEDULED_FLOW_METER = GOBBLIN_SERVICE_PREFIX +
".ScheduledFlows";
- public static final String NON_SCHEDULED_FLOW_METER = GOBBLIN_SERVICE_PREFIX
+ ".NonScheduledFlows";
- public static final String SKIPPED_FLOWS = GOBBLIN_SERVICE_PREFIX +
".SkippedFlows";
+ public static final String SCHEDULED_FLOW_METER =
GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + "ScheduledFlows";
+ public static final String NON_SCHEDULED_FLOW_METER =
GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + "NonScheduledFlows";
+ public static final String SKIPPED_FLOWS =
GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + "SkippedFlows";
public static final String RUNNING_FLOWS_COUNTER = "RunningFlows";
public static final String SERVICE_USERS = "ServiceUsers";
public static final String COMPILED = "Compiled";
@@ -70,9 +72,9 @@ public class ServiceMetricNames {
public static final String HELIX_LEADER_STATE = "HelixLeaderState";
- public static final String FLOWGRAPH_UPDATE_FAILED_METER =
GOBBLIN_SERVICE_PREFIX + ".FlowgraphUpdateFailed";
+ public static final String FLOWGRAPH_UPDATE_FAILED_METER =
GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + "FlowgraphUpdateFailed";
- public static final String DAG_COUNT_MYSQL_DAG_STATE_COUNT =
GOBBLIN_SERVICE_PREFIX + ".MysqlDagStateStore" + ".totalDagCount";
+ public static final String DAG_COUNT_MYSQL_DAG_STATE_COUNT =
GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + "MysqlDagStateStore" + ".totalDagCount";
- public static final String DAG_COUNT_FS_DAG_STATE_COUNT =
GOBBLIN_SERVICE_PREFIX + ".FsDagStateStore" + ".totalDagCount";
+ public static final String DAG_COUNT_FS_DAG_STATE_COUNT =
GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + "FsDagStateStore" + ".totalDagCount";
}
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java
index 86adb4671..de5cb2e99 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java
@@ -37,47 +37,50 @@ public class RuntimeMetrics {
public static final String GOBBLIN_JOB_MONITOR_SLAEVENT_REJECTEDEVENTS =
"gobblin.jobMonitor.slaevent.rejectedevents";
public static final String GOBBLIN_JOB_MONITOR_KAFKA_MESSAGE_PARSE_FAILURES =
"gobblin.jobMonitor.kafka.messageParseFailures";
- public static final String
GOBBLIN_SPEC_STORE_MONITOR_SUCCESSFULLY_ADDED_SPECS =
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX +
".specStoreMonitor.successful.added.specs";
- public static final String GOBBLIN_SPEC_STORE_MONITOR_FAILED_ADDED_SPECS =
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX +
".specStoreMonitor.failed.added.specs";
- public static final String GOBBLIN_SPEC_STORE_MONITOR_DELETED_SPECS =
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".specStoreMonitor.deleted.specs";
- public static final String GOBBLIN_SPEC_STORE_MONITOR_UNEXPECTED_ERRORS =
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX +
".specStoreMonitor.unexpected.errors";
- public static final String GOBBLIN_SPEC_STORE_MESSAGE_PROCESSED=
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX +
".specStoreMonitor.message.processed";
- public static final String
GOBBLIN_SPEC_STORE_PRODUCE_TO_CONSUME_DELAY_MILLIS =
- ServiceMetricNames.GOBBLIN_SERVICE_PREFIX +
".specstoreMonitor.produce.to.consume.delay";
- public static final String DAG_ACTION_STORE_MONITOR_PREFIX =
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".dagActionStoreMonitor";
- public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_KILLS_INVOKED =
DAG_ACTION_STORE_MONITOR_PREFIX + ".kills.invoked";
- public static final String
GOBBLIN_DAG_ACTION_STORE_MONITOR_MESSAGE_PROCESSED =
DAG_ACTION_STORE_MONITOR_PREFIX + ".message.processed";
- public static final String
GOBBLIN_DAG_ACTION_STORE_MONITOR_MESSAGES_FILTERED_OUT =
DAG_ACTION_STORE_MONITOR_PREFIX + ".messagesFilteredOut";
- public static final String
GOBBLIN_DAG_ACTION_STORE_MONITOR_MALFORMED_MESSAGES_SKIPPED =
DAG_ACTION_STORE_MONITOR_PREFIX + ".malformedMessagedSkipped";
- public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_RESUMES_INVOKED
= DAG_ACTION_STORE_MONITOR_PREFIX + ".resumes.invoked";
- public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_FLOWS_LAUNCHED =
DAG_ACTION_STORE_MONITOR_PREFIX + ".flows.launched";
+ public static final String SPEC_STORE_MONITOR_PREFIX =
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + "specStoreMonitor.";
+ public static final String
GOBBLIN_SPEC_STORE_MONITOR_SUCCESSFULLY_ADDED_SPECS = SPEC_STORE_MONITOR_PREFIX
+ "successful.added.specs";
+ public static final String GOBBLIN_SPEC_STORE_MONITOR_FAILED_ADDED_SPECS =
SPEC_STORE_MONITOR_PREFIX + "failed.added.specs";
+ public static final String GOBBLIN_SPEC_STORE_MONITOR_DELETED_SPECS =
SPEC_STORE_MONITOR_PREFIX + "deleted.specs";
+ public static final String GOBBLIN_SPEC_STORE_MONITOR_UNEXPECTED_ERRORS =
SPEC_STORE_MONITOR_PREFIX + "unexpected.errors";
+ public static final String GOBBLIN_SPEC_STORE_MESSAGE_PROCESSED =
SPEC_STORE_MONITOR_PREFIX + "message.processed";
+ public static final String GOBBLIN_SPEC_STORE_DUPLICATE_MESSAGES =
SPEC_STORE_MONITOR_PREFIX + "duplicateMessages";
+ public static final String GOBBLIN_SPEC_STORE_HEARTBEAT_MESSAGES =
SPEC_STORE_MONITOR_PREFIX + "heartbeatMessages";
+ public static final String
GOBBLIN_SPEC_STORE_PRODUCE_TO_CONSUME_DELAY_MILLIS = SPEC_STORE_MONITOR_PREFIX
+ "produce.to.consume.delay";
+ public static final String DAG_ACTION_STORE_MONITOR_PREFIX =
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER +
"dagActionStoreMonitor.";
+ public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_KILLS_INVOKED =
DAG_ACTION_STORE_MONITOR_PREFIX + "kills.invoked";
+ public static final String
GOBBLIN_DAG_ACTION_STORE_MONITOR_MESSAGE_PROCESSED =
DAG_ACTION_STORE_MONITOR_PREFIX + "message.processed";
+ public static final String
GOBBLIN_DAG_ACTION_STORE_MONITOR_DUPLICATE_MESSAGES =
DAG_ACTION_STORE_MONITOR_PREFIX + "duplicateMessages";
+ public static final String
GOBBLIN_DAG_ACTION_STORE_MONITOR_HEARTBEAT_MESSAGES =
DAG_ACTION_STORE_MONITOR_PREFIX + "heartbeatMessages";
+ public static final String
GOBBLIN_DAG_ACTION_STORE_MONITOR_NULL_DAG_ACTION_TYPE_MESSAGES =
DAG_ACTION_STORE_MONITOR_PREFIX + "nullDagActionTypeMessages";
+ public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_RESUMES_INVOKED
= DAG_ACTION_STORE_MONITOR_PREFIX + "resumes.invoked";
+ public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_FLOWS_LAUNCHED =
DAG_ACTION_STORE_MONITOR_PREFIX + "flows.launched";
- public static final String
GOBBLIN_DAG_ACTION_STORE_FAILED_FLOW_LAUNCHED_SUBMISSIONS =
DAG_ACTION_STORE_MONITOR_PREFIX + ".failedFlowLaunchSubmissions";
- public static final String
GOBBLIN_DAG_ACTION_STORE_MONITOR_UNEXPECTED_ERRORS =
DAG_ACTION_STORE_MONITOR_PREFIX + ".unexpected.errors";
+ public static final String
GOBBLIN_DAG_ACTION_STORE_FAILED_FLOW_LAUNCHED_SUBMISSIONS =
DAG_ACTION_STORE_MONITOR_PREFIX + "failedFlowLaunchSubmissions";
+ public static final String
GOBBLIN_DAG_ACTION_STORE_MONITOR_UNEXPECTED_ERRORS =
DAG_ACTION_STORE_MONITOR_PREFIX + "unexpected.errors";
public static final String
- GOBBLIN_DAG_ACTION_STORE_PRODUCE_TO_CONSUME_DELAY_MILLIS =
DAG_ACTION_STORE_MONITOR_PREFIX + ".produce.to.consume.delay";
- public static final String GOBBLIN_MYSQL_QUOTA_MANAGER_UNEXPECTED_ERRORS =
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX +
"gobblin.mysql.quota.manager.unexpected.errors";
- public static final String
GOBBLIN_MYSQL_QUOTA_MANAGER_QUOTA_REQUESTS_EXCEEDED =
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX +
"gobblin.mysql.quota.manager.quotaRequests.exceeded";
- public static final String GOBBLIN_MYSQL_QUOTA_MANAGER_TIME_TO_CHECK_QUOTA =
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX +
"gobblin.mysql.quota.manager.time.to.check.quota";
+ GOBBLIN_DAG_ACTION_STORE_PRODUCE_TO_CONSUME_DELAY_MILLIS =
DAG_ACTION_STORE_MONITOR_PREFIX + "produce.to.consume.delay";
+ public static final String GOBBLIN_MYSQL_QUOTA_MANAGER_UNEXPECTED_ERRORS =
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER +
"gobblin.mysql.quota.manager.unexpected.errors";
+ public static final String
GOBBLIN_MYSQL_QUOTA_MANAGER_QUOTA_REQUESTS_EXCEEDED =
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER +
"gobblin.mysql.quota.manager.quotaRequests.exceeded";
+ public static final String GOBBLIN_MYSQL_QUOTA_MANAGER_TIME_TO_CHECK_QUOTA =
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER +
"gobblin.mysql.quota.manager.time.to.check.quota";
// The following metrics are used to identify the bottlenecks for
initializing the job scheduler
public static final String
- GOBBLIN_JOB_SCHEDULER_GET_SPECS_DURING_STARTUP_PER_SPEC_RATE_NANOS =
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX +
".jobScheduler.getSpecsDuringStartupPerSpecRateNanos";
- public static final String GOBBLIN_JOB_SCHEDULER_LOAD_SPECS_BATCH_SIZE =
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".jobScheduler.loadSpecBatchSize";
+ GOBBLIN_JOB_SCHEDULER_GET_SPECS_DURING_STARTUP_PER_SPEC_RATE_NANOS =
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER +
"jobScheduler.getSpecsDuringStartupPerSpecRateNanos";
+ public static final String GOBBLIN_JOB_SCHEDULER_LOAD_SPECS_BATCH_SIZE =
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER +
"jobScheduler.loadSpecBatchSize";
public static final String
- GOBBLIN_JOB_SCHEDULER_TIME_TO_INITIALIZE_SCHEDULER_NANOS =
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX +
".jobScheduler.timeToInitializeSchedulerNanos";
+ GOBBLIN_JOB_SCHEDULER_TIME_TO_INITIALIZE_SCHEDULER_NANOS =
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER +
"jobScheduler.timeToInitializeSchedulerNanos";
public static final String
- GOBBLIN_JOB_SCHEDULER_TIME_TO_OBTAIN_SPEC_URIS_NANOS =
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX +
".jobScheduler.timeToObtainSpecUrisNanos";
+ GOBBLIN_JOB_SCHEDULER_TIME_TO_OBTAIN_SPEC_URIS_NANOS =
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER +
"jobScheduler.timeToObtainSpecUrisNanos";
public static final String
- GOBBLIN_JOB_SCHEDULER_INDIVIDUAL_GET_SPEC_SPEED_NANOS =
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX +
".jobScheduler.individualGetSpecSpeedNanos";
+ GOBBLIN_JOB_SCHEDULER_INDIVIDUAL_GET_SPEC_SPEED_NANOS =
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER +
"jobScheduler.individualGetSpecSpeedNanos";
public static final String
- GOBBLIN_JOB_SCHEDULER_EACH_COMPLETE_ADD_SPEC_NANOS =
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX +
".jobScheduler.eachCompleteAddSpecNanos";
+ GOBBLIN_JOB_SCHEDULER_EACH_COMPLETE_ADD_SPEC_NANOS =
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER +
"jobScheduler.eachCompleteAddSpecNanos";
public static final String
- GOBBLIN_JOB_SCHEDULER_EACH_SPEC_COMPILATION_NANOS =
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX +
".jobScheduler.eachSpecCompilationNanos";
- public static final String GOBBLIN_JOB_SCHEDULER_EACH_SCHEDULE_JOB_NANOS =
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX +
".jobScheduler.eachScheduleJobNanos";
- public static final String GOBBLIN_JOB_SCHEDULER_TOTAL_GET_SPEC_TIME_NANOS =
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX +
".jobScheduler.totalGetSpecTimeNanos";
- public static final String GOBBLIN_JOB_SCHEDULER_TOTAL_ADD_SPEC_TIME_NANOS =
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX +
".jobScheduler.totalAddSpecTimeNanos";
- public static final String
GOBBLIN_JOB_SCHEDULER_NUM_JOBS_SCHEDULED_DURING_STARTUP =
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX +
".jobScheduler.numJobsScheduledDuringStartup";
+ GOBBLIN_JOB_SCHEDULER_EACH_SPEC_COMPILATION_NANOS =
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER +
"jobScheduler.eachSpecCompilationNanos";
+ public static final String GOBBLIN_JOB_SCHEDULER_EACH_SCHEDULE_JOB_NANOS =
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER +
"jobScheduler.eachScheduleJobNanos";
+ public static final String GOBBLIN_JOB_SCHEDULER_TOTAL_GET_SPEC_TIME_NANOS =
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER +
"jobScheduler.totalGetSpecTimeNanos";
+ public static final String GOBBLIN_JOB_SCHEDULER_TOTAL_ADD_SPEC_TIME_NANOS =
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER +
"jobScheduler.totalAddSpecTimeNanos";
+ public static final String
GOBBLIN_JOB_SCHEDULER_NUM_JOBS_SCHEDULED_DURING_STARTUP =
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER +
"jobScheduler.numJobsScheduledDuringStartup";
// Metadata keys
public static final String TOPIC = "topic";
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index b4ec9c0ce..c1553da39 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -138,7 +138,7 @@ public class DagManager extends AbstractIdleService {
private static final long DAG_FLOW_STATUS_TOLERANCE_TIME_MILLIS =
TimeUnit.MINUTES.toMillis(5);
public static final String FAILED_DAG_POLLING_INTERVAL =
FAILED_DAG_STATESTORE_PREFIX + ".retention.pollingIntervalMinutes";
public static final Integer DEFAULT_FAILED_DAG_POLLING_INTERVAL = 60;
- public static final String DAG_MANAGER_HEARTBEAT =
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".dagManager.heartbeat-%s";
+ public static final String DAG_MANAGER_HEARTBEAT =
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER +
"dagManager.heartbeat-%s";
// Default job start SLA time if configured, measured in minutes. Default is
10 minutes
private static final String JOB_START_SLA_TIME = DAG_MANAGER_PREFIX +
ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME;
private static final String JOB_START_SLA_UNITS = DAG_MANAGER_PREFIX +
ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME_UNIT;
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
index c5a5bb8e0..af65390ec 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
@@ -79,6 +79,7 @@ public class FlowTriggerHandler {
private ContextAwareCounter noLongerLeasingStatusCount;
private ContextAwareCounter jobDoesNotExistInSchedulerCount;
private ContextAwareCounter failedToSetEventReminderCount;
+ private ContextAwareMeter leasesObtainedDueToReminderCount;
@Inject
public FlowTriggerHandler(Config config, Optional<MultiActiveLeaseArbiter>
leaseDeterminationStore,
@@ -96,6 +97,7 @@ public class FlowTriggerHandler {
this.noLongerLeasingStatusCount =
this.metricContext.contextAwareCounter(ServiceMetricNames.FLOW_TRIGGER_HANDLER_NO_LONGER_LEASING_COUNT);
this.jobDoesNotExistInSchedulerCount =
this.metricContext.contextAwareCounter(ServiceMetricNames.FLOW_TRIGGER_HANDLER_JOB_DOES_NOT_EXIST_COUNT);
this.failedToSetEventReminderCount =
this.metricContext.contextAwareCounter(ServiceMetricNames.FLOW_TRIGGER_HANDLER_FAILED_TO_SET_REMINDER_COUNT);
+ this.leasesObtainedDueToReminderCount =
this.metricContext.contextAwareMeter(ServiceMetricNames.FLOW_TRIGGER_HANDLER_LEASES_OBTAINED_DUE_TO_REMINDER_COUNT);
}
/**
@@ -116,6 +118,9 @@ public class FlowTriggerHandler {
// id. From this point onwards, always use the newer version of the flow
action to easily track the action through
// orchestration and execution.
if (leaseAttemptStatus instanceof
MultiActiveLeaseArbiter.LeaseObtainedStatus) {
+ if (isReminderEvent) {
+ this.leasesObtainedDueToReminderCount.mark();
+ }
MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatus =
(MultiActiveLeaseArbiter.LeaseObtainedStatus)
leaseAttemptStatus;
this.leaseObtainedCount.inc();
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/ChangeMonitorUtils.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/ChangeMonitorUtils.java
index a2d68fbc0..9e121c8b7 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/ChangeMonitorUtils.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/ChangeMonitorUtils.java
@@ -20,6 +20,8 @@ package org.apache.gobblin.service.monitoring;
import com.google.common.cache.LoadingCache;
import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.metrics.ContextAwareMeter;
+
@Slf4j
public final class ChangeMonitorUtils {
@@ -28,20 +30,24 @@ public final class ChangeMonitorUtils {
}
/**
- * Performs checks for duplicate messages and heartbeat operation prior to
processing a message. Returns true if
- * the pre-conditions above don't apply and we should proceed processing the
change event
+ * Performs checks for duplicate messages and heartbeat message types, neither
of which should be processed, and marks the corresponding meter for each skipped
+ * message. Returns true if the pre-conditions above don't apply, and we should
proceed processing
+ * the change event. Null dag action types are NOT checked here; callers must
handle them after this check.
*/
- public static boolean shouldProcessMessage(String changeIdentifier,
LoadingCache<String, String> cache,
- String operation, String timestamp) {
+ public static boolean isValidAndUniqueMessage(String changeIdentifier,
String operation, String timestamp,
+ LoadingCache<String, String> cache, ContextAwareMeter
duplicateMessagesMeter,
+ ContextAwareMeter heartbeatMessagesMeter) {
// If we've already processed a message with this timestamp and key before
then skip duplicate message
if (cache.getIfPresent(changeIdentifier) != null) {
- log.info("Duplicate change event with identifier {}", changeIdentifier);
+ log.debug("Duplicate change event with identifier {}", changeIdentifier);
+ duplicateMessagesMeter.mark();
return false;
}
// If event is a heartbeat type then log it and skip processing
if (operation.equals("HEARTBEAT")) {
log.debug("Received heartbeat message from time {}", timestamp);
+ heartbeatMessagesMeter.mark();
return false;
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
index b05e73105..1190a1e46 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
@@ -61,8 +61,9 @@ public class DagActionStoreChangeMonitor extends
HighLevelConsumer {
private ContextAwareMeter failedFlowLaunchSubmissions;
private ContextAwareMeter unexpectedErrors;
private ContextAwareMeter messageProcessedMeter;
- private ContextAwareMeter messageFilteredOutMeter;
- private ContextAwareMeter malformedMessagesSkippedMeter;
+ private ContextAwareMeter duplicateMessagesMeter;
+ private ContextAwareMeter heartbeatMessagesMeter;
+ private ContextAwareMeter nullDagActionTypeMessagesMeter;
private ContextAwareGauge produceToConsumeDelayMillis; // Reports delay from
all partitions in one gauge
private volatile Long produceToConsumeDelayValue = -1L;
@@ -124,24 +125,23 @@ public class DagActionStoreChangeMonitor extends
HighLevelConsumer {
String flowName = value.getFlowName();
String flowExecutionId = value.getFlowExecutionId();
- if (value.getDagAction() == null) {
- log.warn("Skipping null dag action type received for flow group: {}
name: {} executionId: {} tid: {} operation: "
- + "{}", flowGroup, flowName, flowExecutionId, tid, operation);
- this.malformedMessagesSkippedMeter.mark();
- return;
- }
- DagActionStore.FlowActionType dagActionType =
DagActionStore.FlowActionType.valueOf(value.getDagAction().toString());
-
produceToConsumeDelayValue = calcMillisSince(produceTimestamp);
log.debug("Processing Dag Action message for flow group: {} name: {}
executionId: {} tid: {} operation: {} lag: {}",
flowGroup, flowName, flowExecutionId, tid, operation,
produceToConsumeDelayValue);
String changeIdentifier = tid + key;
- if (!ChangeMonitorUtils.shouldProcessMessage(changeIdentifier,
dagActionsSeenCache, operation,
- produceTimestamp.toString())) {
- this.messageFilteredOutMeter.mark();
+ if (!ChangeMonitorUtils.isValidAndUniqueMessage(changeIdentifier,
operation, produceTimestamp.toString(),
+ dagActionsSeenCache, duplicateMessagesMeter, heartbeatMessagesMeter)) {
return;
}
+ // Check only after heartbeat messages have been filtered out, since they are
expected to have `value.getDagAction() == null`
+ if (value.getDagAction() == null) {
+ log.warn("Skipping null dag action type received for identifier {} ",
changeIdentifier);
+ nullDagActionTypeMessagesMeter.mark();
+ return;
+ }
+
+ DagActionStore.FlowActionType dagActionType =
DagActionStore.FlowActionType.valueOf(value.getDagAction().toString());
// Used to easily log information to identify the dag action
DagActionStore.DagAction dagAction = new
DagActionStore.DagAction(flowGroup, flowName, flowExecutionId,
@@ -231,8 +231,9 @@ public class DagActionStoreChangeMonitor extends
HighLevelConsumer {
this.failedFlowLaunchSubmissions =
this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_FAILED_FLOW_LAUNCHED_SUBMISSIONS);
this.unexpectedErrors =
this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_UNEXPECTED_ERRORS);
this.messageProcessedMeter =
this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_MESSAGE_PROCESSED);
- this.messageFilteredOutMeter =
this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_MESSAGES_FILTERED_OUT);
- this.malformedMessagesSkippedMeter =
this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_MALFORMED_MESSAGES_SKIPPED);
+ this.duplicateMessagesMeter =
this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_DUPLICATE_MESSAGES);
+ this.heartbeatMessagesMeter =
this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_HEARTBEAT_MESSAGES);
+ this.nullDagActionTypeMessagesMeter =
this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_NULL_DAG_ACTION_TYPE_MESSAGES);
this.produceToConsumeDelayMillis =
this.getMetricContext().newContextAwareGauge(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_PRODUCE_TO_CONSUME_DELAY_MILLIS,
() -> produceToConsumeDelayValue);
this.getMetricContext().register(this.produceToConsumeDelayMillis);
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java
index f029c08c6..8b197a352 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java
@@ -59,6 +59,8 @@ public class SpecStoreChangeMonitor extends HighLevelConsumer
{
private ContextAwareMeter failedAddedSpecs;
private ContextAwareMeter deletedSpecs;
private ContextAwareMeter unexpectedErrors;
+ private ContextAwareMeter duplicateMessagesMeter;
+ private ContextAwareMeter heartbeatMessagesMeter;
private ContextAwareGauge produceToConsumeDelayMillis; // Reports delay from
all partitions in one gauge
private volatile Long produceToConsumeDelayValue = -1L;
@@ -115,8 +117,8 @@ public class SpecStoreChangeMonitor extends
HighLevelConsumer {
produceToConsumeDelayValue);
String changeIdentifier = tid + key;
- if (!ChangeMonitorUtils.shouldProcessMessage(changeIdentifier,
specChangesSeenCache, operation,
- produceTimestamp.toString())) {
+ if (!ChangeMonitorUtils.isValidAndUniqueMessage(changeIdentifier,
operation, produceTimestamp.toString(),
+ specChangesSeenCache, duplicateMessagesMeter, heartbeatMessagesMeter))
{
return;
}
@@ -179,6 +181,8 @@ public class SpecStoreChangeMonitor extends
HighLevelConsumer {
this.deletedSpecs =
this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_SPEC_STORE_MONITOR_DELETED_SPECS);
this.unexpectedErrors =
this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_SPEC_STORE_MONITOR_UNEXPECTED_ERRORS);
this.messageProcessedMeter =
this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_SPEC_STORE_MESSAGE_PROCESSED);
+ this.duplicateMessagesMeter =
this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_SPEC_STORE_DUPLICATE_MESSAGES);
+ this.heartbeatMessagesMeter =
this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_SPEC_STORE_HEARTBEAT_MESSAGES);
this.produceToConsumeDelayMillis =
this.getMetricContext().newContextAwareGauge(RuntimeMetrics.GOBBLIN_SPEC_STORE_PRODUCE_TO_CONSUME_DELAY_MILLIS,
() -> produceToConsumeDelayValue);
this.getMetricContext().register(this.produceToConsumeDelayMillis);
}