This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 2b28a5f60 add total dag count metric for dag state store (#3752)
2b28a5f60 is described below

commit 2b28a5f60a810fb61212e056b9d66eadbcaf85e4
Author: meethngala <[email protected]>
AuthorDate: Thu Aug 31 16:08:06 2023 -0700

    add total dag count metric for dag state store (#3752)
    
    Co-authored-by: Meeth Gala <[email protected]>
---
 .../org/apache/gobblin/metrics/ServiceMetricNames.java   |  4 ++++
 .../service/modules/orchestration/FSDagStateStore.java   | 16 ++++++++++++++++
 .../modules/orchestration/MysqlDagStateStore.java        | 14 +++++++++++++-
 3 files changed, 33 insertions(+), 1 deletion(-)

diff --git 
a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
 
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
index b70a7dd45..1a544a3df 100644
--- 
a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
+++ 
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
@@ -65,4 +65,8 @@ public class ServiceMetricNames {
   public static final String HELIX_LEADER_STATE = "HelixLeaderState";
 
   public static final String FLOWGRAPH_UPDATE_FAILED_METER = 
GOBBLIN_SERVICE_PREFIX + ".FlowgraphUpdateFailed";
+
+  public static final String DAG_COUNT_MYSQL_DAG_STATE_COUNT = 
GOBBLIN_SERVICE_PREFIX + ".MysqlDagStateStore" + ".totalDagCount";
+
+  public static final String DAG_COUNT_FS_DAG_STATE_COUNT = 
GOBBLIN_SERVICE_PREFIX + ".FsDagStateStore" + ".totalDagCount";
 }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStore.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStore.java
index 238c19edc..f54268d9f 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStore.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStore.java
@@ -41,6 +41,10 @@ import com.typesafe.config.Config;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.ContextAwareCounter;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.ServiceMetricNames;
 import org.apache.gobblin.runtime.api.TopologySpec;
 import org.apache.gobblin.runtime.spec_serde.GsonSerDe;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
@@ -48,6 +52,7 @@ import 
org.apache.gobblin.service.modules.spec.JobExecutionPlan;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlanDagFactory;
 import 
org.apache.gobblin.service.modules.spec.JobExecutionPlanListDeserializer;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlanListSerializer;
+import org.apache.gobblin.util.ConfigUtils;
 
 
 @Alpha
@@ -59,6 +64,10 @@ public class FSDagStateStore implements DagStateStore {
   private final String dagCheckpointDir;
   private final GsonSerDe<List<JobExecutionPlan>> serDe;
 
+  private MetricContext metricContext;
+
+  private ContextAwareCounter totalDagCount;
+
   public FSDagStateStore(Config config, Map<URI, TopologySpec> 
topologySpecMap) throws IOException {
     this.dagCheckpointDir = config.getString(DAG_STATESTORE_DIR);
     File checkpointDir = new File(this.dagCheckpointDir);
@@ -77,6 +86,9 @@ public class FSDagStateStore implements DagStateStore {
      * */
     Type typeToken = new TypeToken<List<JobExecutionPlan>>(){}.getType();
     this.serDe = new GsonSerDe<>(typeToken, serializer, deserializer);
+    this.metricContext = Instrumented.getMetricContext(new 
org.apache.gobblin.configuration.State(ConfigUtils.configToProperties(config)),
+        this.getClass());
+    this.totalDagCount = 
this.metricContext.contextAwareCounter(ServiceMetricNames.DAG_COUNT_FS_DAG_STATE_COUNT);
   }
 
   /**
@@ -94,6 +106,7 @@ public class FSDagStateStore implements DagStateStore {
 
     Files.write(serializedDag, tmpCheckpointFile, Charsets.UTF_8);
     Files.move(tmpCheckpointFile, checkpointFile);
+    this.totalDagCount.inc();
   }
 
   /**
@@ -115,6 +128,9 @@ public class FSDagStateStore implements DagStateStore {
     File checkpointFile = new File(this.dagCheckpointDir, fileName);
     if (!checkpointFile.delete()) {
       log.error("Could not delete checkpoint file: {}", 
checkpointFile.getName());
+    } else {
+
+      this.totalDagCount.dec();
     }
   }
 
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStore.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStore.java
index 2a11fef26..c578c699b 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStore.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStore.java
@@ -26,11 +26,15 @@ import java.util.Set;
 import java.util.stream.Collectors;
 
 import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.instrumented.Instrumented;
 import org.apache.gobblin.metastore.MysqlDagStateStoreFactory;
 import org.apache.gobblin.metastore.MysqlStateStore;
 import org.apache.gobblin.metastore.MysqlStateStoreEntryManager;
 import org.apache.gobblin.metastore.StateStore;
 import org.apache.gobblin.metastore.predicates.StateStorePredicate;
+import org.apache.gobblin.metrics.ContextAwareCounter;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.ServiceMetricNames;
 import org.apache.gobblin.runtime.api.TopologySpec;
 import org.apache.gobblin.runtime.spec_serde.GsonSerDe;
 import org.apache.gobblin.service.ServiceConfigKeys;
@@ -39,6 +43,7 @@ import 
org.apache.gobblin.service.modules.spec.JobExecutionPlan;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlanDagFactory;
 import 
org.apache.gobblin.service.modules.spec.JobExecutionPlanListDeserializer;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlanListSerializer;
+import org.apache.gobblin.util.ConfigUtils;
 
 import com.google.common.base.Joiner;
 import com.google.common.base.Predicates;
@@ -77,7 +82,8 @@ public class MysqlDagStateStore implements DagStateStore {
   private MysqlStateStore<State> mysqlStateStore;
   private final GsonSerDe<List<JobExecutionPlan>> serDe;
   private JobExecutionPlanDagFactory jobExecPlanDagFactory;
-
+  private MetricContext metricContext;
+  private ContextAwareCounter totalDagCount;
   public MysqlDagStateStore(Config config, Map<URI, TopologySpec> 
topologySpecMap) {
     if (config.hasPath(CONFIG_PREFIX)) {
       config = config.getConfig(CONFIG_PREFIX).withFallback(config);
@@ -91,6 +97,10 @@ public class MysqlDagStateStore implements DagStateStore {
     }.getType();
     this.serDe = new GsonSerDe<>(typeToken, serializer, deserializer);
     this.jobExecPlanDagFactory = new JobExecutionPlanDagFactory();
+    this.metricContext = Instrumented.getMetricContext(new 
org.apache.gobblin.configuration.State(ConfigUtils.configToProperties(config)),
+        this.getClass());
+    this.totalDagCount = 
this.metricContext.contextAwareCounter(ServiceMetricNames.DAG_COUNT_MYSQL_DAG_STATE_COUNT);
+
   }
 
   /**
@@ -108,6 +118,7 @@ public class MysqlDagStateStore implements DagStateStore {
   public void writeCheckpoint(Dag<JobExecutionPlan> dag)
       throws IOException {
     mysqlStateStore.put(getStoreNameFromDagId(generateDagId(dag).toString()), 
getTableNameFromDagId(generateDagId(dag).toString()), convertDagIntoState(dag));
+    this.totalDagCount.inc();
   }
 
   @Override
@@ -120,6 +131,7 @@ public class MysqlDagStateStore implements DagStateStore {
   public void cleanUp(String dagId)
       throws IOException {
     mysqlStateStore.delete(getStoreNameFromDagId(dagId), 
getTableNameFromDagId(dagId));
+    this.totalDagCount.dec();
   }
 
   @Override

Reply via email to