This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 47b9bb002 [GOBBLIN-1736] Add metrics for change stream monitor and 
mysql quota manager (#3593)
47b9bb002 is described below

commit 47b9bb0024c6005925a6b958c0e0563a35ecb8bb
Author: Zihan Li <[email protected]>
AuthorDate: Mon Nov 14 10:01:48 2022 -0800

    [GOBBLIN-1736] Add metrics for change stream monitor and mysql quota 
manager (#3593)
    
    * address comments
    
    * use connectionmanager when httpclient is not closeable
    
    * [GOBBLIN-1730] Include flow execution id when try to cancel/submit job 
using SimpleKafkaSpecProducer
    
    * remove unnecessary dependency
    
    * [GOBBLIN-1736] Add metrics for change stream monitor and mysql quota 
manager
    
    * Revert "remove unnecessary dependency"
    
    This reverts commit d6871dc4caa382ea431bd254959c42230b60471c.
    
    * Revert "[GOBBLIN-1730] Include flow execution id when try to 
cancel/submit job using SimpleKafkaSpecProducer"
    
    This reverts commit f13e6ead489cec226c10db36d82642ba818dd60b.
    
    * address comments
    
    * address comments
    
    Co-authored-by: Zihan Li <[email protected]>
---
 .../apache/gobblin/runtime/metrics/RuntimeMetrics.java    |  6 ++++++
 .../modules/orchestration/AbstractUserQuotaManager.java   |  5 +++++
 .../modules/orchestration/MysqlUserQuotaManager.java      | 12 +++++++++++-
 .../service/monitoring/DagActionStoreChangeMonitor.java   | 12 ++++++++----
 .../service/monitoring/SpecStoreChangeMonitor.java        | 15 +++++++++------
 5 files changed, 39 insertions(+), 11 deletions(-)

diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java
index c63419d54..c2889ea95 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java
@@ -40,10 +40,16 @@ public class RuntimeMetrics {
   public static final String GOBBLIN_SPEC_STORE_MONITOR_FAILED_ADDED_SPECS = 
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + 
"gobblin.specStoreMonitor.failed.added.specs";
   public static final String GOBBLIN_SPEC_STORE_MONITOR_DELETED_SPECS = 
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + 
"gobblin.specStoreMonitor.deleted.specs";
   public static final String GOBBLIN_SPEC_STORE_MONITOR_UNEXPECTED_ERRORS = 
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + 
"gobblin.specStoreMonitor.unexpected.errors";
+  public static final String GOBBLIN_SPEC_STORE_MESSAGE_PROCESSED= 
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + 
"gobblin.specStoreMonitor.message.processed";
   public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_KILLS_INVOKED = 
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + 
"gobblin.dagActionStore.kills.invoked";
+  public static final String 
GOBBLIN_DAG_ACTION_STORE_MONITOR_MESSAGE_PROCESSED= 
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + 
"gobblin.dagActionStoreMonitor.message.processed";
   public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_RESUMES_INVOKED 
= ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + 
"gobblin.dagActionStore.resumes.invoked";
   public static final String 
GOBBLIN_DAG_ACTION_STORE_MONITOR_UNEXPECTED_ERRORS = 
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + 
"gobblin.dagActionStore.unexpected.errors";
 
+  public static final String GOBBLIN_MYSQL_QUOTA_MANAGER_UNEXPECTED_ERRORS = 
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + 
"gobblin.mysql.quota.manager.unexpected.errors";
+  public static final String 
GOBBLIN_MYSQL_QUOTA_MANAGER_QUOTA_REQUESTS_EXCEEDED = 
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + 
"gobblin.mysql.quota.manager.quotaRequests.exceeded";
+  public static final String GOBBLIN_MYSQL_QUOTA_MANAGER_TIME_TO_CHECK_QUOTA = 
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + 
"gobblin.mysql.quota.manager.time.to.check.quota";
+
   // Metadata keys
   public static final String TOPIC = "topic";
   public static final String GROUP_ID = "groupId";
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/AbstractUserQuotaManager.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/AbstractUserQuotaManager.java
index b92d6d610..5f83f5290 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/AbstractUserQuotaManager.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/AbstractUserQuotaManager.java
@@ -27,6 +27,8 @@ import lombok.AllArgsConstructor;
 import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
 
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.util.ConfigUtils;
 
 
@@ -42,10 +44,13 @@ abstract public class AbstractUserQuotaManager implements 
UserQuotaManager {
   public static final Integer DEFAULT_USER_JOB_QUOTA = Integer.MAX_VALUE;
   private final Map<String, Integer> perUserQuota;
   private final Map<String, Integer> perFlowGroupQuota;
+  protected MetricContext metricContext;
 
   private final int defaultQuota;
 
   public AbstractUserQuotaManager(Config config) {
+    this.metricContext = Instrumented.getMetricContext(new 
org.apache.gobblin.configuration.State(ConfigUtils.configToProperties(config)),
+        this.getClass());
     this.defaultQuota = ConfigUtils.getInt(config, USER_JOB_QUOTA_KEY, 
DEFAULT_USER_JOB_QUOTA);
     ImmutableMap.Builder<String, Integer> userMapBuilder = 
ImmutableMap.builder();
     ImmutableMap.Builder<String, Integer> flowGroupMapBuilder = 
ImmutableMap.builder();
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManager.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManager.java
index 93eb29e24..41a9a03a8 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManager.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManager.java
@@ -17,6 +17,8 @@
 
 package org.apache.gobblin.service.modules.orchestration;
 
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.Timer;
 import java.io.IOException;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
@@ -38,6 +40,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.exception.QuotaExceededException;
 import org.apache.gobblin.metastore.MysqlStateStore;
+import org.apache.gobblin.runtime.metrics.RuntimeMetrics;
 import org.apache.gobblin.service.ServiceConfigKeys;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
@@ -53,6 +56,8 @@ public class MysqlUserQuotaManager extends 
AbstractUserQuotaManager {
   public final static String CONFIG_PREFIX= "MysqlUserQuotaManager";
   public final MysqlQuotaStore quotaStore;
   public final RunningDagIdsStore runningDagIds;
+  private Meter quotaExceedsRequests;
+  private Meter failedQuotaCheck;
 
 
   @Inject
@@ -66,6 +71,8 @@ public class MysqlUserQuotaManager extends 
AbstractUserQuotaManager {
     }
     this.quotaStore = createQuotaStore(quotaStoreConfig);
     this.runningDagIds = createRunningDagStore(quotaStoreConfig);
+    this.failedQuotaCheck = 
this.metricContext.contextAwareMeter(RuntimeMetrics.GOBBLIN_MYSQL_QUOTA_MANAGER_UNEXPECTED_ERRORS);
+    this.quotaExceedsRequests = 
this.metricContext.contextAwareMeter(RuntimeMetrics.GOBBLIN_MYSQL_QUOTA_MANAGER_QUOTA_REQUESTS_EXCEEDED);
   }
 
   void addDagId(Connection connection, String dagId) throws IOException {
@@ -87,18 +94,21 @@ public class MysqlUserQuotaManager extends 
AbstractUserQuotaManager {
 
   @Override
   public void checkQuota(Collection<Dag.DagNode<JobExecutionPlan>> dagNodes) 
throws IOException {
-    try (Connection connection = this.quotaStore.dataSource.getConnection()) {
+    try (Connection connection = this.quotaStore.dataSource.getConnection();
+        Timer.Context context = 
metricContext.timer(RuntimeMetrics.GOBBLIN_MYSQL_QUOTA_MANAGER_TIME_TO_CHECK_QUOTA).time())
 {
       connection.setAutoCommit(false);
 
       for (Dag.DagNode<JobExecutionPlan> dagNode : dagNodes) {
         QuotaCheck quotaCheck = increaseAndCheckQuota(connection, dagNode);
         if ((!quotaCheck.proxyUserCheck || !quotaCheck.requesterCheck || 
!quotaCheck.flowGroupCheck)) {
           connection.rollback();
+          quotaExceedsRequests.mark();
           throw new QuotaExceededException(quotaCheck.requesterMessage);
         }
       }
       connection.commit();
     } catch (SQLException e) {
+      this.failedQuotaCheck.mark();
       throw new IOException(e);
     }
   }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
index 52be719f1..1e9bdaffe 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
@@ -33,6 +33,7 @@ import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
 import org.apache.gobblin.metrics.ContextAwareMeter;
+import org.apache.gobblin.metrics.event.EventSubmitter;
 import org.apache.gobblin.runtime.api.DagActionStore;
 import org.apache.gobblin.runtime.api.SpecNotFoundException;
 import org.apache.gobblin.runtime.kafka.HighLevelConsumer;
@@ -50,9 +51,10 @@ public class DagActionStoreChangeMonitor extends 
HighLevelConsumer {
   public static final String DAG_ACTION_CHANGE_MONITOR_PREFIX = 
"dagActionChangeStore";
 
   // Metrics
-  ContextAwareMeter killsInvoked;
-  ContextAwareMeter resumesInvoked;
-  ContextAwareMeter unexpectedErrors;
+  private ContextAwareMeter killsInvoked;
+  private ContextAwareMeter resumesInvoked;
+  private ContextAwareMeter unexpectedErrors;
+  private ContextAwareMeter messageProcessedMeter;
 
   protected CacheLoader<String, String> cacheLoader = new CacheLoader<String, 
String>() {
     @Override
@@ -92,7 +94,8 @@ public class DagActionStoreChangeMonitor extends 
HighLevelConsumer {
   partitioned and processed by only one thread (and corresponding queue).
    */
   protected void processMessage(DecodeableKafkaRecord message) {
-    // TODO: Add metric that service is healthy and we're continuously 
processing messages.
+    // This will also include the healthCheck message so that we can rely on 
this to monitor the health of this Monitor
+    messageProcessedMeter.mark();
     String key = (String) message.getKey();
     DagActionStoreChangeEvent value = (DagActionStoreChangeEvent) 
message.getValue();
 
@@ -171,6 +174,7 @@ public class DagActionStoreChangeMonitor extends 
HighLevelConsumer {
     this.killsInvoked = 
this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_KILLS_INVOKED);
     this.resumesInvoked = 
this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_RESUMES_INVOKED);
     this.unexpectedErrors = 
this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_UNEXPECTED_ERRORS);
+    this.messageProcessedMeter = 
this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_MESSAGE_PROCESSED);
   }
 
 }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java
index f58cb551c..7b3d0c75d 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java
@@ -24,7 +24,6 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.text.StringEscapeUtils;
 
-import com.codahale.metrics.Meter;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
@@ -35,6 +34,7 @@ import com.typesafe.config.ConfigValueFactory;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
+import org.apache.gobblin.metrics.ContextAwareMeter;
 import org.apache.gobblin.runtime.api.FlowSpec;
 import org.apache.gobblin.runtime.api.Spec;
 import org.apache.gobblin.runtime.kafka.HighLevelConsumer;
@@ -54,10 +54,11 @@ public class SpecStoreChangeMonitor extends 
HighLevelConsumer {
   public static final String SPEC_STORE_CHANGE_MONITOR_PREFIX = 
"specStoreChangeMonitor";
 
   // Metrics
-  private Meter successfullyAddedSpecs;
-  private Meter failedAddedSpecs;
-  private Meter deletedSpecs;
-  private Meter unexpectedErrors;
+  private ContextAwareMeter successfullyAddedSpecs;
+  private ContextAwareMeter messageProcessedMeter;
+  private ContextAwareMeter failedAddedSpecs;
+  private ContextAwareMeter deletedSpecs;
+  private ContextAwareMeter unexpectedErrors;
 
   protected CacheLoader<String, String> cacheLoader = new CacheLoader<String, 
String>() {
     @Override
@@ -97,7 +98,8 @@ public class SpecStoreChangeMonitor extends HighLevelConsumer 
{
   associated with it), a given message itself will be partitioned and assigned 
to only one queue.
    */
   protected void processMessage(DecodeableKafkaRecord message) {
-    // TODO: Add metric that service is healthy and we're continuously 
processing messages.
+    // This will also include the healthCheck message so that we can rely on 
this to monitor the health of this Monitor
+    messageProcessedMeter.mark();
     String key = (String) message.getKey();
     GenericStoreChangeEvent value = (GenericStoreChangeEvent) 
message.getValue();
 
@@ -168,5 +170,6 @@ public class SpecStoreChangeMonitor extends 
HighLevelConsumer {
     this.failedAddedSpecs = 
this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_SPEC_STORE_MONITOR_FAILED_ADDED_SPECS);
     this.deletedSpecs = 
this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_SPEC_STORE_MONITOR_DELETED_SPECS);
     this.unexpectedErrors = 
this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_SPEC_STORE_MONITOR_UNEXPECTED_ERRORS);
+    this.messageProcessedMeter = 
this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_SPEC_STORE_MESSAGE_PROCESSED);
   }
 }
\ No newline at end of file

Reply via email to