This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new f74c232  [GOBBLIN-1466] Make meters shared between DagManagerThreads
f74c232 is described below

commit f74c232ad05b3b3ca0fcecd4789657bd81af9e42
Author: Jack Moseley <[email protected]>
AuthorDate: Thu Jun 10 17:46:57 2021 -0700

    [GOBBLIN-1466] Make meters shared between DagManagerThreads
    
    Closes #3306 from jack-moseley/shared-meters
---
 .../service/modules/orchestration/DagManager.java  | 39 ++++++++++++++--------
 .../modules/orchestration/DagManagerTest.java      |  8 ++++-
 2 files changed, 33 insertions(+), 14 deletions(-)

diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index 821d87e..5c6dd1b 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -364,11 +364,22 @@ public class DagManager extends AbstractIdleService {
         this.failedDagStateStore = 
createDagStateStore(ConfigUtils.getConfigOrEmpty(config, 
FAILED_DAG_STATESTORE_PREFIX).withFallback(config), topologySpecMap);
         Set<String> failedDagIds = 
Collections.synchronizedSet(this.failedDagStateStore.getDagIds());
 
+        ContextAwareMeter allSuccessfulMeter = null;
+        ContextAwareMeter allFailedMeter = null;
+        if (instrumentationEnabled) {
+          MetricContext metricContext = 
Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), 
getClass());
+          allSuccessfulMeter = 
metricContext.contextAwareMeter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
+              ServiceMetricNames.SUCCESSFUL_FLOW_METER));
+          allFailedMeter = 
metricContext.contextAwareMeter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
+              ServiceMetricNames.FAILED_FLOW_METER));
+        }
+
         //On startup, the service creates DagManagerThreads that are scheduled 
at a fixed rate.
         this.dagManagerThreads = new DagManagerThread[numThreads];
         for (int i = 0; i < numThreads; i++) {
           DagManagerThread dagManagerThread = new 
DagManagerThread(jobStatusRetriever, dagStateStore, failedDagStateStore,
-              queue[i], cancelQueue[i], resumeQueue[i], 
instrumentationEnabled, defaultQuota, perUserQuota, failedDagIds);
+              queue[i], cancelQueue[i], resumeQueue[i], 
instrumentationEnabled, defaultQuota, perUserQuota, failedDagIds,
+              allSuccessfulMeter, allFailedMeter);
           this.dagManagerThreads[i] = dagManagerThread;
           this.scheduledExecutorPool.scheduleAtFixedRate(dagManagerThread, 0, 
this.pollingInterval, TimeUnit.SECONDS);
         }
