This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch revert-3593-GOBBLIN-1736
in repository https://gitbox.apache.org/repos/asf/gobblin.git

commit 4f9932b2cf5321f2c1fb862d3acc93e4da26fd9d
Author: Zihan Li <[email protected]>
AuthorDate: Mon Nov 14 10:29:04 2022 -0800

    Revert "[GOBBLIN-1736] Add metrics for change stream monitor and mysql quota manager (#3593)"
    
    This reverts commit 47b9bb0024c6005925a6b958c0e0563a35ecb8bb.
---
 .../apache/gobblin/runtime/metrics/RuntimeMetrics.java    |  6 ------
 .../modules/orchestration/AbstractUserQuotaManager.java   |  5 -----
 .../modules/orchestration/MysqlUserQuotaManager.java      | 12 +-----------
 .../service/monitoring/DagActionStoreChangeMonitor.java   | 12 ++++--------
 .../service/monitoring/SpecStoreChangeMonitor.java        | 15 ++++++---------
 5 files changed, 11 insertions(+), 39 deletions(-)

diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java
index c2889ea95..c63419d54 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java
@@ -40,16 +40,10 @@ public class RuntimeMetrics {
   public static final String GOBBLIN_SPEC_STORE_MONITOR_FAILED_ADDED_SPECS = 
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + 
"gobblin.specStoreMonitor.failed.added.specs";
   public static final String GOBBLIN_SPEC_STORE_MONITOR_DELETED_SPECS = 
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + 
"gobblin.specStoreMonitor.deleted.specs";
   public static final String GOBBLIN_SPEC_STORE_MONITOR_UNEXPECTED_ERRORS = 
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + 
"gobblin.specStoreMonitor.unexpected.errors";
-  public static final String GOBBLIN_SPEC_STORE_MESSAGE_PROCESSED= 
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + 
"gobblin.specStoreMonitor.message.processed";
   public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_KILLS_INVOKED = 
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + 
"gobblin.dagActionStore.kills.invoked";
-  public static final String 
GOBBLIN_DAG_ACTION_STORE_MONITOR_MESSAGE_PROCESSED= 
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + 
"gobblin.dagActionStoreMonitor.message.processed";
   public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_RESUMES_INVOKED 
= ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + 
"gobblin.dagActionStore.resumes.invoked";
   public static final String 
GOBBLIN_DAG_ACTION_STORE_MONITOR_UNEXPECTED_ERRORS = 
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + 
"gobblin.dagActionStore.unexpected.errors";
 
-  public static final String GOBBLIN_MYSQL_QUOTA_MANAGER_UNEXPECTED_ERRORS = 
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + 
"gobblin.mysql.quota.manager.unexpected.errors";
-  public static final String 
GOBBLIN_MYSQL_QUOTA_MANAGER_QUOTA_REQUESTS_EXCEEDED = 
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + 
"gobblin.mysql.quota.manager.quotaRequests.exceeded";
-  public static final String GOBBLIN_MYSQL_QUOTA_MANAGER_TIME_TO_CHECK_QUOTA = 
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + 
"gobblin.mysql.quota.manager.time.to.check.quota";
-
   // Metadata keys
   public static final String TOPIC = "topic";
   public static final String GROUP_ID = "groupId";
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/AbstractUserQuotaManager.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/AbstractUserQuotaManager.java
index 5f83f5290..b92d6d610 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/AbstractUserQuotaManager.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/AbstractUserQuotaManager.java
@@ -27,8 +27,6 @@ import lombok.AllArgsConstructor;
 import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
 
-import org.apache.gobblin.instrumented.Instrumented;
-import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.util.ConfigUtils;
 
 
@@ -44,13 +42,10 @@ abstract public class AbstractUserQuotaManager implements 
UserQuotaManager {
   public static final Integer DEFAULT_USER_JOB_QUOTA = Integer.MAX_VALUE;
   private final Map<String, Integer> perUserQuota;
   private final Map<String, Integer> perFlowGroupQuota;
-  protected MetricContext metricContext;
 
   private final int defaultQuota;
 
   public AbstractUserQuotaManager(Config config) {
-    this.metricContext = Instrumented.getMetricContext(new 
org.apache.gobblin.configuration.State(ConfigUtils.configToProperties(config)),
-        this.getClass());
     this.defaultQuota = ConfigUtils.getInt(config, USER_JOB_QUOTA_KEY, 
DEFAULT_USER_JOB_QUOTA);
     ImmutableMap.Builder<String, Integer> userMapBuilder = 
ImmutableMap.builder();
     ImmutableMap.Builder<String, Integer> flowGroupMapBuilder = 
ImmutableMap.builder();
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManager.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManager.java
index 41a9a03a8..93eb29e24 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManager.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManager.java
@@ -17,8 +17,6 @@
 
 package org.apache.gobblin.service.modules.orchestration;
 
-import com.codahale.metrics.Meter;
-import com.codahale.metrics.Timer;
 import java.io.IOException;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
@@ -40,7 +38,6 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.exception.QuotaExceededException;
 import org.apache.gobblin.metastore.MysqlStateStore;
-import org.apache.gobblin.runtime.metrics.RuntimeMetrics;
 import org.apache.gobblin.service.ServiceConfigKeys;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
@@ -56,8 +53,6 @@ public class MysqlUserQuotaManager extends 
AbstractUserQuotaManager {
   public final static String CONFIG_PREFIX= "MysqlUserQuotaManager";
   public final MysqlQuotaStore quotaStore;
   public final RunningDagIdsStore runningDagIds;
-  private Meter quotaExceedsRequests;
-  private Meter failedQuotaCheck;
 
 
   @Inject
@@ -71,8 +66,6 @@ public class MysqlUserQuotaManager extends 
AbstractUserQuotaManager {
     }
     this.quotaStore = createQuotaStore(quotaStoreConfig);
     this.runningDagIds = createRunningDagStore(quotaStoreConfig);
-    this.failedQuotaCheck = 
this.metricContext.contextAwareMeter(RuntimeMetrics.GOBBLIN_MYSQL_QUOTA_MANAGER_UNEXPECTED_ERRORS);
-    this.quotaExceedsRequests = 
this.metricContext.contextAwareMeter(RuntimeMetrics.GOBBLIN_MYSQL_QUOTA_MANAGER_QUOTA_REQUESTS_EXCEEDED);
   }
 
   void addDagId(Connection connection, String dagId) throws IOException {
@@ -94,21 +87,18 @@ public class MysqlUserQuotaManager extends 
AbstractUserQuotaManager {
 
   @Override
   public void checkQuota(Collection<Dag.DagNode<JobExecutionPlan>> dagNodes) 
throws IOException {
-    try (Connection connection = this.quotaStore.dataSource.getConnection();
-        Timer.Context context = 
metricContext.timer(RuntimeMetrics.GOBBLIN_MYSQL_QUOTA_MANAGER_TIME_TO_CHECK_QUOTA).time())
 {
+    try (Connection connection = this.quotaStore.dataSource.getConnection()) {
       connection.setAutoCommit(false);
 
       for (Dag.DagNode<JobExecutionPlan> dagNode : dagNodes) {
         QuotaCheck quotaCheck = increaseAndCheckQuota(connection, dagNode);
         if ((!quotaCheck.proxyUserCheck || !quotaCheck.requesterCheck || 
!quotaCheck.flowGroupCheck)) {
           connection.rollback();
-          quotaExceedsRequests.mark();
           throw new QuotaExceededException(quotaCheck.requesterMessage);
         }
       }
       connection.commit();
     } catch (SQLException e) {
-      this.failedQuotaCheck.mark();
       throw new IOException(e);
     }
   }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
index 1e9bdaffe..52be719f1 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
@@ -33,7 +33,6 @@ import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
 import org.apache.gobblin.metrics.ContextAwareMeter;
-import org.apache.gobblin.metrics.event.EventSubmitter;
 import org.apache.gobblin.runtime.api.DagActionStore;
 import org.apache.gobblin.runtime.api.SpecNotFoundException;
 import org.apache.gobblin.runtime.kafka.HighLevelConsumer;
@@ -51,10 +50,9 @@ public class DagActionStoreChangeMonitor extends 
HighLevelConsumer {
   public static final String DAG_ACTION_CHANGE_MONITOR_PREFIX = 
"dagActionChangeStore";
 
   // Metrics
-  private ContextAwareMeter killsInvoked;
-  private ContextAwareMeter resumesInvoked;
-  private ContextAwareMeter unexpectedErrors;
-  private ContextAwareMeter messageProcessedMeter;
+  ContextAwareMeter killsInvoked;
+  ContextAwareMeter resumesInvoked;
+  ContextAwareMeter unexpectedErrors;
 
   protected CacheLoader<String, String> cacheLoader = new CacheLoader<String, 
String>() {
     @Override
@@ -94,8 +92,7 @@ public class DagActionStoreChangeMonitor extends 
HighLevelConsumer {
   partitioned and processed by only one thread (and corresponding queue).
    */
   protected void processMessage(DecodeableKafkaRecord message) {
-    // This will also include the heathCheck message so that we can rely on 
this to monitor the health of this Monitor
-    messageProcessedMeter.mark();
+    // TODO: Add metric that service is healthy and we're continuously 
processing messages.
     String key = (String) message.getKey();
     DagActionStoreChangeEvent value = (DagActionStoreChangeEvent) 
message.getValue();
 
@@ -174,7 +171,6 @@ public class DagActionStoreChangeMonitor extends 
HighLevelConsumer {
     this.killsInvoked = 
this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_KILLS_INVOKED);
     this.resumesInvoked = 
this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_RESUMES_INVOKED);
     this.unexpectedErrors = 
this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_UNEXPECTED_ERRORS);
-    this.messageProcessedMeter = 
this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_MESSAGE_PROCESSED);
   }
 
 }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java
index 7b3d0c75d..f58cb551c 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java
@@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.text.StringEscapeUtils;
 
+import com.codahale.metrics.Meter;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
@@ -34,7 +35,6 @@ import com.typesafe.config.ConfigValueFactory;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
-import org.apache.gobblin.metrics.ContextAwareMeter;
 import org.apache.gobblin.runtime.api.FlowSpec;
 import org.apache.gobblin.runtime.api.Spec;
 import org.apache.gobblin.runtime.kafka.HighLevelConsumer;
@@ -54,11 +54,10 @@ public class SpecStoreChangeMonitor extends 
HighLevelConsumer {
   public static final String SPEC_STORE_CHANGE_MONITOR_PREFIX = 
"specStoreChangeMonitor";
 
   // Metrics
-  private ContextAwareMeter successfullyAddedSpecs;
-  private ContextAwareMeter messageProcessedMeter;
-  private ContextAwareMeter failedAddedSpecs;
-  private ContextAwareMeter deletedSpecs;
-  private ContextAwareMeter unexpectedErrors;
+  private Meter successfullyAddedSpecs;
+  private Meter failedAddedSpecs;
+  private Meter deletedSpecs;
+  private Meter unexpectedErrors;
 
   protected CacheLoader<String, String> cacheLoader = new CacheLoader<String, 
String>() {
     @Override
@@ -98,8 +97,7 @@ public class SpecStoreChangeMonitor extends HighLevelConsumer 
{
   associated with it), a given message itself will be partitioned and assigned 
to only one queue.
    */
   protected void processMessage(DecodeableKafkaRecord message) {
-    // This will also include the heathCheck message so that we can rely on 
this to monitor the health of this Monitor
-    messageProcessedMeter.mark();
+    // TODO: Add metric that service is healthy and we're continuously 
processing messages.
     String key = (String) message.getKey();
     GenericStoreChangeEvent value = (GenericStoreChangeEvent) 
message.getValue();
 
@@ -170,6 +168,5 @@ public class SpecStoreChangeMonitor extends 
HighLevelConsumer {
     this.failedAddedSpecs = 
this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_SPEC_STORE_MONITOR_FAILED_ADDED_SPECS);
     this.deletedSpecs = 
this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_SPEC_STORE_MONITOR_DELETED_SPECS);
     this.unexpectedErrors = 
this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_SPEC_STORE_MONITOR_UNEXPECTED_ERRORS);
-    this.messageProcessedMeter = 
this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_SPEC_STORE_MESSAGE_PROCESSED);
   }
 }
\ No newline at end of file

Reply via email to