This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 47b9bb002 [GOBBLIN-1736] Add metrics for change stream monitor and
mysql quota manager (#3593)
47b9bb002 is described below
commit 47b9bb0024c6005925a6b958c0e0563a35ecb8bb
Author: Zihan Li <[email protected]>
AuthorDate: Mon Nov 14 10:01:48 2022 -0800
[GOBBLIN-1736] Add metrics for change stream monitor and mysql quota
manager (#3593)
* address comments
* use ConnectionManager when HttpClient is not closeable
* [GOBBLIN-1730] Include flow execution id when try to cancel/submit job
using SimpleKafkaSpecProducer
* remove unnecessary dependency
* [GOBBLIN-1736] Add metrics for change stream monitor and mysql quota
manager
* Revert "remove unnecessary dependency"
This reverts commit d6871dc4caa382ea431bd254959c42230b60471c.
* Revert "[GOBBLIN-1730] Include flow execution id when try to
cancel/submit job using SimpleKafkaSpecProducer"
This reverts commit f13e6ead489cec226c10db36d82642ba818dd60b.
* address comments
* address comments
Co-authored-by: Zihan Li <[email protected]>
---
.../apache/gobblin/runtime/metrics/RuntimeMetrics.java | 6 ++++++
.../modules/orchestration/AbstractUserQuotaManager.java | 5 +++++
.../modules/orchestration/MysqlUserQuotaManager.java | 12 +++++++++++-
.../service/monitoring/DagActionStoreChangeMonitor.java | 12 ++++++++----
.../service/monitoring/SpecStoreChangeMonitor.java | 15 +++++++++------
5 files changed, 39 insertions(+), 11 deletions(-)
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java
index c63419d54..c2889ea95 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java
@@ -40,10 +40,16 @@ public class RuntimeMetrics {
public static final String GOBBLIN_SPEC_STORE_MONITOR_FAILED_ADDED_SPECS =
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX +
"gobblin.specStoreMonitor.failed.added.specs";
public static final String GOBBLIN_SPEC_STORE_MONITOR_DELETED_SPECS =
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX +
"gobblin.specStoreMonitor.deleted.specs";
public static final String GOBBLIN_SPEC_STORE_MONITOR_UNEXPECTED_ERRORS =
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX +
"gobblin.specStoreMonitor.unexpected.errors";
+ public static final String GOBBLIN_SPEC_STORE_MESSAGE_PROCESSED=
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX +
"gobblin.specStoreMonitor.message.processed";
public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_KILLS_INVOKED =
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX +
"gobblin.dagActionStore.kills.invoked";
+ public static final String
GOBBLIN_DAG_ACTION_STORE_MONITOR_MESSAGE_PROCESSED=
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX +
"gobblin.dagActionStoreMonitor.message.processed";
public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_RESUMES_INVOKED
= ServiceMetricNames.GOBBLIN_SERVICE_PREFIX +
"gobblin.dagActionStore.resumes.invoked";
public static final String
GOBBLIN_DAG_ACTION_STORE_MONITOR_UNEXPECTED_ERRORS =
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX +
"gobblin.dagActionStore.unexpected.errors";
+ public static final String GOBBLIN_MYSQL_QUOTA_MANAGER_UNEXPECTED_ERRORS =
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX +
"gobblin.mysql.quota.manager.unexpected.errors";
+ public static final String
GOBBLIN_MYSQL_QUOTA_MANAGER_QUOTA_REQUESTS_EXCEEDED =
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX +
"gobblin.mysql.quota.manager.quotaRequests.exceeded";
+ public static final String GOBBLIN_MYSQL_QUOTA_MANAGER_TIME_TO_CHECK_QUOTA =
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX +
"gobblin.mysql.quota.manager.time.to.check.quota";
+
// Metadata keys
public static final String TOPIC = "topic";
public static final String GROUP_ID = "groupId";
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/AbstractUserQuotaManager.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/AbstractUserQuotaManager.java
index b92d6d610..5f83f5290 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/AbstractUserQuotaManager.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/AbstractUserQuotaManager.java
@@ -27,6 +27,8 @@ import lombok.AllArgsConstructor;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.util.ConfigUtils;
@@ -42,10 +44,13 @@ abstract public class AbstractUserQuotaManager implements
UserQuotaManager {
public static final Integer DEFAULT_USER_JOB_QUOTA = Integer.MAX_VALUE;
private final Map<String, Integer> perUserQuota;
private final Map<String, Integer> perFlowGroupQuota;
+ protected MetricContext metricContext;
private final int defaultQuota;
public AbstractUserQuotaManager(Config config) {
+ this.metricContext = Instrumented.getMetricContext(new
org.apache.gobblin.configuration.State(ConfigUtils.configToProperties(config)),
+ this.getClass());
this.defaultQuota = ConfigUtils.getInt(config, USER_JOB_QUOTA_KEY,
DEFAULT_USER_JOB_QUOTA);
ImmutableMap.Builder<String, Integer> userMapBuilder =
ImmutableMap.builder();
ImmutableMap.Builder<String, Integer> flowGroupMapBuilder =
ImmutableMap.builder();
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManager.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManager.java
index 93eb29e24..41a9a03a8 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManager.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManager.java
@@ -17,6 +17,8 @@
package org.apache.gobblin.service.modules.orchestration;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.Timer;
import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
@@ -38,6 +40,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.exception.QuotaExceededException;
import org.apache.gobblin.metastore.MysqlStateStore;
+import org.apache.gobblin.runtime.metrics.RuntimeMetrics;
import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
@@ -53,6 +56,8 @@ public class MysqlUserQuotaManager extends
AbstractUserQuotaManager {
public final static String CONFIG_PREFIX= "MysqlUserQuotaManager";
public final MysqlQuotaStore quotaStore;
public final RunningDagIdsStore runningDagIds;
+ private Meter quotaExceedsRequests;
+ private Meter failedQuotaCheck;
@Inject
@@ -66,6 +71,8 @@ public class MysqlUserQuotaManager extends
AbstractUserQuotaManager {
}
this.quotaStore = createQuotaStore(quotaStoreConfig);
this.runningDagIds = createRunningDagStore(quotaStoreConfig);
+ this.failedQuotaCheck =
this.metricContext.contextAwareMeter(RuntimeMetrics.GOBBLIN_MYSQL_QUOTA_MANAGER_UNEXPECTED_ERRORS);
+ this.quotaExceedsRequests =
this.metricContext.contextAwareMeter(RuntimeMetrics.GOBBLIN_MYSQL_QUOTA_MANAGER_QUOTA_REQUESTS_EXCEEDED);
}
void addDagId(Connection connection, String dagId) throws IOException {
@@ -87,18 +94,21 @@ public class MysqlUserQuotaManager extends
AbstractUserQuotaManager {
@Override
public void checkQuota(Collection<Dag.DagNode<JobExecutionPlan>> dagNodes)
throws IOException {
- try (Connection connection = this.quotaStore.dataSource.getConnection()) {
+ try (Connection connection = this.quotaStore.dataSource.getConnection(); // opened before the timer below, so connection acquisition is not included in the timing
+ Timer.Context context =
metricContext.timer(RuntimeMetrics.GOBBLIN_MYSQL_QUOTA_MANAGER_TIME_TO_CHECK_QUOTA).time())
{ // resources close in reverse order: the timer stops before the connection is closed
connection.setAutoCommit(false);
for (Dag.DagNode<JobExecutionPlan> dagNode : dagNodes) {
QuotaCheck quotaCheck = increaseAndCheckQuota(connection, dagNode);
if ((!quotaCheck.proxyUserCheck || !quotaCheck.requesterCheck ||
!quotaCheck.flowGroupCheck)) {
connection.rollback();
+ quotaExceedsRequests.mark(); // count the rejected request after rolling back, before throwing
throw new QuotaExceededException(quotaCheck.requesterMessage);
}
}
connection.commit();
} catch (SQLException e) {
+ this.failedQuotaCheck.mark(); // also reached when getConnection() itself throws; meter is the "unexpected errors" metric
throw new IOException(e);
}
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
index 52be719f1..1e9bdaffe 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
@@ -33,6 +33,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
import org.apache.gobblin.metrics.ContextAwareMeter;
+import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.runtime.api.DagActionStore;
import org.apache.gobblin.runtime.api.SpecNotFoundException;
import org.apache.gobblin.runtime.kafka.HighLevelConsumer;
@@ -50,9 +51,10 @@ public class DagActionStoreChangeMonitor extends
HighLevelConsumer {
public static final String DAG_ACTION_CHANGE_MONITOR_PREFIX =
"dagActionChangeStore";
// Metrics
- ContextAwareMeter killsInvoked;
- ContextAwareMeter resumesInvoked;
- ContextAwareMeter unexpectedErrors;
+ private ContextAwareMeter killsInvoked;
+ private ContextAwareMeter resumesInvoked;
+ private ContextAwareMeter unexpectedErrors;
+ private ContextAwareMeter messageProcessedMeter;
protected CacheLoader<String, String> cacheLoader = new CacheLoader<String,
String>() {
@Override
@@ -92,7 +94,8 @@ public class DagActionStoreChangeMonitor extends
HighLevelConsumer {
partitioned and processed by only one thread (and corresponding queue).
*/
protected void processMessage(DecodeableKafkaRecord message) {
- // TODO: Add metric that service is healthy and we're continuously
processing messages.
+ // This also counts healthCheck messages, so we can rely on this meter
to monitor the health of this Monitor
+ messageProcessedMeter.mark();
String key = (String) message.getKey();
DagActionStoreChangeEvent value = (DagActionStoreChangeEvent)
message.getValue();
@@ -171,6 +174,7 @@ public class DagActionStoreChangeMonitor extends
HighLevelConsumer {
this.killsInvoked =
this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_KILLS_INVOKED);
this.resumesInvoked =
this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_RESUMES_INVOKED);
this.unexpectedErrors =
this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_UNEXPECTED_ERRORS);
+ this.messageProcessedMeter =
this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_MESSAGE_PROCESSED);
}
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java
index f58cb551c..7b3d0c75d 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java
@@ -24,7 +24,6 @@ import java.util.concurrent.TimeUnit;
import org.apache.commons.text.StringEscapeUtils;
-import com.codahale.metrics.Meter;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
@@ -35,6 +34,7 @@ import com.typesafe.config.ConfigValueFactory;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
+import org.apache.gobblin.metrics.ContextAwareMeter;
import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.Spec;
import org.apache.gobblin.runtime.kafka.HighLevelConsumer;
@@ -54,10 +54,11 @@ public class SpecStoreChangeMonitor extends
HighLevelConsumer {
public static final String SPEC_STORE_CHANGE_MONITOR_PREFIX =
"specStoreChangeMonitor";
// Metrics
- private Meter successfullyAddedSpecs;
- private Meter failedAddedSpecs;
- private Meter deletedSpecs;
- private Meter unexpectedErrors;
+ private ContextAwareMeter successfullyAddedSpecs;
+ private ContextAwareMeter messageProcessedMeter;
+ private ContextAwareMeter failedAddedSpecs;
+ private ContextAwareMeter deletedSpecs;
+ private ContextAwareMeter unexpectedErrors;
protected CacheLoader<String, String> cacheLoader = new CacheLoader<String,
String>() {
@Override
@@ -97,7 +98,8 @@ public class SpecStoreChangeMonitor extends HighLevelConsumer
{
associated with it), a given message itself will be partitioned and assigned
to only one queue.
*/
protected void processMessage(DecodeableKafkaRecord message) {
- // TODO: Add metric that service is healthy and we're continuously
processing messages.
+ // This also counts healthCheck messages, so we can rely on this meter
to monitor the health of this Monitor
+ messageProcessedMeter.mark();
String key = (String) message.getKey();
GenericStoreChangeEvent value = (GenericStoreChangeEvent)
message.getValue();
@@ -168,5 +170,6 @@ public class SpecStoreChangeMonitor extends
HighLevelConsumer {
this.failedAddedSpecs =
this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_SPEC_STORE_MONITOR_FAILED_ADDED_SPECS);
this.deletedSpecs =
this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_SPEC_STORE_MONITOR_DELETED_SPECS);
this.unexpectedErrors =
this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_SPEC_STORE_MONITOR_UNEXPECTED_ERRORS);
+ this.messageProcessedMeter =
this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_SPEC_STORE_MESSAGE_PROCESSED);
}
}
\ No newline at end of file