This is an automated email from the ASF dual-hosted git repository.

arjun4084346 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new e68b62e739 [GOBBLIN-2117] Initialize metrics map for 
DagProcEngineMetrics (#4007)
e68b62e739 is described below

commit e68b62e739976a3d370d0501fdf27c67fd784846
Author: umustafi <[email protected]>
AuthorDate: Mon Jul 22 13:32:29 2024 -0700

    [GOBBLIN-2117] Initialize metrics map for DagProcEngineMetrics (#4007)
    
    * Initialize metrics map for DagProcEngineMetrics
---
 .../apache/gobblin/metrics/ServiceMetricNames.java |  1 -
 .../modules/orchestration/MysqlDagActionStore.java |  2 +
 .../task/DagProcessingEngineMetrics.java           | 57 +++++++++++++---------
 .../DagProcessingEngineMetricsTest.java            | 54 ++++++++++++++++++++
 4 files changed, 91 insertions(+), 23 deletions(-)

diff --git 
a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
 
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
index e2acdc595b..cc80975766 100644
--- 
a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
+++ 
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
@@ -101,6 +101,5 @@ public class ServiceMetricNames {
   public static final String DAG_ACTIONS_CONCLUDE_SUCCEEDED = 
DAG_PROCESSING_ENGINE_PREFIX + "dagActionsConcludeSucceeded.";
   public static final String DAG_ACTIONS_DELETE_SUCCEEDED = 
DAG_PROCESSING_ENGINE_PREFIX + "dagActionsDeleteSucceeded.";
   public static final String DAG_ACTIONS_DELETE_FAILED = 
DAG_PROCESSING_ENGINE_PREFIX + "dagActionsDeleteFailed.";
-  // TODO: implement this one
   public static final String DAG_ACTIONS_AVERAGE_PROCESSING_DELAY_MILLIS = 
DAG_PROCESSING_ENGINE_PREFIX + "dagActionsAvgProcessingDelayMillis.";
 }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStore.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStore.java
index f5b8aa277a..0e4d10940c 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStore.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStore.java
@@ -133,8 +133,10 @@ public class MysqlDagActionStore implements DagActionStore 
{
     try {
       fillPreparedStatement(dagAction.getFlowGroup(), dagAction.getFlowName(), 
dagAction.getFlowExecutionId(),
           dagAction.getJobName(), dagAction.getDagActionType(), 
deleteStatement);
+      
this.dagProcessingEngineMetrics.markDagActionsDeleted(dagAction.getDagActionType(),
 true);
       return deleteStatement.executeUpdate() != 0;
     } catch (SQLException e) {
+      
this.dagProcessingEngineMetrics.markDagActionsDeleted(dagAction.getDagActionType(),
 false);
       throw new IOException(String.format("Failure deleting action for 
DagAction: %s in table %s", dagAction,
           tableName), e);
     }}, true);
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagProcessingEngineMetrics.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagProcessingEngineMetrics.java
index 1011eae473..d8900ac8be 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagProcessingEngineMetrics.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagProcessingEngineMetrics.java
@@ -17,9 +17,8 @@
 
 package org.apache.gobblin.service.modules.orchestration.task;
 
-import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
-import java.util.List;
 
 import javax.inject.Inject;
 import lombok.extern.slf4j.Slf4j;
@@ -47,22 +46,22 @@ public class DagProcessingEngineMetrics {
    handle concurrent mark requests correctly. ConcurrentMap is not needed 
since no updates are made to the mappings,
    only get calls.
   */
-  private HashMap<DagActionStore.DagActionType, ContextAwareMeter> 
dagActionsStoredMeterByDagActionType = new HashMap();
-  private HashMap<DagActionStore.DagActionType, ContextAwareMeter> 
dagActionsObservedMeterByDagActionType =  new HashMap();
-  private HashMap<DagActionStore.DagActionType, ContextAwareMeter> 
dagActionsLeasesObtainedMeterByDagActionType =  new HashMap();
-  private HashMap<DagActionStore.DagActionType, ContextAwareMeter> 
dagActionsNoLongerLeasingMeterByDagActionType =  new HashMap();
-  private HashMap<DagActionStore.DagActionType, ContextAwareMeter> 
dagActionsLeaseReminderScheduledMeterByDagActionType =  new HashMap();
-  private HashMap<DagActionStore.DagActionType, ContextAwareMeter> 
dagActionsReminderProcessedMeterByDagActionType =  new HashMap();
-  private HashMap<DagActionStore.DagActionType, ContextAwareMeter> 
dagActionsExceededMaxRetryMeterByDagActionType =  new HashMap();
-  private HashMap<DagActionStore.DagActionType, ContextAwareMeter> 
dagActionsInitializeFailedMeterByDagActionType =  new HashMap();
-  private HashMap<DagActionStore.DagActionType, ContextAwareMeter> 
dagActionsInitializeSucceededMeterByDagActionType =  new HashMap();
-  private HashMap<DagActionStore.DagActionType, ContextAwareMeter> 
dagActionsActFailedMeterByDagActionType =  new HashMap();
-  private HashMap<DagActionStore.DagActionType, ContextAwareMeter> 
dagActionsActSucceededMeterByDagActionType =  new HashMap();
-  private HashMap<DagActionStore.DagActionType, ContextAwareMeter> 
dagActionsConcludeFailedMeterByDagActionType =  new HashMap();
-  private HashMap<DagActionStore.DagActionType, ContextAwareMeter> 
dagActionsConcludeSucceededMeterByDagActionType =  new HashMap();
-  private HashMap<DagActionStore.DagActionType, ContextAwareMeter> 
dagActionsRemovedFromStoreMeterByDagActionType =  new HashMap();
-  private HashMap<DagActionStore.DagActionType, ContextAwareMeter> 
dagActionsFailingRemovalMeterByDagActionType =  new HashMap();
-  private HashMap<DagActionStore.DagActionType, ContextAwareMeter> 
dagActionAverageProcessingDelayMillisMeterByDagActionType =  new HashMap();
+  private final HashMap<DagActionStore.DagActionType, ContextAwareMeter> 
dagActionsStoredMeterByDagActionType = new HashMap<>();
+  private final HashMap<DagActionStore.DagActionType, ContextAwareMeter> 
dagActionsObservedMeterByDagActionType =  new HashMap<>();
+  private final HashMap<DagActionStore.DagActionType, ContextAwareMeter> 
dagActionsLeasesObtainedMeterByDagActionType =  new HashMap<>();
+  private final HashMap<DagActionStore.DagActionType, ContextAwareMeter> 
dagActionsNoLongerLeasingMeterByDagActionType =  new HashMap<>();
+  private final HashMap<DagActionStore.DagActionType, ContextAwareMeter> 
dagActionsLeaseReminderScheduledMeterByDagActionType =  new HashMap<>();
+  private final HashMap<DagActionStore.DagActionType, ContextAwareMeter> 
dagActionsReminderProcessedMeterByDagActionType =  new HashMap<>();
+  private final HashMap<DagActionStore.DagActionType, ContextAwareMeter> 
dagActionsExceededMaxRetryMeterByDagActionType =  new HashMap<>();
+  private final HashMap<DagActionStore.DagActionType, ContextAwareMeter> 
dagActionsInitializeFailedMeterByDagActionType =  new HashMap<>();
+  private final HashMap<DagActionStore.DagActionType, ContextAwareMeter> 
dagActionsInitializeSucceededMeterByDagActionType =  new HashMap<>();
+  private final HashMap<DagActionStore.DagActionType, ContextAwareMeter> 
dagActionsActFailedMeterByDagActionType =  new HashMap<>();
+  private final HashMap<DagActionStore.DagActionType, ContextAwareMeter> 
dagActionsActSucceededMeterByDagActionType =  new HashMap<>();
+  private final HashMap<DagActionStore.DagActionType, ContextAwareMeter> 
dagActionsConcludeFailedMeterByDagActionType =  new HashMap<>();
+  private final HashMap<DagActionStore.DagActionType, ContextAwareMeter> 
dagActionsConcludeSucceededMeterByDagActionType =  new HashMap<>();
+  private final HashMap<DagActionStore.DagActionType, ContextAwareMeter> 
dagActionsDeleteFailedMeterByDagActionType =  new HashMap<>();
+  private final HashMap<DagActionStore.DagActionType, ContextAwareMeter> 
dagActionsDeleteSucceededMeterByDagActionType =  new HashMap<>();
+  private final HashMap<DagActionStore.DagActionType, ContextAwareMeter> 
dagActionsAverageProcessingDelayMillisMeterByDagActionType =  new HashMap<>();
 
   public DagProcessingEngineMetrics(MetricContext metricContext) {
     this.metricContext = metricContext;
@@ -71,10 +70,9 @@ public class DagProcessingEngineMetrics {
 
   @Inject
   public DagProcessingEngineMetrics() {
-    // Create a new metric context for the DagProcessingEngineMetrics tagged 
appropriately
-    List<Tag<?>> tags = new ArrayList<>();
-    tags.add(new Tag<>(MetricTagNames.METRIC_BACKEND_REPRESENTATION, 
GobblinMetrics.MetricType.COUNTER));
-    this.metricContext = Instrumented.getMetricContext(new State(), 
this.getClass(), tags);
+    this(Instrumented.getMetricContext(new State(),
+        DagProcessingEngineMetrics.class,
+        Collections.singleton(new 
Tag<>(MetricTagNames.METRIC_BACKEND_REPRESENTATION, 
GobblinMetrics.MetricType.COUNTER))));
   }
 
   public void registerAllMetrics() {
@@ -91,6 +89,9 @@ public class DagProcessingEngineMetrics {
     
registerMetricForEachDagActionType(this.dagActionsActSucceededMeterByDagActionType,
 ServiceMetricNames.DAG_ACTIONS_ACT_SUCCEEDED);
     
registerMetricForEachDagActionType(this.dagActionsConcludeFailedMeterByDagActionType,
 ServiceMetricNames.DAG_ACTIONS_CONCLUDE_FAILED);
     
registerMetricForEachDagActionType(this.dagActionsConcludeSucceededMeterByDagActionType,
 ServiceMetricNames.DAG_ACTIONS_CONCLUDE_SUCCEEDED);
+    
registerMetricForEachDagActionType(this.dagActionsDeleteFailedMeterByDagActionType,
 ServiceMetricNames.DAG_ACTIONS_DELETE_FAILED);
+    
registerMetricForEachDagActionType(this.dagActionsDeleteSucceededMeterByDagActionType,
 ServiceMetricNames.DAG_ACTIONS_DELETE_SUCCEEDED);
+    
registerMetricForEachDagActionType(this.dagActionsAverageProcessingDelayMillisMeterByDagActionType,
 ServiceMetricNames.DAG_ACTIONS_AVERAGE_PROCESSING_DELAY_MILLIS);
   }
 
   /**
@@ -161,7 +162,19 @@ public class DagProcessingEngineMetrics {
       
updateMetricForDagActionType(this.dagActionsConcludeFailedMeterByDagActionType, 
dagActionType);
     }
   }
+  
+  public void markDagActionsDeleted(DagActionStore.DagActionType 
dagActionType, boolean succeeded) {
+    if (succeeded) {
+      
updateMetricForDagActionType(this.dagActionsDeleteSucceededMeterByDagActionType,
 dagActionType);
+    } else {
+      
updateMetricForDagActionType(this.dagActionsDeleteFailedMeterByDagActionType, 
dagActionType);
+    }
+  }
 
+  // TODO: measure processing time; for now this only increments the
+  // average-processing-delay meter registered for the given dagActionType
+  public void mark(DagActionStore.DagActionType dagActionType) {
+    
updateMetricForDagActionType(this.dagActionsAverageProcessingDelayMillisMeterByDagActionType,
 dagActionType);
+  }
 
   /**
    * Generic helper used to increment a metric corresponding to the 
dagActionType in the provided map. It assumes the
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineMetricsTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineMetricsTest.java
new file mode 100644
index 0000000000..a3453a9afc
--- /dev/null
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineMetricsTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import 
org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class DagProcessingEngineMetricsTest {
+  DagProcessingEngineMetrics dagProcessingEngineMetrics;
+
+  @BeforeClass
+  public void setup() {
+    dagProcessingEngineMetrics = new DagProcessingEngineMetrics();
+  }
+
+  /**
+   * Checks that the dagActionsStored metric can be incremented for every DagActionType. It is meant to ensure that
+   * the no-arg constructor of DagProcessingEngineMetrics initializes a meter for each DagActionType.
+   */
+  @Test
+  public void testMarkDagActionsStored() {
+    for (DagActionStore.DagActionType dagActionType : DagActionStore.DagActionType.values()) {
+      dagProcessingEngineMetrics.markDagActionsStored(dagActionType);
+    }
+  }
+
+  /**
+   * Checks that the dagActionsObserved metric can be incremented for every DagActionType. It is meant to ensure that
+   * the no-arg constructor of DagProcessingEngineMetrics initializes a meter for each DagActionType.
+   */
+  @Test
+  public void testMarkDagActionsObserved() {
+    for (DagActionStore.DagActionType dagActionType : DagActionStore.DagActionType.values()) {
+      dagProcessingEngineMetrics.markDagActionsObserved(dagActionType);
+    }
+  }
+}

Reply via email to