This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 2b28a5f60 add total dag count metric for dag state store (#3752)
2b28a5f60 is described below
commit 2b28a5f60a810fb61212e056b9d66eadbcaf85e4
Author: meethngala <[email protected]>
AuthorDate: Thu Aug 31 16:08:06 2023 -0700
add total dag count metric for dag state store (#3752)
Co-authored-by: Meeth Gala <[email protected]>
---
.../org/apache/gobblin/metrics/ServiceMetricNames.java | 4 ++++
.../service/modules/orchestration/FSDagStateStore.java | 16 ++++++++++++++++
.../modules/orchestration/MysqlDagStateStore.java | 14 +++++++++++++-
3 files changed, 33 insertions(+), 1 deletion(-)
diff --git a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
index b70a7dd45..1a544a3df 100644
--- a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
+++ b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
@@ -65,4 +65,8 @@ public class ServiceMetricNames {
public static final String HELIX_LEADER_STATE = "HelixLeaderState";
public static final String FLOWGRAPH_UPDATE_FAILED_METER =
GOBBLIN_SERVICE_PREFIX + ".FlowgraphUpdateFailed";
+
+ public static final String DAG_COUNT_MYSQL_DAG_STATE_COUNT =
GOBBLIN_SERVICE_PREFIX + ".MysqlDagStateStore" + ".totalDagCount";
+
+ public static final String DAG_COUNT_FS_DAG_STATE_COUNT =
GOBBLIN_SERVICE_PREFIX + ".FsDagStateStore" + ".totalDagCount";
}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStore.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStore.java
index 238c19edc..f54268d9f 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStore.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStore.java
@@ -41,6 +41,10 @@ import com.typesafe.config.Config;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.ContextAwareCounter;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.ServiceMetricNames;
import org.apache.gobblin.runtime.api.TopologySpec;
import org.apache.gobblin.runtime.spec_serde.GsonSerDe;
import org.apache.gobblin.service.modules.flowgraph.Dag;
@@ -48,6 +52,7 @@ import
org.apache.gobblin.service.modules.spec.JobExecutionPlan;
import org.apache.gobblin.service.modules.spec.JobExecutionPlanDagFactory;
import
org.apache.gobblin.service.modules.spec.JobExecutionPlanListDeserializer;
import org.apache.gobblin.service.modules.spec.JobExecutionPlanListSerializer;
+import org.apache.gobblin.util.ConfigUtils;
@Alpha
@@ -59,6 +64,10 @@ public class FSDagStateStore implements DagStateStore {
private final String dagCheckpointDir;
private final GsonSerDe<List<JobExecutionPlan>> serDe;
+ private MetricContext metricContext;
+
+ private ContextAwareCounter totalDagCount;
+
public FSDagStateStore(Config config, Map<URI, TopologySpec>
topologySpecMap) throws IOException {
this.dagCheckpointDir = config.getString(DAG_STATESTORE_DIR);
File checkpointDir = new File(this.dagCheckpointDir);
@@ -77,6 +86,9 @@ public class FSDagStateStore implements DagStateStore {
* */
Type typeToken = new TypeToken<List<JobExecutionPlan>>(){}.getType();
this.serDe = new GsonSerDe<>(typeToken, serializer, deserializer);
+ this.metricContext = Instrumented.getMetricContext(new
org.apache.gobblin.configuration.State(ConfigUtils.configToProperties(config)),
+ this.getClass());
+ this.totalDagCount =
this.metricContext.contextAwareCounter(ServiceMetricNames.DAG_COUNT_FS_DAG_STATE_COUNT);
}
/**
@@ -94,6 +106,7 @@ public class FSDagStateStore implements DagStateStore {
Files.write(serializedDag, tmpCheckpointFile, Charsets.UTF_8);
Files.move(tmpCheckpointFile, checkpointFile);
+ this.totalDagCount.inc();
}
/**
@@ -115,6 +128,9 @@ public class FSDagStateStore implements DagStateStore {
File checkpointFile = new File(this.dagCheckpointDir, fileName);
if (!checkpointFile.delete()) {
log.error("Could not delete checkpoint file: {}",
checkpointFile.getName());
+ } else {
+
+ this.totalDagCount.dec();
}
}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStore.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStore.java
index 2a11fef26..c578c699b 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStore.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStore.java
@@ -26,11 +26,15 @@ import java.util.Set;
import java.util.stream.Collectors;
import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metastore.MysqlDagStateStoreFactory;
import org.apache.gobblin.metastore.MysqlStateStore;
import org.apache.gobblin.metastore.MysqlStateStoreEntryManager;
import org.apache.gobblin.metastore.StateStore;
import org.apache.gobblin.metastore.predicates.StateStorePredicate;
+import org.apache.gobblin.metrics.ContextAwareCounter;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.ServiceMetricNames;
import org.apache.gobblin.runtime.api.TopologySpec;
import org.apache.gobblin.runtime.spec_serde.GsonSerDe;
import org.apache.gobblin.service.ServiceConfigKeys;
@@ -39,6 +43,7 @@ import
org.apache.gobblin.service.modules.spec.JobExecutionPlan;
import org.apache.gobblin.service.modules.spec.JobExecutionPlanDagFactory;
import
org.apache.gobblin.service.modules.spec.JobExecutionPlanListDeserializer;
import org.apache.gobblin.service.modules.spec.JobExecutionPlanListSerializer;
+import org.apache.gobblin.util.ConfigUtils;
import com.google.common.base.Joiner;
import com.google.common.base.Predicates;
@@ -77,7 +82,8 @@ public class MysqlDagStateStore implements DagStateStore {
private MysqlStateStore<State> mysqlStateStore;
private final GsonSerDe<List<JobExecutionPlan>> serDe;
private JobExecutionPlanDagFactory jobExecPlanDagFactory;
-
+ private MetricContext metricContext;
+ private ContextAwareCounter totalDagCount;
public MysqlDagStateStore(Config config, Map<URI, TopologySpec>
topologySpecMap) {
if (config.hasPath(CONFIG_PREFIX)) {
config = config.getConfig(CONFIG_PREFIX).withFallback(config);
@@ -91,6 +97,10 @@ public class MysqlDagStateStore implements DagStateStore {
}.getType();
this.serDe = new GsonSerDe<>(typeToken, serializer, deserializer);
this.jobExecPlanDagFactory = new JobExecutionPlanDagFactory();
+ this.metricContext = Instrumented.getMetricContext(new
org.apache.gobblin.configuration.State(ConfigUtils.configToProperties(config)),
+ this.getClass());
+ this.totalDagCount =
this.metricContext.contextAwareCounter(ServiceMetricNames.DAG_COUNT_MYSQL_DAG_STATE_COUNT);
+
}
/**
@@ -108,6 +118,7 @@ public class MysqlDagStateStore implements DagStateStore {
public void writeCheckpoint(Dag<JobExecutionPlan> dag)
throws IOException {
mysqlStateStore.put(getStoreNameFromDagId(generateDagId(dag).toString()),
getTableNameFromDagId(generateDagId(dag).toString()), convertDagIntoState(dag));
+ this.totalDagCount.inc();
}
@Override
@@ -120,6 +131,7 @@ public class MysqlDagStateStore implements DagStateStore {
public void cleanUp(String dagId)
throws IOException {
mysqlStateStore.delete(getStoreNameFromDagId(dagId),
getTableNameFromDagId(dagId));
+ this.totalDagCount.dec();
}
@Override