This is an automated email from the ASF dual-hosted git repository.
arjun4084346 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new e68b62e739 [GOBBLIN-2117] Initialize metrics map for
DagProcEngineMetrics (#4007)
e68b62e739 is described below
commit e68b62e739976a3d370d0501fdf27c67fd784846
Author: umustafi <[email protected]>
AuthorDate: Mon Jul 22 13:32:29 2024 -0700
[GOBBLIN-2117] Initialize metrics map for DagProcEngineMetrics (#4007)
* Initialize metrics map for DagProcEngineMetrics
---
.../apache/gobblin/metrics/ServiceMetricNames.java | 1 -
.../modules/orchestration/MysqlDagActionStore.java | 2 +
.../task/DagProcessingEngineMetrics.java | 57 +++++++++++++---------
.../DagProcessingEngineMetricsTest.java | 54 ++++++++++++++++++++
4 files changed, 91 insertions(+), 23 deletions(-)
diff --git
a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
index e2acdc595b..cc80975766 100644
---
a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
+++
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
@@ -101,6 +101,5 @@ public class ServiceMetricNames {
public static final String DAG_ACTIONS_CONCLUDE_SUCCEEDED =
DAG_PROCESSING_ENGINE_PREFIX + "dagActionsConcludeSucceeded.";
public static final String DAG_ACTIONS_DELETE_SUCCEEDED =
DAG_PROCESSING_ENGINE_PREFIX + "dagActionsDeleteSucceeded.";
public static final String DAG_ACTIONS_DELETE_FAILED =
DAG_PROCESSING_ENGINE_PREFIX + "dagActionsDeleteFailed.";
- // TODO: implement this one
public static final String DAG_ACTIONS_AVERAGE_PROCESSING_DELAY_MILLIS =
DAG_PROCESSING_ENGINE_PREFIX + "dagActionsAvgProcessingDelayMillis.";
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStore.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStore.java
index f5b8aa277a..0e4d10940c 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStore.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStore.java
@@ -133,8 +133,10 @@ public class MysqlDagActionStore implements DagActionStore
{
try {
fillPreparedStatement(dagAction.getFlowGroup(), dagAction.getFlowName(),
dagAction.getFlowExecutionId(),
dagAction.getJobName(), dagAction.getDagActionType(),
deleteStatement);
+
this.dagProcessingEngineMetrics.markDagActionsDeleted(dagAction.getDagActionType(),
true);
return deleteStatement.executeUpdate() != 0;
} catch (SQLException e) {
+
this.dagProcessingEngineMetrics.markDagActionsDeleted(dagAction.getDagActionType(),
false);
throw new IOException(String.format("Failure deleting action for
DagAction: %s in table %s", dagAction,
tableName), e);
}}, true);
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagProcessingEngineMetrics.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagProcessingEngineMetrics.java
index 1011eae473..d8900ac8be 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagProcessingEngineMetrics.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagProcessingEngineMetrics.java
@@ -17,9 +17,8 @@
package org.apache.gobblin.service.modules.orchestration.task;
-import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
-import java.util.List;
import javax.inject.Inject;
import lombok.extern.slf4j.Slf4j;
@@ -47,22 +46,22 @@ public class DagProcessingEngineMetrics {
handle concurrent mark requests correctly. ConcurrentMap is not needed
since no updates are made to the mappings,
only get calls.
*/
- private HashMap<DagActionStore.DagActionType, ContextAwareMeter>
dagActionsStoredMeterByDagActionType = new HashMap();
- private HashMap<DagActionStore.DagActionType, ContextAwareMeter>
dagActionsObservedMeterByDagActionType = new HashMap();
- private HashMap<DagActionStore.DagActionType, ContextAwareMeter>
dagActionsLeasesObtainedMeterByDagActionType = new HashMap();
- private HashMap<DagActionStore.DagActionType, ContextAwareMeter>
dagActionsNoLongerLeasingMeterByDagActionType = new HashMap();
- private HashMap<DagActionStore.DagActionType, ContextAwareMeter>
dagActionsLeaseReminderScheduledMeterByDagActionType = new HashMap();
- private HashMap<DagActionStore.DagActionType, ContextAwareMeter>
dagActionsReminderProcessedMeterByDagActionType = new HashMap();
- private HashMap<DagActionStore.DagActionType, ContextAwareMeter>
dagActionsExceededMaxRetryMeterByDagActionType = new HashMap();
- private HashMap<DagActionStore.DagActionType, ContextAwareMeter>
dagActionsInitializeFailedMeterByDagActionType = new HashMap();
- private HashMap<DagActionStore.DagActionType, ContextAwareMeter>
dagActionsInitializeSucceededMeterByDagActionType = new HashMap();
- private HashMap<DagActionStore.DagActionType, ContextAwareMeter>
dagActionsActFailedMeterByDagActionType = new HashMap();
- private HashMap<DagActionStore.DagActionType, ContextAwareMeter>
dagActionsActSucceededMeterByDagActionType = new HashMap();
- private HashMap<DagActionStore.DagActionType, ContextAwareMeter>
dagActionsConcludeFailedMeterByDagActionType = new HashMap();
- private HashMap<DagActionStore.DagActionType, ContextAwareMeter>
dagActionsConcludeSucceededMeterByDagActionType = new HashMap();
- private HashMap<DagActionStore.DagActionType, ContextAwareMeter>
dagActionsRemovedFromStoreMeterByDagActionType = new HashMap();
- private HashMap<DagActionStore.DagActionType, ContextAwareMeter>
dagActionsFailingRemovalMeterByDagActionType = new HashMap();
- private HashMap<DagActionStore.DagActionType, ContextAwareMeter>
dagActionAverageProcessingDelayMillisMeterByDagActionType = new HashMap();
+ private final HashMap<DagActionStore.DagActionType, ContextAwareMeter>
dagActionsStoredMeterByDagActionType = new HashMap<>();
+ private final HashMap<DagActionStore.DagActionType, ContextAwareMeter>
dagActionsObservedMeterByDagActionType = new HashMap<>();
+ private final HashMap<DagActionStore.DagActionType, ContextAwareMeter>
dagActionsLeasesObtainedMeterByDagActionType = new HashMap<>();
+ private final HashMap<DagActionStore.DagActionType, ContextAwareMeter>
dagActionsNoLongerLeasingMeterByDagActionType = new HashMap<>();
+ private final HashMap<DagActionStore.DagActionType, ContextAwareMeter>
dagActionsLeaseReminderScheduledMeterByDagActionType = new HashMap<>();
+ private final HashMap<DagActionStore.DagActionType, ContextAwareMeter>
dagActionsReminderProcessedMeterByDagActionType = new HashMap<>();
+ private final HashMap<DagActionStore.DagActionType, ContextAwareMeter>
dagActionsExceededMaxRetryMeterByDagActionType = new HashMap<>();
+ private final HashMap<DagActionStore.DagActionType, ContextAwareMeter>
dagActionsInitializeFailedMeterByDagActionType = new HashMap<>();
+ private final HashMap<DagActionStore.DagActionType, ContextAwareMeter>
dagActionsInitializeSucceededMeterByDagActionType = new HashMap<>();
+ private final HashMap<DagActionStore.DagActionType, ContextAwareMeter>
dagActionsActFailedMeterByDagActionType = new HashMap<>();
+ private final HashMap<DagActionStore.DagActionType, ContextAwareMeter>
dagActionsActSucceededMeterByDagActionType = new HashMap<>();
+ private final HashMap<DagActionStore.DagActionType, ContextAwareMeter>
dagActionsConcludeFailedMeterByDagActionType = new HashMap<>();
+ private final HashMap<DagActionStore.DagActionType, ContextAwareMeter>
dagActionsConcludeSucceededMeterByDagActionType = new HashMap<>();
+ private final HashMap<DagActionStore.DagActionType, ContextAwareMeter>
dagActionsDeleteFailedMeterByDagActionType = new HashMap<>();
+ private final HashMap<DagActionStore.DagActionType, ContextAwareMeter>
dagActionsDeleteSucceededMeterByDagActionType = new HashMap<>();
+ private final HashMap<DagActionStore.DagActionType, ContextAwareMeter>
dagActionsAverageProcessingDelayMillisMeterByDagActionType = new HashMap<>();
public DagProcessingEngineMetrics(MetricContext metricContext) {
this.metricContext = metricContext;
@@ -71,10 +70,9 @@ public class DagProcessingEngineMetrics {
@Inject
public DagProcessingEngineMetrics() {
- // Create a new metric context for the DagProcessingEngineMetrics tagged
appropriately
- List<Tag<?>> tags = new ArrayList<>();
- tags.add(new Tag<>(MetricTagNames.METRIC_BACKEND_REPRESENTATION,
GobblinMetrics.MetricType.COUNTER));
- this.metricContext = Instrumented.getMetricContext(new State(),
this.getClass(), tags);
+ this(Instrumented.getMetricContext(new State(),
+ DagProcessingEngineMetrics.class,
+ Collections.singleton(new
Tag<>(MetricTagNames.METRIC_BACKEND_REPRESENTATION,
GobblinMetrics.MetricType.COUNTER))));
}
public void registerAllMetrics() {
@@ -91,6 +89,9 @@ public class DagProcessingEngineMetrics {
registerMetricForEachDagActionType(this.dagActionsActSucceededMeterByDagActionType,
ServiceMetricNames.DAG_ACTIONS_ACT_SUCCEEDED);
registerMetricForEachDagActionType(this.dagActionsConcludeFailedMeterByDagActionType,
ServiceMetricNames.DAG_ACTIONS_CONCLUDE_FAILED);
registerMetricForEachDagActionType(this.dagActionsConcludeSucceededMeterByDagActionType,
ServiceMetricNames.DAG_ACTIONS_CONCLUDE_SUCCEEDED);
+
registerMetricForEachDagActionType(this.dagActionsDeleteFailedMeterByDagActionType,
ServiceMetricNames.DAG_ACTIONS_DELETE_FAILED);
+
registerMetricForEachDagActionType(this.dagActionsDeleteSucceededMeterByDagActionType,
ServiceMetricNames.DAG_ACTIONS_DELETE_SUCCEEDED);
+
registerMetricForEachDagActionType(this.dagActionsAverageProcessingDelayMillisMeterByDagActionType,
ServiceMetricNames.DAG_ACTIONS_AVERAGE_PROCESSING_DELAY_MILLIS);
}
/**
@@ -161,7 +162,19 @@ public class DagProcessingEngineMetrics {
updateMetricForDagActionType(this.dagActionsConcludeFailedMeterByDagActionType,
dagActionType);
}
}
+
+ public void markDagActionsDeleted(DagActionStore.DagActionType
dagActionType, boolean succeeded) {
+ if (succeeded) {
+
updateMetricForDagActionType(this.dagActionsDeleteSucceededMeterByDagActionType,
dagActionType);
+ } else {
+
updateMetricForDagActionType(this.dagActionsDeleteFailedMeterByDagActionType,
dagActionType);
+ }
+ }
+ // TODO: measure processing time
+ public void mark(DagActionStore.DagActionType dagActionType) {
+
updateMetricForDagActionType(this.dagActionsAverageProcessingDelayMillisMeterByDagActionType,
dagActionType);
+ }
/**
* Generic helper used to increment a metric corresponding to the
dagActionType in the provided map. It assumes the
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineMetricsTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineMetricsTest.java
new file mode 100644
index 0000000000..a3453a9afc
--- /dev/null
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineMetricsTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import
org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class DagProcessingEngineMetricsTest {
+ DagProcessingEngineMetrics dagProcessingEngineMetrics;
+
+ @BeforeClass
+ public void setup() {
+ dagProcessingEngineMetrics = new DagProcessingEngineMetrics();
+ }
+
+ /*
+ Checks that the dagActionsStored metric can be incremented for every
DagActionType. It ensures that the default
+ constructor of the DagProcessingEngine initializes a meter for each
DagActionType.
+ */
+ @Test
+ public void testMarkDagActionsStored() {
+ for (DagActionStore.DagActionType dagActionType :
DagActionStore.DagActionType.values()) {
+ dagProcessingEngineMetrics.markDagActionsStored(dagActionType);
+ }
+ }
+
+ /*
+ Checks that the dagActionsObserved metric can be incremented for every
DagActionType. It ensures that the default
+ constructor of the DagProcessingEngine initializes a meter for each
DagActionType.
+ */
+ @Test
+ public void testMarkDagActionsObserved() {
+ for (DagActionStore.DagActionType dagActionType :
DagActionStore.DagActionType.values()) {
+ dagProcessingEngineMetrics.markDagActionsStored(dagActionType);
+ }
+ }
+}