This is an automated email from the ASF dual-hosted git repository. zihanli58 pushed a commit to branch revert-3593-GOBBLIN-1736 in repository https://gitbox.apache.org/repos/asf/gobblin.git
commit 4f9932b2cf5321f2c1fb862d3acc93e4da26fd9d Author: Zihan Li <[email protected]> AuthorDate: Mon Nov 14 10:29:04 2022 -0800 Revert "[GOBBLIN-1736] Add metrics for change stream monitor and mysql quota manager (#3593)" This reverts commit 47b9bb0024c6005925a6b958c0e0563a35ecb8bb. --- .../apache/gobblin/runtime/metrics/RuntimeMetrics.java | 6 ------ .../modules/orchestration/AbstractUserQuotaManager.java | 5 ----- .../modules/orchestration/MysqlUserQuotaManager.java | 12 +----------- .../service/monitoring/DagActionStoreChangeMonitor.java | 12 ++++-------- .../service/monitoring/SpecStoreChangeMonitor.java | 15 ++++++--------- 5 files changed, 11 insertions(+), 39 deletions(-) diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java index c2889ea95..c63419d54 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java @@ -40,16 +40,10 @@ public class RuntimeMetrics { public static final String GOBBLIN_SPEC_STORE_MONITOR_FAILED_ADDED_SPECS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.specStoreMonitor.failed.added.specs"; public static final String GOBBLIN_SPEC_STORE_MONITOR_DELETED_SPECS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.specStoreMonitor.deleted.specs"; public static final String GOBBLIN_SPEC_STORE_MONITOR_UNEXPECTED_ERRORS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.specStoreMonitor.unexpected.errors"; - public static final String GOBBLIN_SPEC_STORE_MESSAGE_PROCESSED= ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.specStoreMonitor.message.processed"; public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_KILLS_INVOKED = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.dagActionStore.kills.invoked"; - public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_MESSAGE_PROCESSED= ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.dagActionStoreMonitor.message.processed"; public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_RESUMES_INVOKED = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.dagActionStore.resumes.invoked"; public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_UNEXPECTED_ERRORS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.dagActionStore.unexpected.errors"; - public static final String GOBBLIN_MYSQL_QUOTA_MANAGER_UNEXPECTED_ERRORS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.mysql.quota.manager.unexpected.errors"; - public static final String GOBBLIN_MYSQL_QUOTA_MANAGER_QUOTA_REQUESTS_EXCEEDED = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.mysql.quota.manager.quotaRequests.exceeded"; - public static final String GOBBLIN_MYSQL_QUOTA_MANAGER_TIME_TO_CHECK_QUOTA = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.mysql.quota.manager.time.to.check.quota"; - // Metadata keys public static final String TOPIC = "topic"; public static final String GROUP_ID = "groupId"; diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/AbstractUserQuotaManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/AbstractUserQuotaManager.java index 5f83f5290..b92d6d610 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/AbstractUserQuotaManager.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/AbstractUserQuotaManager.java @@ -27,8 +27,6 @@ import lombok.AllArgsConstructor; import lombok.Setter; import lombok.extern.slf4j.Slf4j; -import org.apache.gobblin.instrumented.Instrumented; -import org.apache.gobblin.metrics.MetricContext; import org.apache.gobblin.util.ConfigUtils; @@ -44,13 +42,10 @@ abstract public class AbstractUserQuotaManager implements UserQuotaManager { public static final Integer DEFAULT_USER_JOB_QUOTA = Integer.MAX_VALUE; private final Map<String, Integer> perUserQuota; private final Map<String, Integer> perFlowGroupQuota; - protected MetricContext metricContext; private final int defaultQuota; public AbstractUserQuotaManager(Config config) { - this.metricContext = Instrumented.getMetricContext(new org.apache.gobblin.configuration.State(ConfigUtils.configToProperties(config)), - this.getClass()); this.defaultQuota = ConfigUtils.getInt(config, USER_JOB_QUOTA_KEY, DEFAULT_USER_JOB_QUOTA); ImmutableMap.Builder<String, Integer> userMapBuilder = ImmutableMap.builder(); ImmutableMap.Builder<String, Integer> flowGroupMapBuilder = ImmutableMap.builder(); diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManager.java index 41a9a03a8..93eb29e24 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManager.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManager.java @@ -17,8 +17,6 @@ package org.apache.gobblin.service.modules.orchestration; -import com.codahale.metrics.Meter; -import com.codahale.metrics.Timer; import java.io.IOException; import java.sql.Connection; import java.sql.PreparedStatement; @@ -40,7 +38,6 @@ import lombok.extern.slf4j.Slf4j; import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.exception.QuotaExceededException; import org.apache.gobblin.metastore.MysqlStateStore; -import org.apache.gobblin.runtime.metrics.RuntimeMetrics; import org.apache.gobblin.service.ServiceConfigKeys; import org.apache.gobblin.service.modules.flowgraph.Dag; import org.apache.gobblin.service.modules.spec.JobExecutionPlan; @@ -56,8 +53,6 @@ public class MysqlUserQuotaManager extends AbstractUserQuotaManager { public final static String CONFIG_PREFIX= "MysqlUserQuotaManager"; public final MysqlQuotaStore quotaStore; public final RunningDagIdsStore runningDagIds; - private Meter quotaExceedsRequests; - private Meter failedQuotaCheck; @Inject @@ -71,8 +66,6 @@ public class MysqlUserQuotaManager extends AbstractUserQuotaManager { } this.quotaStore = createQuotaStore(quotaStoreConfig); this.runningDagIds = createRunningDagStore(quotaStoreConfig); - this.failedQuotaCheck = this.metricContext.contextAwareMeter(RuntimeMetrics.GOBBLIN_MYSQL_QUOTA_MANAGER_UNEXPECTED_ERRORS); - this.quotaExceedsRequests = this.metricContext.contextAwareMeter(RuntimeMetrics.GOBBLIN_MYSQL_QUOTA_MANAGER_QUOTA_REQUESTS_EXCEEDED); } void addDagId(Connection connection, String dagId) throws IOException { @@ -94,21 +87,18 @@ public class MysqlUserQuotaManager extends AbstractUserQuotaManager { @Override public void checkQuota(Collection<Dag.DagNode<JobExecutionPlan>> dagNodes) throws IOException { - try (Connection connection = this.quotaStore.dataSource.getConnection(); - Timer.Context context = metricContext.timer(RuntimeMetrics.GOBBLIN_MYSQL_QUOTA_MANAGER_TIME_TO_CHECK_QUOTA).time()) { + try (Connection connection = this.quotaStore.dataSource.getConnection()) { connection.setAutoCommit(false); for (Dag.DagNode<JobExecutionPlan> dagNode : dagNodes) { QuotaCheck quotaCheck = increaseAndCheckQuota(connection, dagNode); if ((!quotaCheck.proxyUserCheck || !quotaCheck.requesterCheck || !quotaCheck.flowGroupCheck)) { connection.rollback(); - quotaExceedsRequests.mark(); throw new QuotaExceededException(quotaCheck.requesterMessage); } } connection.commit(); } catch (SQLException e) { - this.failedQuotaCheck.mark(); throw new IOException(e); } } diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java index 1e9bdaffe..52be719f1 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java @@ -33,7 +33,6 @@ import lombok.extern.slf4j.Slf4j; import org.apache.gobblin.kafka.client.DecodeableKafkaRecord; import org.apache.gobblin.metrics.ContextAwareMeter; -import org.apache.gobblin.metrics.event.EventSubmitter; import org.apache.gobblin.runtime.api.DagActionStore; import org.apache.gobblin.runtime.api.SpecNotFoundException; import org.apache.gobblin.runtime.kafka.HighLevelConsumer; @@ -51,10 +50,9 @@ public class DagActionStoreChangeMonitor extends HighLevelConsumer { public static final String DAG_ACTION_CHANGE_MONITOR_PREFIX = "dagActionChangeStore"; // Metrics - private ContextAwareMeter killsInvoked; - private ContextAwareMeter resumesInvoked; - private ContextAwareMeter unexpectedErrors; - private ContextAwareMeter messageProcessedMeter; + ContextAwareMeter killsInvoked; + ContextAwareMeter resumesInvoked; + ContextAwareMeter unexpectedErrors; protected CacheLoader<String, String> cacheLoader = new CacheLoader<String, String>() { @Override @@ -94,8 +92,7 @@ public class DagActionStoreChangeMonitor extends HighLevelConsumer { partitioned and processed by only one thread (and corresponding queue). */ protected void processMessage(DecodeableKafkaRecord message) { - // This will also include the heathCheck message so that we can rely on this to monitor the health of this Monitor - messageProcessedMeter.mark(); + // TODO: Add metric that service is healthy and we're continuously processing messages. String key = (String) message.getKey(); DagActionStoreChangeEvent value = (DagActionStoreChangeEvent) message.getValue(); @@ -174,7 +171,6 @@ public class DagActionStoreChangeMonitor extends HighLevelConsumer { this.killsInvoked = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_KILLS_INVOKED); this.resumesInvoked = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_RESUMES_INVOKED); this.unexpectedErrors = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_UNEXPECTED_ERRORS); - this.messageProcessedMeter = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_MESSAGE_PROCESSED); } } diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java index 7b3d0c75d..f58cb551c 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java @@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit; import org.apache.commons.text.StringEscapeUtils; +import com.codahale.metrics.Meter; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; @@ -34,7 +35,6 @@ import com.typesafe.config.ConfigValueFactory; import lombok.extern.slf4j.Slf4j; import org.apache.gobblin.kafka.client.DecodeableKafkaRecord; -import org.apache.gobblin.metrics.ContextAwareMeter; import org.apache.gobblin.runtime.api.FlowSpec; import org.apache.gobblin.runtime.api.Spec; import org.apache.gobblin.runtime.kafka.HighLevelConsumer; @@ -54,11 +54,10 @@ public class SpecStoreChangeMonitor extends HighLevelConsumer { public static final String SPEC_STORE_CHANGE_MONITOR_PREFIX = "specStoreChangeMonitor"; // Metrics - private ContextAwareMeter successfullyAddedSpecs; - private ContextAwareMeter messageProcessedMeter; - private ContextAwareMeter failedAddedSpecs; - private ContextAwareMeter deletedSpecs; - private ContextAwareMeter unexpectedErrors; + private Meter successfullyAddedSpecs; + private Meter failedAddedSpecs; + private Meter deletedSpecs; + private Meter unexpectedErrors; protected CacheLoader<String, String> cacheLoader = new CacheLoader<String, String>() { @Override @@ -98,8 +97,7 @@ public class SpecStoreChangeMonitor extends HighLevelConsumer { associated with it), a given message itself will be partitioned and assigned to only one queue. */ protected void processMessage(DecodeableKafkaRecord message) { - // This will also include the heathCheck message so that we can rely on this to monitor the health of this Monitor - messageProcessedMeter.mark(); + // TODO: Add metric that service is healthy and we're continuously processing messages. String key = (String) message.getKey(); GenericStoreChangeEvent value = (GenericStoreChangeEvent) message.getValue(); @@ -170,6 +168,5 @@ public class SpecStoreChangeMonitor extends HighLevelConsumer { this.failedAddedSpecs = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_SPEC_STORE_MONITOR_FAILED_ADDED_SPECS); this.deletedSpecs = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_SPEC_STORE_MONITOR_DELETED_SPECS); this.unexpectedErrors = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_SPEC_STORE_MONITOR_UNEXPECTED_ERRORS); - this.messageProcessedMeter = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_SPEC_STORE_MESSAGE_PROCESSED); } } \ No newline at end of file