@@ -420,9 +431,11 @@ public class DagManager extends AbstractIdleService {
     private final int defaultQuota;
     private final Map<String, Integer> perUserQuota;
     private final AtomicLong orchestrationDelay = new AtomicLong(0);
-    private static Map<String, FlowState> flowGauges = Maps.newHashMap();
-    private ContextAwareMeter allSuccessfulMeter;
-    private ContextAwareMeter allFailedMeter;
+    private static final Map<String, FlowState> flowGauges = 
Maps.newConcurrentMap();
+    private final ContextAwareMeter allSuccessfulMeter;
+    private final ContextAwareMeter allFailedMeter;
+    private static final Map<String, ContextAwareMeter> groupSuccessfulMeters 
= Maps.newConcurrentMap();
+    private static final Map<String, ContextAwareMeter> groupFailureMeters = 
Maps.newConcurrentMap();
 
     private JobStatusRetriever jobStatusRetriever;
     private DagStateStore dagStateStore;
@@ -436,7 +449,8 @@ public class DagManager extends AbstractIdleService {
      */
     DagManagerThread(JobStatusRetriever jobStatusRetriever, DagStateStore 
dagStateStore, DagStateStore failedDagStateStore,
         BlockingQueue<Dag<JobExecutionPlan>> queue, BlockingQueue<String> 
cancelQueue, BlockingQueue<String> resumeQueue,
-        boolean instrumentationEnabled, int defaultQuota, Map<String, Integer> 
perUserQuota, Set<String> failedDagIds) {
+        boolean instrumentationEnabled, int defaultQuota, Map<String, Integer> 
perUserQuota, Set<String> failedDagIds,
+        ContextAwareMeter allSuccessfulMeter, ContextAwareMeter 
allFailedMeter) {
       this.jobStatusRetriever = jobStatusRetriever;
       this.dagStateStore = dagStateStore;
       this.failedDagStateStore = failedDagStateStore;
@@ -446,16 +460,14 @@ public class DagManager extends AbstractIdleService {
       this.resumeQueue = resumeQueue;
       this.defaultQuota = defaultQuota;
       this.perUserQuota = perUserQuota;
+      this.allSuccessfulMeter = allSuccessfulMeter;
+      this.allFailedMeter = allFailedMeter;
       if (instrumentationEnabled) {
         this.metricContext = 
Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), 
getClass());
         this.eventSubmitter = Optional.of(new 
EventSubmitter.Builder(this.metricContext, 
"org.apache.gobblin.service").build());
         this.jobStatusPolledTimer = 
Optional.of(this.metricContext.timer(ServiceMetricNames.JOB_STATUS_POLLED_TIMER));
         ContextAwareGauge<Long> orchestrationDelayMetric = 
metricContext.newContextAwareGauge(ServiceMetricNames.FLOW_ORCHESTRATION_DELAY,
             () -> orchestrationDelay.get());
-        this.allSuccessfulMeter = metricContext.contextAwareMeter(
-            MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, 
ServiceMetricNames.SUCCESSFUL_FLOW_METER));
-        this.allFailedMeter = metricContext.contextAwareMeter(
-            MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, 
ServiceMetricNames.FAILED_FLOW_METER));
         this.metricContext.register(orchestrationDelayMetric);
       } else {
         this.metricContext = null;
@@ -1131,9 +1143,10 @@ public class DagManager extends AbstractIdleService {
       return counters;
     }
 
-    private ContextAwareMeter getGroupMeterForDag(String dagId, String 
meterName) {
+    private ContextAwareMeter getGroupMeterForDag(String dagId, String 
meterName, Map<String, ContextAwareMeter> meterMap) {
       String flowGroup = 
DagManagerUtils.getFlowId(this.dags.get(dagId)).getFlowGroup();
-      return 
metricContext.contextAwareMeter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
 flowGroup, meterName));
+      return meterMap.computeIfAbsent(flowGroup,
+          group -> 
metricContext.contextAwareMeter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
 group, meterName)));
     }
 
     /**
@@ -1184,7 +1197,7 @@ public class DagManager extends AbstractIdleService {
       if (this.metricContext != null) {
         
flowGauges.put(DagManagerUtils.getFlowId(this.dags.get(dagId)).toString(), 
FlowState.SUCCESSFUL);
         this.allSuccessfulMeter.mark();
-        getGroupMeterForDag(dagId, 
ServiceMetricNames.SUCCESSFUL_FLOW_METER).mark();
+        getGroupMeterForDag(dagId, ServiceMetricNames.SUCCESSFUL_FLOW_METER, 
groupSuccessfulMeters).mark();
       }
     }
 
@@ -1193,7 +1206,7 @@ public class DagManager extends AbstractIdleService {
       if (this.metricContext != null) {
         
flowGauges.put(DagManagerUtils.getFlowId(this.dags.get(dagId)).toString(), 
FlowState.FAILED);
         this.allFailedMeter.mark();
-        getGroupMeterForDag(dagId, 
ServiceMetricNames.FAILED_FLOW_METER).mark();
+        getGroupMeterForDag(dagId, ServiceMetricNames.FAILED_FLOW_METER, 
groupFailureMeters).mark();
       }
     }
 
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
index 7749903..40c17df 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
@@ -47,6 +47,9 @@ import com.typesafe.config.ConfigValueFactory;
 
 import org.apache.gobblin.config.ConfigBuilder;
 import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.ContextAwareMeter;
+import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.runtime.api.JobSpec;
 import org.apache.gobblin.runtime.api.SpecExecutor;
 import org.apache.gobblin.runtime.spec_executorInstance.MockedSpecExecutor;
@@ -57,6 +60,7 @@ import 
org.apache.gobblin.service.modules.spec.JobExecutionPlan;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlanDagFactory;
 import org.apache.gobblin.service.monitoring.JobStatus;
 import org.apache.gobblin.service.monitoring.JobStatusRetriever;
+import org.apache.gobblin.util.ConfigUtils;
 
 
 public class DagManagerTest {
@@ -84,8 +88,10 @@ public class DagManagerTest {
     this.queue = new LinkedBlockingQueue<>();
     this.cancelQueue = new LinkedBlockingQueue<>();
     this.resumeQueue = new LinkedBlockingQueue<>();
+    MetricContext metricContext = 
Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), 
getClass());
     this._dagManagerThread = new 
DagManager.DagManagerThread(_jobStatusRetriever, _dagStateStore, 
failedDagStateStore, queue, cancelQueue,
-        resumeQueue, true, 5, new HashMap<>(), new HashSet<>());
+        resumeQueue, true, 5, new HashMap<>(), new HashSet<>(), 
metricContext.contextAwareMeter("successMeter"),
+        metricContext.contextAwareMeter("failedMeter"));
 
     Field jobToDagField = 
DagManager.DagManagerThread.class.getDeclaredField("jobToDag");
     jobToDagField.setAccessible(true);

Reply via email to