This is an automated email from the ASF dual-hosted git repository.
arjun4084346 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 50fdda9397 Implement dagProcessingEngine metrics (#3983)
50fdda9397 is described below
commit 50fdda93978d8dbc09485e5d5706f7d725bde6ef
Author: umustafi <[email protected]>
AuthorDate: Fri Jul 19 12:49:33 2024 -0700
Implement dagProcessingEngine metrics (#3983)
Implement dagProcessingEngine metrics
---
.../apache/gobblin/metrics/ServiceMetricNames.java | 23 +++
.../runtime/DagActionStoreChangeMonitorTest.java | 3 +-
...gManagementDagActionStoreChangeMonitorTest.java | 4 +-
.../modules/core/GobblinServiceGuiceModule.java | 3 +
.../orchestration/DagManagementTaskStreamImpl.java | 24 ++-
.../modules/orchestration/DagProcessingEngine.java | 17 +-
.../MySqlDagManagementStateStore.java | 3 +-
.../modules/orchestration/MysqlDagActionStore.java | 6 +-
.../modules/orchestration/proc/DagProc.java | 23 ++-
.../proc/DeadlineEnforcementDagProc.java | 18 ++-
.../proc/EnforceFlowFinishDeadlineDagProc.java | 7 +-
.../proc/EnforceJobStartDeadlineDagProc.java | 9 +-
.../modules/orchestration/proc/KillDagProc.java | 12 +-
.../modules/orchestration/proc/LaunchDagProc.java | 8 +-
.../orchestration/proc/ReevaluateDagProc.java | 13 +-
.../modules/orchestration/proc/ResumeDagProc.java | 10 +-
.../task/DagProcessingEngineMetrics.java | 179 +++++++++++++++++++++
.../modules/orchestration/task/DagTask.java | 9 +-
.../task/EnforceFlowFinishDeadlineDagTask.java | 4 +-
.../task/EnforceJobStartDeadlineDagTask.java | 4 +-
.../modules/orchestration/task/KillDagTask.java | 4 +-
.../modules/orchestration/task/LaunchDagTask.java | 4 +-
.../orchestration/task/ReevaluateDagTask.java | 4 +-
.../modules/orchestration/task/ResumeDagTask.java | 4 +-
.../monitoring/DagActionStoreChangeMonitor.java | 10 +-
.../DagActionStoreChangeMonitorFactory.java | 8 +-
.../DagManagementDagActionStoreChangeMonitor.java | 8 +-
...nagementDagActionStoreChangeMonitorFactory.java | 7 +-
.../DagManagementTaskStreamImplTest.java | 5 +-
.../modules/orchestration/DagManagerFlowTest.java | 3 +-
.../orchestration/DagProcessingEngineTest.java | 16 +-
.../orchestration/MysqlDagActionStoreTest.java | 4 +-
.../proc/EnforceDeadlineDagProcsTest.java | 18 ++-
.../orchestration/proc/KillDagProcTest.java | 19 ++-
.../orchestration/proc/LaunchDagProcTest.java | 13 +-
.../orchestration/proc/ReevaluateDagProcTest.java | 22 ++-
.../orchestration/proc/ResumeDagProcTest.java | 7 +-
.../orchestration/task/LaunchDagTaskTest.java | 3 +-
38 files changed, 425 insertions(+), 113 deletions(-)
diff --git
a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
index 3af11c1f10..e2acdc595b 100644
---
a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
+++
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
@@ -80,4 +80,27 @@ public class ServiceMetricNames {
public static final String DAG_COUNT_FS_DAG_STATE_COUNT =
GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + "FsDagStateStore" + ".totalDagCount";
public static final String DAG_PROCESSING_EXCEPTION_METER =
"DagProcessingException";
+ /* DagProcessingEngine & Multi-active Execution Related Metrics
+ * Note: metrics ending with the delimiter '.' will be suffixed by the
specific {@link DagActionType} type for finer
+ * grained monitoring of each dagAction type in addition to the aggregation
of all types.
+ */
+ public static final String DAG_PROCESSING_ENGINE_PREFIX =
GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + "DagProcEngine.";
+ public static final String DAG_ACTIONS_STORED = DAG_PROCESSING_ENGINE_PREFIX
+ "dagActionsStored.";
+ public static final String DAG_ACTIONS_OBSERVED =
DAG_PROCESSING_ENGINE_PREFIX + "dagActionsObserved.";
+ public static final String DAG_ACTIONS_LEASES_OBTAINED =
DAG_PROCESSING_ENGINE_PREFIX + "dagActionsLeasesObtained.";
+ public static final String DAG_ACTIONS_NO_LONGER_LEASING =
DAG_PROCESSING_ENGINE_PREFIX + "dagActionsNoLongerLeasing.";
+ public static final String DAG_ACTIONS_LEASE_REMINDER_SCHEDULED =
DAG_PROCESSING_ENGINE_PREFIX + "dagActionsLeaseReminderScheduled.";
+ public static final String DAG_ACTIONS_REMINDER_PROCESSED =
DAG_PROCESSING_ENGINE_PREFIX + "dagActionsRemindersProcessed.";
+ // TODO: implement dropping reminder events after they exceed some maximum time
+ public static final String DAG_ACTIONS_EXCEEDED_MAX_RETRY =
DAG_PROCESSING_ENGINE_PREFIX + "dagActionsExceededMaxRetry.";
+ public static final String DAG_ACTIONS_INITIALIZE_FAILED =
DAG_PROCESSING_ENGINE_PREFIX + "dagActionsInitializeFailed.";
+ public static final String DAG_ACTIONS_INITIALIZE_SUCCEEDED =
DAG_PROCESSING_ENGINE_PREFIX + "dagActionsInitializeSucceeded.";
+ public static final String DAG_ACTIONS_ACT_FAILED =
DAG_PROCESSING_ENGINE_PREFIX + "dagActionsActFailed.";
+ public static final String DAG_ACTIONS_ACT_SUCCEEDED =
DAG_PROCESSING_ENGINE_PREFIX + "dagActionsActSucceeded.";
+ public static final String DAG_ACTIONS_CONCLUDE_FAILED =
DAG_PROCESSING_ENGINE_PREFIX + "dagActionsConcludeFailed.";
+ public static final String DAG_ACTIONS_CONCLUDE_SUCCEEDED =
DAG_PROCESSING_ENGINE_PREFIX + "dagActionsConcludeSucceeded.";
+ public static final String DAG_ACTIONS_DELETE_SUCCEEDED =
DAG_PROCESSING_ENGINE_PREFIX + "dagActionsDeleteSucceeded.";
+ public static final String DAG_ACTIONS_DELETE_FAILED =
DAG_PROCESSING_ENGINE_PREFIX + "dagActionsDeleteFailed.";
+ // TODO: implement the average dagAction processing delay metric declared below
+ public static final String DAG_ACTIONS_AVERAGE_PROCESSING_DELAY_MILLIS =
DAG_PROCESSING_ENGINE_PREFIX + "dagActionsAvgProcessingDelayMillis.";
}
diff --git
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java
index a5de624ced..a246a8473e 100644
---
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java
+++
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java
@@ -19,6 +19,7 @@ package org.apache.gobblin.runtime;
import java.net.URI;
+import
org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
@@ -90,7 +91,7 @@ public class DagActionStoreChangeMonitorTest {
public MockDagActionStoreChangeMonitor(String topic, Config config, int
numThreads, boolean isMultiActiveSchedulerEnabled,
DagManagementStateStore dagManagementStateStore, DagManager
dagManager, FlowCatalog flowCatalog, Orchestrator orchestrator) {
super(topic, config, dagManager, numThreads, flowCatalog, orchestrator,
- dagManagementStateStore, isMultiActiveSchedulerEnabled);
+ dagManagementStateStore, isMultiActiveSchedulerEnabled,
mock(DagProcessingEngineMetrics.class));
}
protected void processMessageForTest(DecodeableKafkaRecord record) {
diff --git
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagManagementDagActionStoreChangeMonitorTest.java
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagManagementDagActionStoreChangeMonitorTest.java
index 3dbf0a9612..261e686bd2 100644
---
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagManagementDagActionStoreChangeMonitorTest.java
+++
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagManagementDagActionStoreChangeMonitorTest.java
@@ -17,6 +17,7 @@
package org.apache.gobblin.runtime;
+import
org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.quartz.SchedulerException;
import org.testng.annotations.BeforeClass;
@@ -75,7 +76,8 @@ public class DagManagementDagActionStoreChangeMonitorTest {
public MockDagManagementDagActionStoreChangeMonitor(Config config, int
numThreads, boolean isMultiActiveSchedulerEnabled) {
super(config, numThreads, mock(FlowCatalog.class),
mock(Orchestrator.class), mock(DagManagementStateStore.class),
- isMultiActiveSchedulerEnabled, mock(DagManagement.class),
dagActionReminderScheduler);
+ isMultiActiveSchedulerEnabled, mock(DagManagement.class),
dagActionReminderScheduler,
+ mock(DagProcessingEngineMetrics.class));
}
protected void processMessageForTest(DecodeableKafkaRecord<String,
DagActionStoreChangeEvent> record) {
super.processMessage(record);
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
index 6ed53ba00e..508dcb6d8e 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
@@ -82,6 +82,7 @@ import
org.apache.gobblin.service.modules.orchestration.MysqlDagActionStore;
import org.apache.gobblin.service.modules.orchestration.Orchestrator;
import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
import org.apache.gobblin.service.modules.orchestration.proc.DagProcUtils;
+import
org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
import
org.apache.gobblin.service.modules.restli.GobblinServiceFlowConfigResourceHandler;
import
org.apache.gobblin.service.modules.restli.GobblinServiceFlowConfigV2ResourceHandler;
import
org.apache.gobblin.service.modules.restli.GobblinServiceFlowConfigV2ResourceHandlerWithWarmStandby;
@@ -187,6 +188,8 @@ public class GobblinServiceGuiceModule implements Module {
binder.bind(FlowExecutionResourceHandler.class).to(GobblinServiceFlowExecutionResourceHandler.class);
}
+ binder.bind(DagProcessingEngineMetrics.class);
+
/* Note that two instances of the same class can only be differentiated
with an `annotatedWith` marker provided at
binding time (optionally bound classes cannot have names associated with
them), so both arbiters need to be
explicitly bound to be differentiated. The scheduler lease arbiter is only
used in single-active scheduler mode,
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
index c2507c5ebd..18672a99f1 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
@@ -22,6 +22,7 @@ import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
+import
org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
import org.quartz.SchedulerException;
import com.google.inject.Inject;
@@ -82,13 +83,14 @@ public class DagManagementTaskStreamImpl implements
DagManagement, DagTaskStream
private static final int MAX_HOUSEKEEPING_THREAD_DELAY = 180;
private final BlockingQueue<DagActionStore.LeaseParams> leaseParamsQueue =
new LinkedBlockingQueue<>();
private final DagManagementStateStore dagManagementStateStore;
+ private final DagProcessingEngineMetrics dagProcEngineMetrics;
@Inject
public DagManagementTaskStreamImpl(Config config, Optional<DagActionStore>
dagActionStore,
@Named(ConfigurationKeys.PROCESSING_LEASE_ARBITER_NAME)
MultiActiveLeaseArbiter dagActionProcessingLeaseArbiter,
Optional<DagActionReminderScheduler> dagActionReminderScheduler,
@Named(InjectionNames.MULTI_ACTIVE_EXECUTION_ENABLED) boolean
isMultiActiveExecutionEnabled,
- DagManagementStateStore dagManagementStateStore) {
+ DagManagementStateStore dagManagementStateStore,
DagProcessingEngineMetrics dagProcEngineMetrics) {
this.config = config;
if (!dagActionStore.isPresent()) {
/* DagActionStore is optional because there are other configurations
that do not require it and it's initialized
@@ -105,6 +107,7 @@ public class DagManagementTaskStreamImpl implements
DagManagement, DagTaskStream
MetricContext metricContext =
Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()),
getClass());
this.eventSubmitter = new EventSubmitter.Builder(metricContext,
"org.apache.gobblin.service").build();
this.dagManagementStateStore = dagManagementStateStore;
+ this.dagProcEngineMetrics = dagProcEngineMetrics;
}
public synchronized void addDagAction(DagActionStore.LeaseParams
leaseParams) {
@@ -136,6 +139,10 @@ public class DagManagementTaskStreamImpl implements
DagManagement, DagTaskStream
} else { // Handle original non-deadline dagActions as well as
reminder events of all types
LeaseAttemptStatus leaseAttemptStatus =
retrieveLeaseStatus(leaseParams);
if (leaseAttemptStatus instanceof
LeaseAttemptStatus.LeaseObtainedStatus) {
+
this.dagProcEngineMetrics.markDagActionsLeasedObtained(leaseParams);
+ if (leaseParams.isReminder()) {
+
this.dagProcEngineMetrics.markDagActionsRemindersProcessed(leaseParams);
+ }
return createDagTask(dagAction,
(LeaseAttemptStatus.LeaseObtainedStatus) leaseAttemptStatus);
}
}
@@ -198,6 +205,9 @@ public class DagManagementTaskStreamImpl implements
DagManagement, DagTaskStream
*/
if (!(leaseAttemptStatus instanceof
LeaseAttemptStatus.NoLongerLeasingStatus)) {
scheduleReminderForEvent(leaseAttemptStatus);
+
this.dagProcEngineMetrics.markDagActionsLeaseReminderScheduled(leaseParams);
+ } else {
+ this.dagProcEngineMetrics.markDagActionsNoLongerLeasing(leaseParams);
}
return leaseAttemptStatus;
}
@@ -207,17 +217,17 @@ public class DagManagementTaskStreamImpl implements
DagManagement, DagTaskStream
switch (dagActionType) {
case ENFORCE_FLOW_FINISH_DEADLINE:
- return new EnforceFlowFinishDeadlineDagTask(dagAction,
leaseObtainedStatus, dagManagementStateStore);
+ return new EnforceFlowFinishDeadlineDagTask(dagAction,
leaseObtainedStatus, dagManagementStateStore, dagProcEngineMetrics);
case ENFORCE_JOB_START_DEADLINE:
- return new EnforceJobStartDeadlineDagTask(dagAction,
leaseObtainedStatus, dagManagementStateStore);
+ return new EnforceJobStartDeadlineDagTask(dagAction,
leaseObtainedStatus, dagManagementStateStore, dagProcEngineMetrics);
case KILL:
- return new KillDagTask(dagAction, leaseObtainedStatus,
dagManagementStateStore);
+ return new KillDagTask(dagAction, leaseObtainedStatus,
dagManagementStateStore, dagProcEngineMetrics);
case LAUNCH:
- return new LaunchDagTask(dagAction, leaseObtainedStatus,
dagManagementStateStore);
+ return new LaunchDagTask(dagAction, leaseObtainedStatus,
dagManagementStateStore, dagProcEngineMetrics);
case REEVALUATE:
- return new ReevaluateDagTask(dagAction, leaseObtainedStatus,
dagManagementStateStore);
+ return new ReevaluateDagTask(dagAction, leaseObtainedStatus,
dagManagementStateStore, dagProcEngineMetrics);
case RESUME:
- return new ResumeDagTask(dagAction, leaseObtainedStatus,
dagManagementStateStore);
+ return new ResumeDagTask(dagAction, leaseObtainedStatus,
dagManagementStateStore, dagProcEngineMetrics);
default:
throw new UnsupportedOperationException(dagActionType + " not yet
implemented");
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java
index 449b9cf994..813c81b3d9 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java
@@ -36,6 +36,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+import
org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
import org.apache.gobblin.service.modules.orchestration.task.DagTask;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.ExecutorsUtils;
@@ -61,13 +62,15 @@ public class DagProcessingEngine extends
AbstractIdleService {
private final Config config;
private final Optional<DagProcFactory> dagProcFactory;
private ScheduledExecutorService scheduledExecutorPool;
+ private final DagProcessingEngineMetrics dagProcEngineMetrics;
private static final Integer TERMINATION_TIMEOUT = 30;
public static final String DEFAULT_JOB_START_DEADLINE_TIME_MS =
"defaultJobStartDeadlineTimeMillis";
@Getter static long defaultJobStartSlaTimeMillis;
@Inject
public DagProcessingEngine(Config config, Optional<DagTaskStream>
dagTaskStream, Optional<DagProcFactory> dagProcFactory,
- Optional<DagManagementStateStore> dagManagementStateStore,
@Named(DEFAULT_JOB_START_DEADLINE_TIME_MS) long deadlineTimeMs) {
+ Optional<DagManagementStateStore> dagManagementStateStore,
+ @Named(DEFAULT_JOB_START_DEADLINE_TIME_MS) long deadlineTimeMs,
DagProcessingEngineMetrics dagProcEngineMetrics) {
this.config = config;
this.dagProcFactory = dagProcFactory;
this.dagTaskStream = dagTaskStream;
@@ -79,6 +82,7 @@ public class DagProcessingEngine extends AbstractIdleService {
this.dagProcFactory.isPresent() ? "present" : "MISSING",
this.dagManagementStateStore.isPresent() ? "present" : "MISSING"));
}
+ this.dagProcEngineMetrics = dagProcEngineMetrics;
log.info("DagProcessingEngine initialized.");
setDefaultJobStartDeadlineTimeMs(deadlineTimeMs);
}
@@ -98,7 +102,7 @@ public class DagProcessingEngine extends AbstractIdleService
{
for (int i=0; i < numThreads; i++) {
// todo - set metrics for count of active DagProcEngineThread
DagProcEngineThread dagProcEngineThread = new
DagProcEngineThread(dagTaskStream.get(), dagProcFactory.get(),
- dagManagementStateStore.get(), i);
+ dagManagementStateStore.get(), dagProcEngineMetrics, i);
this.scheduledExecutorPool.submit(dagProcEngineThread);
}
}
@@ -114,9 +118,10 @@ public class DagProcessingEngine extends
AbstractIdleService {
@AllArgsConstructor
@VisibleForTesting
static class DagProcEngineThread implements Runnable {
- private DagTaskStream dagTaskStream;
- private DagProcFactory dagProcFactory;
- private DagManagementStateStore dagManagementStateStore;
+ private final DagTaskStream dagTaskStream;
+ private final DagProcFactory dagProcFactory;
+ private final DagManagementStateStore dagManagementStateStore;
+ private final DagProcessingEngineMetrics dagProcEngineMetrics;
private final int threadID;
@Override
@@ -131,7 +136,7 @@ public class DagProcessingEngine extends
AbstractIdleService {
}
DagProc<?> dagProc = dagTask.host(dagProcFactory);
try {
- dagProc.process(dagManagementStateStore);
+ dagProc.process(dagManagementStateStore, dagProcEngineMetrics);
dagTask.conclude();
} catch (Exception e) {
log.error("DagProcEngineThread encountered exception while
processing dag " + dagProc.getDagId(), e);
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java
index 285700b845..c0984f835b 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java
@@ -183,8 +183,7 @@ public class MySqlDagManagementStateStore implements
DagManagementStateStore {
@Override
public Set<Dag.DagNode<JobExecutionPlan>> getDagNodes(DagManager.DagId
dagId) throws IOException {
- return this.dagStateStore.getDagNodes(dagId);
- }
+ return this.dagStateStore.getDagNodes(dagId);}
@Override
public void tryAcquireQuota(Collection<Dag.DagNode<JobExecutionPlan>>
dagNodes) throws IOException {
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStore.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStore.java
index c96478c79d..f5b8aa277a 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStore.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStore.java
@@ -36,6 +36,7 @@ import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metastore.MysqlDataSourceFactory;
import org.apache.gobblin.service.ServiceConfigKeys;
+import
org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.DBStatementExecutor;
@@ -63,9 +64,10 @@ public class MysqlDagActionStore implements DagActionStore {
// Deletes rows older than retention time period (in seconds) to prevent
this table from growing unbounded.
private static final String RETENTION_STATEMENT = "DELETE FROM %s WHERE
modified_time < DATE_SUB(CURRENT_TIMESTAMP, INTERVAL %s SECOND)";
+ private final DagProcessingEngineMetrics dagProcessingEngineMetrics;
@Inject
- public MysqlDagActionStore(Config config) throws IOException {
+ public MysqlDagActionStore(Config config, DagProcessingEngineMetrics
dagProcessingEngineMetrics) throws IOException {
if (config.hasPath(CONFIG_PREFIX)) {
config = config.getConfig(CONFIG_PREFIX).withFallback(config);
} else {
@@ -89,6 +91,7 @@ public class MysqlDagActionStore implements DagActionStore {
String thisTableRetentionStatement = String.format(RETENTION_STATEMENT,
this.tableName, retentionPeriodSeconds);
// Periodically deletes all rows in the table last_modified before the
retention period defined by config.
dbStatementExecutor.repeatSqlCommandExecutionAtInterval(thisTableRetentionStatement,
6L, TimeUnit.HOURS);
+ this.dagProcessingEngineMetrics = dagProcessingEngineMetrics;
}
@Override
@@ -121,6 +124,7 @@ public class MysqlDagActionStore implements DagActionStore {
throw new IOException(String.format("Failure adding action for
DagAction: %s in table %s",
new DagAction(flowGroup, flowName, flowExecutionId, jobName,
dagActionType), tableName), e);
}}, true);
+ this.dagProcessingEngineMetrics.markDagActionsStored(dagActionType);
}
@Override
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java
index effe61c360..3c8aadc6eb 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java
@@ -29,9 +29,11 @@ import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.service.modules.flowgraph.DagNodeId;
+import org.apache.gobblin.service.modules.orchestration.DagActionStore;
import
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
import org.apache.gobblin.service.modules.orchestration.DagManager;
import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
+import
org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
import org.apache.gobblin.service.modules.orchestration.task.DagTask;
@@ -63,13 +65,26 @@ public abstract class DagProc<T> {
this.dagNodeId = this.dagTask.getDagAction().getDagNodeId();
}
- public final void process(DagManagementStateStore dagManagementStateStore)
throws IOException {
- T state = initialize(dagManagementStateStore);
- act(dagManagementStateStore, state);
+ public final void process(DagManagementStateStore dagManagementStateStore,
+ DagProcessingEngineMetrics dagProcEngineMetrics) throws IOException {
+ T state;
+ try {
+ state = initialize(dagManagementStateStore);
+ dagProcEngineMetrics.markDagActionsInitialize(getDagActionType(), true);
+ } catch (Exception e) {
+ dagProcEngineMetrics.markDagActionsInitialize(getDagActionType(), false);
+ throw e;
+ }
+ act(dagManagementStateStore, state, dagProcEngineMetrics);
log.info("{} concluded processing for dagId : {}",
getClass().getSimpleName(), this.dagId);
}
protected abstract T initialize(DagManagementStateStore
dagManagementStateStore) throws IOException;
- protected abstract void act(DagManagementStateStore dagManagementStateStore,
T state) throws IOException;
+ protected abstract void act(DagManagementStateStore dagManagementStateStore,
T state,
+ DagProcessingEngineMetrics dagProcEngineMetrics) throws IOException;
+
+ public DagActionStore.DagActionType getDagActionType() {
+ return this.dagTask.getDagAction().getDagActionType();
+ }
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DeadlineEnforcementDagProc.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DeadlineEnforcementDagProc.java
index 032a26df6d..0d3369a3fd 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DeadlineEnforcementDagProc.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DeadlineEnforcementDagProc.java
@@ -25,6 +25,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.orchestration.DagActionStore;
import
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import
org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
import org.apache.gobblin.service.modules.orchestration.task.DagTask;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
@@ -45,19 +46,21 @@ abstract public class DeadlineEnforcementDagProc extends
DagProc<Optional<Dag<Jo
}
@Override
- protected void act(DagManagementStateStore dagManagementStateStore,
Optional<Dag<JobExecutionPlan>> dag)
- throws IOException {
+ protected void act(DagManagementStateStore dagManagementStateStore,
Optional<Dag<JobExecutionPlan>> dag,
+ DagProcessingEngineMetrics dagProcEngineMetrics) throws IOException {
if (validate(dag, dagManagementStateStore)) {
- enforceDeadline(dagManagementStateStore, dag.get());
- }
+ enforceDeadline(dagManagementStateStore, dag.get(),
dagProcEngineMetrics);
+ } else {
+ dagProcEngineMetrics.markDagActionsAct(getDagActionType(), false);
+ }
}
- private boolean validate(Optional<Dag<JobExecutionPlan>> dag,
DagManagementStateStore dagManagementStateStore) throws IOException {
+ protected boolean validate(Optional<Dag<JobExecutionPlan>> dag,
DagManagementStateStore dagManagementStateStore)
+ throws IOException {
log.info("Request to enforce deadlines for dag {}", getDagId());
DagActionStore.DagAction dagAction = getDagTask().getDagAction();
if (!dag.isPresent()) {
- // todo - add a metric here
log.error("Dag not present when validating {}. It may already have
cancelled/finished. Dag {}",
getDagId(), dagAction);
return false;
@@ -72,5 +75,6 @@ abstract public class DeadlineEnforcementDagProc extends
DagProc<Optional<Dag<Jo
return true;
}
- abstract void enforceDeadline(DagManagementStateStore
dagManagementStateStore, Dag<JobExecutionPlan> dag) throws IOException;
+ abstract void enforceDeadline(DagManagementStateStore
dagManagementStateStore, Dag<JobExecutionPlan> dag,
+ DagProcessingEngineMetrics dagProcEngineMetrics) throws IOException;
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceFlowFinishDeadlineDagProc.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceFlowFinishDeadlineDagProc.java
index e5aa4125dc..b9b5d179e9 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceFlowFinishDeadlineDagProc.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceFlowFinishDeadlineDagProc.java
@@ -26,6 +26,7 @@ import org.apache.gobblin.metrics.event.TimingEvent;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
+import
org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
import
org.apache.gobblin.service.modules.orchestration.task.EnforceFlowFinishDeadlineDagTask;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
@@ -41,8 +42,8 @@ public class EnforceFlowFinishDeadlineDagProc extends
DeadlineEnforcementDagProc
super(enforceFlowFinishDeadlineDagTask);
}
- protected void enforceDeadline(DagManagementStateStore
dagManagementStateStore, Dag<JobExecutionPlan> dag)
- throws IOException {
+ protected void enforceDeadline(DagManagementStateStore
dagManagementStateStore, Dag<JobExecutionPlan> dag,
+ DagProcessingEngineMetrics dagProcEngineMetrics) throws IOException {
Dag.DagNode<JobExecutionPlan> dagNode = dag.getNodes().get(0);
long flowFinishDeadline = DagManagerUtils.getFlowSLA(dagNode);
long flowStartTime = DagManagerUtils.getFlowStartTime(dagNode);
@@ -58,7 +59,9 @@ public class EnforceFlowFinishDeadlineDagProc extends
DeadlineEnforcementDagProc
dag.setFlowEvent(TimingEvent.FlowTimings.FLOW_RUN_DEADLINE_EXCEEDED);
dag.setMessage("Flow killed due to exceeding SLA of " +
flowFinishDeadline + " ms");
+ dagProcEngineMetrics.markDagActionsAct(getDagActionType(), true);
} else {
+ dagProcEngineMetrics.markDagActionsAct(getDagActionType(), true);
log.error("EnforceFlowFinishDeadline dagAction received before due time.
flowStartTime {}, flowFinishDeadline {} ", flowStartTime, flowFinishDeadline);
}
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceJobStartDeadlineDagProc.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceJobStartDeadlineDagProc.java
index 5eb2236f8f..6eccc1ab1a 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceJobStartDeadlineDagProc.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceJobStartDeadlineDagProc.java
@@ -30,6 +30,7 @@ import org.apache.gobblin.service.modules.flowgraph.Dag;
import
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
import org.apache.gobblin.service.modules.orchestration.DagProcessingEngine;
+import
org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
import
org.apache.gobblin.service.modules.orchestration.task.EnforceJobStartDeadlineDagTask;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
@@ -48,13 +49,13 @@ public class EnforceJobStartDeadlineDagProc extends
DeadlineEnforcementDagProc {
super(enforceJobStartDeadlineDagTask);
}
- protected void enforceDeadline(DagManagementStateStore
dagManagementStateStore, Dag<JobExecutionPlan> dag)
- throws IOException {
+ protected void enforceDeadline(DagManagementStateStore
dagManagementStateStore, Dag<JobExecutionPlan> dag,
+ DagProcessingEngineMetrics dagProcEngineMetrics) throws IOException {
Pair<Optional<Dag.DagNode<JobExecutionPlan>>,
Optional<org.apache.gobblin.service.monitoring.JobStatus>>
dagNodeToCheckDeadline =
dagManagementStateStore.getDagNodeWithJobStatus(getDagNodeId());
if (!dagNodeToCheckDeadline.getLeft().isPresent()) {
// this should never happen; a job for which DEADLINE_ENFORCEMENT dag
action is created must have a dag node in store
- // todo - add metrics
+ dagProcEngineMetrics.markDagActionsAct(getDagActionType(), false);
log.error("Dag node {} not found for EnforceJobStartDeadlineDagProc",
getDagNodeId());
return;
}
@@ -63,6 +64,7 @@ public class EnforceJobStartDeadlineDagProc extends
DeadlineEnforcementDagProc {
long timeOutForJobStart = DagManagerUtils.getJobStartSla(dagNode,
DagProcessingEngine.getDefaultJobStartSlaTimeMillis());
Optional<org.apache.gobblin.service.monitoring.JobStatus> jobStatus =
dagNodeToCheckDeadline.getRight();
if (!jobStatus.isPresent()) {
+ dagProcEngineMetrics.markDagActionsAct(getDagActionType(), false);
log.error("Some job status should be present for dag node {} that this
EnforceJobStartDeadlineDagProc belongs.", getDagNodeId());
return;
}
@@ -78,5 +80,6 @@ public class EnforceJobStartDeadlineDagProc extends
DeadlineEnforcementDagProc {
dag.setFlowEvent(TimingEvent.FlowTimings.FLOW_START_DEADLINE_EXCEEDED);
dag.setMessage("Flow killed because no update received for " +
timeOutForJobStart + " ms after orchestration");
}
+ dagProcEngineMetrics.markDagActionsAct(getDagActionType(), true);
}
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProc.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProc.java
index ab68d3e7b1..c429d754ab 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProc.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProc.java
@@ -26,6 +26,7 @@ import org.apache.gobblin.metrics.event.TimingEvent;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.orchestration.DagActionStore;
import
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import
org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
import org.apache.gobblin.service.modules.orchestration.task.KillDagTask;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
@@ -46,16 +47,16 @@ public class KillDagProc extends
DagProc<Optional<Dag<JobExecutionPlan>>> {
@Override
protected Optional<Dag<JobExecutionPlan>> initialize(DagManagementStateStore
dagManagementStateStore)
throws IOException {
- return dagManagementStateStore.getDag(getDagId());
+ return dagManagementStateStore.getDag(getDagId());
}
@Override
- protected void act(DagManagementStateStore dagManagementStateStore,
Optional<Dag<JobExecutionPlan>> dag)
- throws IOException {
+ protected void act(DagManagementStateStore dagManagementStateStore,
Optional<Dag<JobExecutionPlan>> dag,
+ DagProcessingEngineMetrics dagProcEngineMetrics) throws IOException {
log.info("Request to kill dag {} (node: {})", getDagId(),
shouldKillSpecificJob ? getDagNodeId() : "<<all>>");
if (!dag.isPresent()) {
- // todo - add a metric here
+ dagProcEngineMetrics.markDagActionsAct(getDagActionType(), false);
log.error("Did not find Dag with id {}, it might be already
cancelled/finished and thus cleaned up from the store.", getDagId());
return;
}
@@ -68,11 +69,12 @@ public class KillDagProc extends
DagProc<Optional<Dag<JobExecutionPlan>>> {
if (dagNodeToCancel.isPresent()) {
DagProcUtils.cancelDagNode(dagNodeToCancel.get(),
dagManagementStateStore);
} else {
- // todo - add a metric here
+ dagProcEngineMetrics.markDagActionsAct(getDagActionType(), false);
log.error("Did not find Dag node with id {}, it might be already
cancelled/finished and thus cleaned up from the store.", getDagNodeId());
}
} else {
DagProcUtils.cancelDag(dag.get(), dagManagementStateStore);
}
+ dagProcEngineMetrics.markDagActionsAct(getDagActionType(), true);
}
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java
index 88f3249f1c..c72c1fcf96 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java
@@ -28,6 +28,7 @@ import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.SpecNotFoundException;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import
org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
import
org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
@@ -66,14 +67,15 @@ public class LaunchDagProc extends
DagProc<Optional<Dag<JobExecutionPlan>>> {
}
@Override
- protected void act(DagManagementStateStore dagManagementStateStore,
Optional<Dag<JobExecutionPlan>> dag)
- throws IOException {
+ protected void act(DagManagementStateStore dagManagementStateStore,
Optional<Dag<JobExecutionPlan>> dag,
+ DagProcessingEngineMetrics dagProcEngineMetrics) throws IOException {
if (!dag.isPresent()) {
log.warn("Dag with id " + getDagId() + " could not be compiled.");
- // todo - add metrics
+ dagProcEngineMetrics.markDagActionsAct(getDagActionType(), false);
} else {
DagProcUtils.submitNextNodes(dagManagementStateStore, dag.get(),
getDagId());
DagProcUtils.sendEnforceFlowFinishDeadlineDagAction(dagManagementStateStore,
getDagTask().getDagAction());
+ dagProcEngineMetrics.markDagActionsAct(getDagActionType(), true);
}
}
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
index 27525c057c..f4e02b8475 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
@@ -29,6 +29,7 @@ import org.apache.gobblin.service.ExecutionStatus;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
+import
org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
import org.apache.gobblin.service.modules.orchestration.task.ReevaluateDagTask;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
@@ -48,19 +49,19 @@ public class ReevaluateDagProc extends
DagProc<Pair<Optional<Dag.DagNode<JobExec
}
@Override
- protected Pair<Optional<Dag.DagNode<JobExecutionPlan>>, Optional<JobStatus>>
initialize(DagManagementStateStore dagManagementStateStore)
- throws IOException {
- return dagManagementStateStore.getDagNodeWithJobStatus(this.dagNodeId);
+ protected Pair<Optional<Dag.DagNode<JobExecutionPlan>>, Optional<JobStatus>>
initialize(
+ DagManagementStateStore dagManagementStateStore) throws IOException {
+ return dagManagementStateStore.getDagNodeWithJobStatus(this.dagNodeId);
}
@Override
protected void act(DagManagementStateStore dagManagementStateStore,
Pair<Optional<Dag.DagNode<JobExecutionPlan>>,
- Optional<JobStatus>> dagNodeWithJobStatus) throws IOException {
+ Optional<JobStatus>> dagNodeWithJobStatus, DagProcessingEngineMetrics
dagProcEngineMetrics) throws IOException {
if (!dagNodeWithJobStatus.getLeft().isPresent()) {
// one of the reason this could arise is when the MALA leasing doesn't
work cleanly and another DagProc::process
// has cleaned up the Dag, yet did not complete the lease before this
current one acquired its own
log.error("DagNode or its job status not found for a Reevaluate
DagAction with dag node id {}", this.dagNodeId);
- // todo - add metrics to count such occurrences
+ dagProcEngineMetrics.markDagActionsAct(getDagActionType(), false);
return;
}
@@ -72,6 +73,7 @@ public class ReevaluateDagProc extends
DagProc<Pair<Optional<Dag.DagNode<JobExec
// dag actions for each of those parallel job and in this scenario there
is no job status available.
// If the job status is not present, this job was never launched, submit
it now.
DagProcUtils.submitJobToExecutor(dagManagementStateStore, dagNode,
getDagId());
+ dagProcEngineMetrics.markDagActionsAct(getDagActionType(), true);
return;
}
@@ -118,6 +120,7 @@ public class ReevaluateDagProc extends
DagProc<Pair<Optional<Dag.DagNode<JobExec
DagProcUtils.removeFlowFinishDeadlineDagAction(dagManagementStateStore,
getDagId());
}
+ dagProcEngineMetrics.markDagActionsAct(getDagActionType(), true);
}
/**
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProc.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProc.java
index 3447035cc4..8a94c4ef12 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProc.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProc.java
@@ -31,6 +31,7 @@ import org.apache.gobblin.service.modules.flowgraph.Dag;
import
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
import org.apache.gobblin.service.modules.orchestration.TimingEventUtils;
+import
org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
import org.apache.gobblin.service.modules.orchestration.task.ResumeDagTask;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
@@ -52,16 +53,16 @@ public class ResumeDagProc extends
DagProc<Optional<Dag<JobExecutionPlan>>> {
@Override
protected Optional<Dag<JobExecutionPlan>> initialize(DagManagementStateStore
dagManagementStateStore)
throws IOException {
- return dagManagementStateStore.getFailedDag(getDagId());
+ return dagManagementStateStore.getFailedDag(getDagId());
}
@Override
- protected void act(DagManagementStateStore dagManagementStateStore,
Optional<Dag<JobExecutionPlan>> failedDag)
- throws IOException {
+ protected void act(DagManagementStateStore dagManagementStateStore,
Optional<Dag<JobExecutionPlan>> failedDag,
+ DagProcessingEngineMetrics dagProcEngineMetrics) throws IOException {
log.info("Request to resume dag {}", getDagId());
if (!failedDag.isPresent()) {
- // todo - add a metric here
+ dagProcEngineMetrics.markDagActionsAct(getDagActionType(), false);
log.error("Dag " + dagId + " was not found in dag state store");
return;
}
@@ -95,5 +96,6 @@ public class ResumeDagProc extends
DagProc<Optional<Dag<JobExecutionPlan>>> {
DagProcUtils.submitNextNodes(dagManagementStateStore, failedDag.get(),
getDagId());
DagProcUtils.sendEnforceFlowFinishDeadlineDagAction(dagManagementStateStore,
getDagTask().getDagAction());
+ dagProcEngineMetrics.markDagActionsAct(getDagActionType(), true);
}
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagProcessingEngineMetrics.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagProcessingEngineMetrics.java
new file mode 100644
index 0000000000..1011eae473
--- /dev/null
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagProcessingEngineMetrics.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.task;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import javax.inject.Inject;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.ContextAwareMeter;
+import org.apache.gobblin.metrics.GobblinMetrics;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.MetricTagNames;
+import org.apache.gobblin.metrics.Tag;
+import org.apache.gobblin.service.modules.orchestration.DagActionStore;
+import org.apache.gobblin.metrics.ServiceMetricNames;
+
+/**
+ * Tracks all metrics relating to processing dagActions when the DagProcessingEngine is enabled. The metrics can be used
+ * to trace the number of dagActions (further broken down by {@link DagActionStore.DagActionType}) at various points of
+ * the system: from addition to the DagActionStore, to observation in the DagActionStoreChangeMonitor, through the
+ * DagProcessingEngine pipeline (DagManagement -> DagManagementTaskStreamImpl -> MySqlMultiActiveLeaseArbiter -> DagProc).
+ *
+ * <p>Both constructors register every meter, so the {@code mark*} methods can be used as soon as an instance exists.
+ */
+@Slf4j
+public class DagProcessingEngineMetrics {
+  final MetricContext metricContext;
+  /*
+   Maps of dagActionType to a ContextAwareMeter, one map per metric. ContextAwareMeters are thread safe, so they handle
+   concurrent mark requests correctly. A ConcurrentMap is not used because the only writes happen in
+   registerAllMetrics(), which every constructor invokes; the mark* methods only perform lookups.
+   */
+  private final HashMap<DagActionStore.DagActionType, ContextAwareMeter> dagActionsStoredMeterByDagActionType =
+      new HashMap<>();
+  private final HashMap<DagActionStore.DagActionType, ContextAwareMeter> dagActionsObservedMeterByDagActionType =
+      new HashMap<>();
+  private final HashMap<DagActionStore.DagActionType, ContextAwareMeter> dagActionsLeasesObtainedMeterByDagActionType =
+      new HashMap<>();
+  private final HashMap<DagActionStore.DagActionType, ContextAwareMeter> dagActionsNoLongerLeasingMeterByDagActionType =
+      new HashMap<>();
+  private final HashMap<DagActionStore.DagActionType, ContextAwareMeter>
+      dagActionsLeaseReminderScheduledMeterByDagActionType = new HashMap<>();
+  private final HashMap<DagActionStore.DagActionType, ContextAwareMeter>
+      dagActionsReminderProcessedMeterByDagActionType = new HashMap<>();
+  private final HashMap<DagActionStore.DagActionType, ContextAwareMeter>
+      dagActionsExceededMaxRetryMeterByDagActionType = new HashMap<>();
+  private final HashMap<DagActionStore.DagActionType, ContextAwareMeter>
+      dagActionsInitializeFailedMeterByDagActionType = new HashMap<>();
+  private final HashMap<DagActionStore.DagActionType, ContextAwareMeter>
+      dagActionsInitializeSucceededMeterByDagActionType = new HashMap<>();
+  private final HashMap<DagActionStore.DagActionType, ContextAwareMeter> dagActionsActFailedMeterByDagActionType =
+      new HashMap<>();
+  private final HashMap<DagActionStore.DagActionType, ContextAwareMeter> dagActionsActSucceededMeterByDagActionType =
+      new HashMap<>();
+  private final HashMap<DagActionStore.DagActionType, ContextAwareMeter> dagActionsConcludeFailedMeterByDagActionType =
+      new HashMap<>();
+  private final HashMap<DagActionStore.DagActionType, ContextAwareMeter>
+      dagActionsConcludeSucceededMeterByDagActionType = new HashMap<>();
+
+  /**
+   * Creates the metrics using a caller-supplied metric context and registers every meter.
+   * @param metricContext context in which all meters are created
+   */
+  public DagProcessingEngineMetrics(MetricContext metricContext) {
+    this.metricContext = metricContext;
+    registerAllMetrics();
+  }
+
+  /**
+   * Creates the metrics in a new metric context tagged with a COUNTER backend representation and registers every meter.
+   */
+  @Inject
+  public DagProcessingEngineMetrics() {
+    // Create a new metric context for the DagProcessingEngineMetrics tagged appropriately
+    List<Tag<?>> tags = new ArrayList<>();
+    tags.add(new Tag<>(MetricTagNames.METRIC_BACKEND_REPRESENTATION, GobblinMetrics.MetricType.COUNTER));
+    this.metricContext = Instrumented.getMetricContext(new State(), this.getClass(), tags);
+    // Without registration the maps stay empty and every mark* call would throw in updateMetricForDagActionType
+    registerAllMetrics();
+  }
+
+  /**
+   * Creates and stores one meter per dagActionType for every metric tracked by this class.
+   * NOTE(review): re-invoking this replaces map entries with whatever MetricContext#contextAwareMeter returns for the
+   * same name; presumably the existing meter — confirm.
+   */
+  public void registerAllMetrics() {
+    registerMetricForEachDagActionType(this.dagActionsStoredMeterByDagActionType,
+        ServiceMetricNames.DAG_ACTIONS_STORED);
+    registerMetricForEachDagActionType(this.dagActionsObservedMeterByDagActionType,
+        ServiceMetricNames.DAG_ACTIONS_OBSERVED);
+    registerMetricForEachDagActionType(this.dagActionsLeasesObtainedMeterByDagActionType,
+        ServiceMetricNames.DAG_ACTIONS_LEASES_OBTAINED);
+    registerMetricForEachDagActionType(this.dagActionsNoLongerLeasingMeterByDagActionType,
+        ServiceMetricNames.DAG_ACTIONS_NO_LONGER_LEASING);
+    registerMetricForEachDagActionType(this.dagActionsLeaseReminderScheduledMeterByDagActionType,
+        ServiceMetricNames.DAG_ACTIONS_LEASE_REMINDER_SCHEDULED);
+    registerMetricForEachDagActionType(this.dagActionsReminderProcessedMeterByDagActionType,
+        ServiceMetricNames.DAG_ACTIONS_REMINDER_PROCESSED);
+    registerMetricForEachDagActionType(this.dagActionsExceededMaxRetryMeterByDagActionType,
+        ServiceMetricNames.DAG_ACTIONS_EXCEEDED_MAX_RETRY);
+    registerMetricForEachDagActionType(this.dagActionsInitializeFailedMeterByDagActionType,
+        ServiceMetricNames.DAG_ACTIONS_INITIALIZE_FAILED);
+    registerMetricForEachDagActionType(this.dagActionsInitializeSucceededMeterByDagActionType,
+        ServiceMetricNames.DAG_ACTIONS_INITIALIZE_SUCCEEDED);
+    registerMetricForEachDagActionType(this.dagActionsActFailedMeterByDagActionType,
+        ServiceMetricNames.DAG_ACTIONS_ACT_FAILED);
+    registerMetricForEachDagActionType(this.dagActionsActSucceededMeterByDagActionType,
+        ServiceMetricNames.DAG_ACTIONS_ACT_SUCCEEDED);
+    registerMetricForEachDagActionType(this.dagActionsConcludeFailedMeterByDagActionType,
+        ServiceMetricNames.DAG_ACTIONS_CONCLUDE_FAILED);
+    registerMetricForEachDagActionType(this.dagActionsConcludeSucceededMeterByDagActionType,
+        ServiceMetricNames.DAG_ACTIONS_CONCLUDE_SUCCEEDED);
+  }
+
+  /**
+   * Creates a meter for each dagActionType for the given metric, registers it with the metric context, and stores it in
+   * the given map keyed by dagActionType.
+   * @param metricMap map to populate with one meter per dagActionType
+   * @param metricName prefix of each meter name; the dagActionType is appended directly, with no separator
+   *                   (NOTE(review): assumes the ServiceMetricNames constants already end with a separator — confirm)
+   */
+  private void registerMetricForEachDagActionType(HashMap<DagActionStore.DagActionType, ContextAwareMeter> metricMap,
+      String metricName) {
+    for (DagActionStore.DagActionType dagActionType : DagActionStore.DagActionType.values()) {
+      metricMap.put(dagActionType, this.metricContext.contextAwareMeter(metricName + dagActionType));
+    }
+  }
+
+  public void markDagActionsStored(DagActionStore.DagActionType dagActionType) {
+    updateMetricForDagActionType(this.dagActionsStoredMeterByDagActionType, dagActionType);
+  }
+
+  public void markDagActionsObserved(DagActionStore.DagActionType dagActionType) {
+    updateMetricForDagActionType(this.dagActionsObservedMeterByDagActionType, dagActionType);
+  }
+
+  public void markDagActionsLeasedObtained(DagActionStore.LeaseParams leaseParams) {
+    updateMetricForDagActionType(this.dagActionsLeasesObtainedMeterByDagActionType,
+        leaseParams.getDagAction().getDagActionType());
+  }
+
+  public void markDagActionsNoLongerLeasing(DagActionStore.LeaseParams leaseParams) {
+    updateMetricForDagActionType(this.dagActionsNoLongerLeasingMeterByDagActionType,
+        leaseParams.getDagAction().getDagActionType());
+  }
+
+  public void markDagActionsLeaseReminderScheduled(DagActionStore.LeaseParams leaseParams) {
+    updateMetricForDagActionType(this.dagActionsLeaseReminderScheduledMeterByDagActionType,
+        leaseParams.getDagAction().getDagActionType());
+  }
+
+  public void markDagActionsRemindersProcessed(DagActionStore.LeaseParams leaseParams) {
+    updateMetricForDagActionType(this.dagActionsReminderProcessedMeterByDagActionType,
+        leaseParams.getDagAction().getDagActionType());
+  }
+
+  // TODO: implement evaluating max retries later
+  public void markDagActionsExceedingMaxRetry(DagActionStore.DagActionType dagActionType) {
+    updateMetricForDagActionType(this.dagActionsExceededMaxRetryMeterByDagActionType, dagActionType);
+  }
+
+  public void markDagActionsInitialize(DagActionStore.DagActionType dagActionType, boolean succeeded) {
+    if (succeeded) {
+      updateMetricForDagActionType(this.dagActionsInitializeSucceededMeterByDagActionType, dagActionType);
+    } else {
+      updateMetricForDagActionType(this.dagActionsInitializeFailedMeterByDagActionType, dagActionType);
+    }
+  }
+
+  public void markDagActionsAct(DagActionStore.DagActionType dagActionType, boolean succeeded) {
+    if (succeeded) {
+      updateMetricForDagActionType(this.dagActionsActSucceededMeterByDagActionType, dagActionType);
+    } else {
+      updateMetricForDagActionType(this.dagActionsActFailedMeterByDagActionType, dagActionType);
+    }
+  }
+
+  public void markDagActionsConclude(DagActionStore.DagActionType dagActionType, boolean succeeded) {
+    if (succeeded) {
+      updateMetricForDagActionType(this.dagActionsConcludeSucceededMeterByDagActionType, dagActionType);
+    } else {
+      updateMetricForDagActionType(this.dagActionsConcludeFailedMeterByDagActionType, dagActionType);
+    }
+  }
+
+  /**
+   * Generic helper used to increment the meter corresponding to the dagActionType in the provided map.
+   * @throws IllegalStateException if no meter was registered for the dagActionType (including a null dagActionType)
+   */
+  private void updateMetricForDagActionType(HashMap<DagActionStore.DagActionType, ContextAwareMeter> metricMap,
+      DagActionStore.DagActionType dagActionType) {
+    // Registration never stores null values, so a null lookup result means "not registered"
+    ContextAwareMeter meter = metricMap.get(dagActionType);
+    if (meter == null) {
+      throw new IllegalStateException(String.format("No meter exists for dagActionType %s in metricsMap %s",
+          dagActionType, metricMap));
+    }
+    meter.mark();
+  }
+}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagTask.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagTask.java
index df62eb971c..3fb6110e10 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagTask.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagTask.java
@@ -42,12 +42,14 @@ public abstract class DagTask {
@Getter public final DagActionStore.DagAction dagAction;
protected final DagManagementStateStore dagManagementStateStore;
private final LeaseAttemptStatus.LeaseObtainedStatus leaseObtainedStatus;
+ private final DagProcessingEngineMetrics dagProcEngineMetrics;
public DagTask(DagActionStore.DagAction dagAction,
LeaseAttemptStatus.LeaseObtainedStatus leaseObtainedStatus,
- DagManagementStateStore dagManagementStateStore) {
+ DagManagementStateStore dagManagementStateStore,
DagProcessingEngineMetrics dagProcEngineMetrics) {
this.dagAction = dagAction;
this.leaseObtainedStatus = leaseObtainedStatus;
this.dagManagementStateStore = dagManagementStateStore;
+ this.dagProcEngineMetrics = dagProcEngineMetrics;
}
public abstract <T> T host(DagTaskVisitor<T> visitor);
@@ -60,8 +62,11 @@ public abstract class DagTask {
public boolean conclude() {
try {
this.dagManagementStateStore.deleteDagAction(this.dagAction);
- return this.leaseObtainedStatus.completeLease();
+ boolean completedLease = this.leaseObtainedStatus.completeLease();
+
this.dagProcEngineMetrics.markDagActionsConclude(this.dagAction.getDagActionType(),
true);
+ return completedLease;
} catch (IOException e) {
+
this.dagProcEngineMetrics.markDagActionsConclude(this.dagAction.getDagActionType(),
false);
// TODO: Decide appropriate exception to throw and add to the commit
method's signature
throw new RuntimeException(e);
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/EnforceFlowFinishDeadlineDagTask.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/EnforceFlowFinishDeadlineDagTask.java
index bfd709fb9a..466006d117 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/EnforceFlowFinishDeadlineDagTask.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/EnforceFlowFinishDeadlineDagTask.java
@@ -30,8 +30,8 @@ import
org.apache.gobblin.service.modules.orchestration.LeaseAttemptStatus;
public class EnforceFlowFinishDeadlineDagTask extends DagTask {
public EnforceFlowFinishDeadlineDagTask(DagActionStore.DagAction dagAction,
LeaseAttemptStatus.LeaseObtainedStatus leaseObtainedStatus,
- DagManagementStateStore dagManagementStateStore) {
- super(dagAction, leaseObtainedStatus, dagManagementStateStore);
+ DagManagementStateStore dagManagementStateStore,
DagProcessingEngineMetrics dagProcEngineMetrics) {
+ super(dagAction, leaseObtainedStatus, dagManagementStateStore,
dagProcEngineMetrics);
}
public <T> T host(DagTaskVisitor<T> visitor) {
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/EnforceJobStartDeadlineDagTask.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/EnforceJobStartDeadlineDagTask.java
index c1c270eb7b..821ff6dfba 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/EnforceJobStartDeadlineDagTask.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/EnforceJobStartDeadlineDagTask.java
@@ -30,8 +30,8 @@ import
org.apache.gobblin.service.modules.orchestration.LeaseAttemptStatus;
public class EnforceJobStartDeadlineDagTask extends DagTask {
public EnforceJobStartDeadlineDagTask(DagActionStore.DagAction dagAction,
LeaseAttemptStatus.LeaseObtainedStatus leaseObtainedStatus,
- DagManagementStateStore dagManagementStateStore) {
- super(dagAction, leaseObtainedStatus, dagManagementStateStore);
+ DagManagementStateStore dagManagementStateStore,
DagProcessingEngineMetrics dagProcEngineMetrics) {
+ super(dagAction, leaseObtainedStatus, dagManagementStateStore,
dagProcEngineMetrics);
}
public <T> T host(DagTaskVisitor<T> visitor) {
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/KillDagTask.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/KillDagTask.java
index 3d7315378c..c194f89f23 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/KillDagTask.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/KillDagTask.java
@@ -29,8 +29,8 @@ import
org.apache.gobblin.service.modules.orchestration.LeaseAttemptStatus;
public class KillDagTask extends DagTask {
public KillDagTask(DagActionStore.DagAction dagAction,
LeaseAttemptStatus.LeaseObtainedStatus leaseObtainedStatus,
- DagManagementStateStore dagManagementStateStore) {
- super(dagAction, leaseObtainedStatus, dagManagementStateStore);
+ DagManagementStateStore dagManagementStateStore,
DagProcessingEngineMetrics dagProcEngineMetrics) {
+ super(dagAction, leaseObtainedStatus, dagManagementStateStore,
dagProcEngineMetrics);
}
public <T> T host(DagTaskVisitor<T> visitor) {
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/LaunchDagTask.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/LaunchDagTask.java
index 909763a0cd..eed110e07a 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/LaunchDagTask.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/LaunchDagTask.java
@@ -36,8 +36,8 @@ import
org.apache.gobblin.service.modules.orchestration.LeaseAttemptStatus;
@Slf4j
public class LaunchDagTask extends DagTask {
public LaunchDagTask(DagActionStore.DagAction dagAction,
LeaseAttemptStatus.LeaseObtainedStatus leaseObtainedStatus,
- DagManagementStateStore dagManagementStateStore) {
- super(dagAction, leaseObtainedStatus, dagManagementStateStore);
+ DagManagementStateStore dagManagementStateStore,
DagProcessingEngineMetrics dagProcEngineMetrics) {
+ super(dagAction, leaseObtainedStatus, dagManagementStateStore,
dagProcEngineMetrics);
}
public <T> T host(DagTaskVisitor<T> visitor) {
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/ReevaluateDagTask.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/ReevaluateDagTask.java
index ea1b36e341..bd4aaa42c9 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/ReevaluateDagTask.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/ReevaluateDagTask.java
@@ -29,8 +29,8 @@ import
org.apache.gobblin.service.modules.orchestration.LeaseAttemptStatus;
public class ReevaluateDagTask extends DagTask {
public ReevaluateDagTask(DagActionStore.DagAction dagAction,
LeaseAttemptStatus.LeaseObtainedStatus leaseObtainedStatus,
- DagManagementStateStore dagManagementStateStore) {
- super(dagAction, leaseObtainedStatus, dagManagementStateStore);
+ DagManagementStateStore dagManagementStateStore,
DagProcessingEngineMetrics dagProcEngineMetrics) {
+ super(dagAction, leaseObtainedStatus, dagManagementStateStore,
dagProcEngineMetrics);
}
public <T> T host(DagTaskVisitor<T> visitor) {
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/ResumeDagTask.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/ResumeDagTask.java
index 4e5ed2657d..6b481212ec 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/ResumeDagTask.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/ResumeDagTask.java
@@ -28,8 +28,8 @@ import
org.apache.gobblin.service.modules.orchestration.LeaseAttemptStatus;
*/
public class ResumeDagTask extends DagTask {
public ResumeDagTask(DagActionStore.DagAction dagAction,
LeaseAttemptStatus.LeaseObtainedStatus leaseObtainedStatus,
- DagManagementStateStore dagManagementStateStore) {
- super(dagAction, leaseObtainedStatus, dagManagementStateStore);
+ DagManagementStateStore dagManagementStateStore,
DagProcessingEngineMetrics dagProcEngineMetrics) {
+ super(dagAction, leaseObtainedStatus, dagManagementStateStore,
dagProcEngineMetrics);
}
public <T> T host(DagTaskVisitor<T> visitor) {
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
index d9d0913bcd..bb95d2481f 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
@@ -51,6 +51,7 @@ import
org.apache.gobblin.service.modules.orchestration.DagActionStore;
import
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
import org.apache.gobblin.service.modules.orchestration.DagManager;
import org.apache.gobblin.service.modules.orchestration.Orchestrator;
+import
org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
/**
@@ -100,6 +101,7 @@ public class DagActionStoreChangeMonitor extends
HighLevelConsumer<String, DagAc
@VisibleForTesting
protected FlowCatalog flowCatalog;
protected DagManagementStateStore dagManagementStateStore;
+ protected final DagProcessingEngineMetrics dagProcEngineMetrics;
@Getter
private volatile boolean isActive;
@@ -107,7 +109,7 @@ public class DagActionStoreChangeMonitor extends
HighLevelConsumer<String, DagAc
// client itself to determine all Kafka related information dynamically
rather than through the config.
public DagActionStoreChangeMonitor(String topic, Config config, DagManager
dagManager, int numThreads,
FlowCatalog flowCatalog, Orchestrator orchestrator,
DagManagementStateStore dagManagementStateStore,
- boolean isMultiActiveSchedulerEnabled) {
+ boolean isMultiActiveSchedulerEnabled, DagProcessingEngineMetrics
dagProcEngineMetrics) {
// Differentiate group id for each host
super(topic, config.withValue(GROUP_ID_KEY,
ConfigValueFactory.fromAnyRef(DAG_ACTION_CHANGE_MONITOR_PREFIX +
UUID.randomUUID().toString())),
@@ -117,6 +119,7 @@ public class DagActionStoreChangeMonitor extends
HighLevelConsumer<String, DagAc
this.orchestrator = orchestrator;
this.dagManagementStateStore = dagManagementStateStore;
this.isMultiActiveSchedulerEnabled = isMultiActiveSchedulerEnabled;
+ this.dagProcEngineMetrics = dagProcEngineMetrics;
/*
Metrics need to be created before initializeMonitor() below is called (or
more specifically handleDagAction() is
@@ -231,14 +234,15 @@ public class DagActionStoreChangeMonitor extends
HighLevelConsumer<String, DagAc
dagActionsSeenCache.put(changeIdentifier, changeIdentifier);
}
- protected void handleDagAction(String operation, DagActionStore.DagAction
dagAction, String flowGroup, String flowName,
- long flowExecutionId, DagActionStore.DagActionType dagActionType) {
+ protected void handleDagAction(String operation, DagActionStore.DagAction
dagAction, String flowGroup,
+ String flowName, long flowExecutionId, DagActionStore.DagActionType
dagActionType) {
// We only expect INSERT and DELETE operations done to this table. INSERTs
correspond to any type of
// {@link DagActionStore.FlowActionType} flow requests that have to be
processed. DELETEs require no action.
try {
switch (operation) {
case "INSERT":
handleDagAction(dagAction, false);
+ this.dagProcEngineMetrics.markDagActionsObserved(dagActionType);
break;
case "UPDATE":
// TODO: change this warning message and process updates if for
launch or reevaluate type
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitorFactory.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitorFactory.java
index 5bac15089b..b392560251 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitorFactory.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitorFactory.java
@@ -31,6 +31,7 @@ import org.apache.gobblin.runtime.util.InjectionNames;
import
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
import org.apache.gobblin.service.modules.orchestration.DagManager;
import org.apache.gobblin.service.modules.orchestration.Orchestrator;
+import
org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
import org.apache.gobblin.util.ConfigUtils;
@@ -47,17 +48,20 @@ public class DagActionStoreChangeMonitorFactory implements
Provider<DagActionSto
private Orchestrator orchestrator;
private DagManagementStateStore dagManagementStateStore;
private boolean isMultiActiveSchedulerEnabled;
+ private DagProcessingEngineMetrics dagProcEngineMetrics;
@Inject
public DagActionStoreChangeMonitorFactory(Config config, DagManager
dagManager, FlowCatalog flowCatalog,
Orchestrator orchestrator, DagManagementStateStore
dagManagementStateStore,
- @Named(InjectionNames.MULTI_ACTIVE_SCHEDULER_ENABLED) boolean
isMultiActiveSchedulerEnabled) {
+ @Named(InjectionNames.MULTI_ACTIVE_SCHEDULER_ENABLED) boolean
isMultiActiveSchedulerEnabled,
+ DagProcessingEngineMetrics dagProcEngineMetrics) {
this.config = Objects.requireNonNull(config);
this.dagManager = dagManager;
this.flowCatalog = flowCatalog;
this.orchestrator = orchestrator;
this.dagManagementStateStore = dagManagementStateStore;
this.isMultiActiveSchedulerEnabled = isMultiActiveSchedulerEnabled;
+ this.dagProcEngineMetrics = dagProcEngineMetrics;
}
private DagActionStoreChangeMonitor createDagActionStoreMonitor() {
@@ -68,7 +72,7 @@ public class DagActionStoreChangeMonitorFactory implements
Provider<DagActionSto
int numThreads = ConfigUtils.getInt(dagActionStoreChangeConfig,
DAG_ACTION_STORE_CHANGE_MONITOR_NUM_THREADS_KEY, 5);
return new DagActionStoreChangeMonitor(topic, dagActionStoreChangeConfig,
this.dagManager, numThreads, flowCatalog,
- orchestrator, dagManagementStateStore, isMultiActiveSchedulerEnabled);
+ orchestrator, dagManagementStateStore, isMultiActiveSchedulerEnabled,
dagProcEngineMetrics);
}
@Override
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitor.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitor.java
index 35e4dafad8..7e2f87ed43 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitor.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitor.java
@@ -31,6 +31,7 @@ import
org.apache.gobblin.service.modules.orchestration.DagActionStore;
import org.apache.gobblin.service.modules.orchestration.DagManagement;
import
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
import org.apache.gobblin.service.modules.orchestration.Orchestrator;
+import
org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
/**
@@ -47,10 +48,12 @@ public class DagManagementDagActionStoreChangeMonitor
extends DagActionStoreChan
// client itself to determine all Kafka related information dynamically
rather than through the config.
public DagManagementDagActionStoreChangeMonitor(Config config, int
numThreads,
FlowCatalog flowCatalog, Orchestrator orchestrator,
DagManagementStateStore dagManagementStateStore,
- boolean isMultiActiveSchedulerEnabled, DagManagement dagManagement,
DagActionReminderScheduler dagActionReminderScheduler) {
+ boolean isMultiActiveSchedulerEnabled, DagManagement dagManagement,
+ DagActionReminderScheduler dagActionReminderScheduler,
DagProcessingEngineMetrics dagProcEngineMetrics) {
// DagManager is only needed in the `handleDagAction` method of its parent
class and not needed in this class,
// so we are passing a null value for DagManager to its parent class.
- super("", config, null, numThreads, flowCatalog, orchestrator,
dagManagementStateStore, isMultiActiveSchedulerEnabled);
+ super("", config, null, numThreads, flowCatalog, orchestrator,
dagManagementStateStore,
+ isMultiActiveSchedulerEnabled, dagProcEngineMetrics);
this.dagManagement = dagManagement;
this.dagActionReminderScheduler = dagActionReminderScheduler;
}
@@ -64,6 +67,7 @@ public class DagManagementDagActionStoreChangeMonitor extends
DagActionStoreChan
switch (operation) {
case "INSERT":
handleDagAction(dagAction, false);
+ this.dagProcEngineMetrics.markDagActionsObserved(dagActionType);
break;
case "UPDATE":
log.warn("Received an UPDATE action to the DagActionStore when
values in this store are never supposed to be "
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitorFactory.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitorFactory.java
index a55286e17d..fc0d028f60 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitorFactory.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitorFactory.java
@@ -33,6 +33,7 @@ import
org.apache.gobblin.service.modules.orchestration.DagManagement;
import
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
import org.apache.gobblin.service.modules.orchestration.DagManager;
import org.apache.gobblin.service.modules.orchestration.Orchestrator;
+import
org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
import org.apache.gobblin.util.ConfigUtils;
@@ -50,12 +51,13 @@ public class
DagManagementDagActionStoreChangeMonitorFactory implements Provider
private final boolean isMultiActiveSchedulerEnabled;
private final DagManagement dagManagement;
private final DagActionReminderScheduler dagActionReminderScheduler;
+ private final DagProcessingEngineMetrics dagProcEngineMetrics;
@Inject
public DagManagementDagActionStoreChangeMonitorFactory(Config config,
DagManager dagManager, FlowCatalog flowCatalog,
Orchestrator orchestrator, DagManagementStateStore
dagManagementStateStore, DagManagement dagManagement,
@Named(InjectionNames.MULTI_ACTIVE_SCHEDULER_ENABLED) boolean
isMultiActiveSchedulerEnabled,
- DagActionReminderScheduler dagActionReminderScheduler) {
+ DagActionReminderScheduler dagActionReminderScheduler,
DagProcessingEngineMetrics dagProcEngineMetrics) {
this.config = Objects.requireNonNull(config);
this.flowCatalog = flowCatalog;
this.orchestrator = orchestrator;
@@ -63,6 +65,7 @@ public class DagManagementDagActionStoreChangeMonitorFactory
implements Provider
this.isMultiActiveSchedulerEnabled = isMultiActiveSchedulerEnabled;
this.dagManagement = dagManagement;
this.dagActionReminderScheduler = dagActionReminderScheduler;
+ this.dagProcEngineMetrics = dagProcEngineMetrics;
}
private DagManagementDagActionStoreChangeMonitor
createDagActionStoreMonitor() {
@@ -73,7 +76,7 @@ public class DagManagementDagActionStoreChangeMonitorFactory
implements Provider
return new
DagManagementDagActionStoreChangeMonitor(dagActionStoreChangeConfig,
numThreads, flowCatalog, orchestrator, dagManagementStateStore,
isMultiActiveSchedulerEnabled, this.dagManagement,
- this.dagActionReminderScheduler);
+ this.dagActionReminderScheduler, dagProcEngineMetrics);
}
@Override
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
index 5ce3b33789..af8acb7be9 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
@@ -20,6 +20,7 @@ package org.apache.gobblin.service.modules.orchestration;
import java.io.IOException;
import java.util.Optional;
+import
org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
import org.junit.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -68,10 +69,10 @@ public class DagManagementTaskStreamImplTest {
this.dagManagementTaskStream =
new DagManagementTaskStreamImpl(config,
Optional.of(mock(DagActionStore.class)),
mock(MultiActiveLeaseArbiter.class),
Optional.of(mock(DagActionReminderScheduler.class)),
- false, mock(DagManagementStateStore.class));
+ false, mock(DagManagementStateStore.class),
mock(DagProcessingEngineMetrics.class));
this.dagProcFactory = new DagProcFactory(null);
this.dagProcEngineThread = new DagProcessingEngine.DagProcEngineThread(
- this.dagManagementTaskStream, this.dagProcFactory,
dagManagementStateStore, 0);
+ this.dagManagementTaskStream, this.dagProcFactory,
dagManagementStateStore, mock(DagProcessingEngineMetrics.class), 0);
}
@AfterClass(alwaysRun = true)
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
index 342b379f4a..008dafa1ea 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
@@ -26,6 +26,7 @@ import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import
org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
@@ -89,7 +90,7 @@ public class DagManagerFlowTest {
.addPrimitive("MysqlDagActionStore." +
ConfigurationKeys.STATE_STORE_DB_TABLE_KEY, TABLE)
.build();
- dagActionStore = new MysqlDagActionStore(config);
+ dagActionStore = new MysqlDagActionStore(config,
Mockito.mock(DagProcessingEngineMetrics.class));
dagActionStore.addFlowDagAction(flowGroup, flowName, flowExecutionId,
DagActionStore.DagActionType.KILL);
dagActionStore.addFlowDagAction(flowGroup, flowName, flowExecutionId_2,
DagActionStore.DagActionType.RESUME);
dagManager = new MockedDagManager(ConfigUtils.propertiesToConfig(props));
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
index baf4bfabbb..a5a4f168ec 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
@@ -22,6 +22,7 @@ import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.concurrent.TimeoutException;
+import
org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
@@ -56,6 +57,7 @@ public class DagProcessingEngineTest {
private DagProcFactory dagProcFactory;
// Field is static because it's used to instantiate every MockedDagTask
private static MySqlDagManagementStateStore dagManagementStateStore;
+ private static DagProcessingEngineMetrics mockedDagProcEngineMetrics;
private ITestMetastoreDatabase testMetastoreDatabase;
static LeaseAttemptStatus.LeaseObtainedStatus leaseObtainedStatus;
@@ -81,18 +83,19 @@ public class DagProcessingEngineTest {
doReturn(true).when(dagActionStore).deleteDagAction(any());
dagManagementTaskStream =
new DagManagementTaskStreamImpl(config,
Optional.of(mock(DagActionStore.class)),
- mock(MultiActiveLeaseArbiter.class),
Optional.of(mock(DagActionReminderScheduler.class)), false,
- dagManagementStateStore);
+ mock(MultiActiveLeaseArbiter.class),
Optional.of(mock(DagActionReminderScheduler.class)),
+ false, dagManagementStateStore,
Mockito.mock(DagProcessingEngineMetrics.class));
this.dagProcFactory = new DagProcFactory(null);
DagProcessingEngine.DagProcEngineThread dagProcEngineThread =
new DagProcessingEngine.DagProcEngineThread(dagManagementTaskStream,
this.dagProcFactory,
- dagManagementStateStore, 0);
+ dagManagementStateStore, mock(DagProcessingEngineMetrics.class),
0);
this.dagTaskStream = spy(new MockedDagTaskStream());
DagProcessingEngine dagProcessingEngine =
new DagProcessingEngine(config, Optional.ofNullable(dagTaskStream),
Optional.ofNullable(this.dagProcFactory),
- Optional.ofNullable(dagManagementStateStore), 100000L);
+ Optional.ofNullable(dagManagementStateStore), 100000L,
mock(DagProcessingEngineMetrics.class));
dagProcessingEngine.startAsync();
+ this.mockedDagProcEngineMetrics = mock(DagProcessingEngineMetrics.class);
}
@AfterClass(alwaysRun = true)
@@ -129,7 +132,7 @@ public class DagProcessingEngineTest {
private final boolean isBad;
public MockedDagTask(DagActionStore.DagAction dagAction, boolean isBad) {
- super(dagAction, leaseObtainedStatus,
DagProcessingEngineTest.dagManagementStateStore);
+ super(dagAction, leaseObtainedStatus,
DagProcessingEngineTest.dagManagementStateStore, mockedDagProcEngineMetrics);
this.isBad = isBad;
}
@@ -158,7 +161,8 @@ public class DagProcessingEngineTest {
}
@Override
- protected void act(DagManagementStateStore dagManagementStateStore, Void
state) {
+ protected void act(DagManagementStateStore dagManagementStateStore, Void
state,
+ DagProcessingEngineMetrics dagProcEngineMetrics) {
if (this.isBad) {
throw new RuntimeException("Simulating an exception!");
}
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStoreTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStoreTest.java
index 6137f91d10..692876b1e6 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStoreTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStoreTest.java
@@ -21,6 +21,8 @@ import java.io.IOException;
import java.util.Collection;
import java.util.HashSet;
+import
org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
+import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -66,7 +68,7 @@ public class MysqlDagActionStoreTest {
.addPrimitive("MysqlDagActionStore." +
ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY, PASSWORD)
.addPrimitive("MysqlDagActionStore." +
ConfigurationKeys.STATE_STORE_DB_TABLE_KEY, TABLE)
.build();
- return new MysqlDagActionStore(config);
+ return new MysqlDagActionStore(config,
Mockito.mock(DagProcessingEngineMetrics.class));
}
@Test
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceDeadlineDagProcsTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceDeadlineDagProcsTest.java
index ac4cd3c49f..2e64b71c40 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceDeadlineDagProcsTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceDeadlineDagProcsTest.java
@@ -22,6 +22,7 @@ import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.tuple.Pair;
+import
org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.testng.annotations.AfterClass;
@@ -57,10 +58,12 @@ import static org.mockito.Mockito.spy;
public class EnforceDeadlineDagProcsTest {
private ITestMetastoreDatabase testMetastoreDatabase;
private final MockedStatic<GobblinServiceManager>
mockedGobblinServiceManager = Mockito.mockStatic(GobblinServiceManager.class);
+ private DagProcessingEngineMetrics mockedDagProcEngineMetrics;
@BeforeClass
public void setUp() throws Exception {
this.testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
+ this.mockedDagProcEngineMetrics =
Mockito.mock(DagProcessingEngineMetrics.class);
}
@AfterClass(alwaysRun = true)
@@ -103,8 +106,9 @@ public class EnforceDeadlineDagProcsTest {
EnforceJobStartDeadlineDagProc enforceJobStartDeadlineDagProc = new
EnforceJobStartDeadlineDagProc(
new EnforceJobStartDeadlineDagTask(new
DagActionStore.DagAction(flowGroup, flowName, flowExecutionId,
- "job0", DagActionStore.DagActionType.ENFORCE_JOB_START_DEADLINE),
null, dagManagementStateStore));
- enforceJobStartDeadlineDagProc.process(dagManagementStateStore);
+ "job0", DagActionStore.DagActionType.ENFORCE_JOB_START_DEADLINE),
null,
+ dagManagementStateStore, mockedDagProcEngineMetrics));
+ enforceJobStartDeadlineDagProc.process(dagManagementStateStore,
mockedDagProcEngineMetrics);
int expectedNumOfDeleteDagNodeStates = 1; // the one dag node
corresponding to the EnforceStartDeadlineDagProc
Mockito.verify(specProducers.get(0), Mockito.times(1)).cancelJob(any(),
any());
@@ -145,8 +149,9 @@ public class EnforceDeadlineDagProcsTest {
EnforceJobStartDeadlineDagProc enforceJobStartDeadlineDagProc = new
EnforceJobStartDeadlineDagProc(
new EnforceJobStartDeadlineDagTask(new
DagActionStore.DagAction(flowGroup, flowName, flowExecutionId,
- "job0", DagActionStore.DagActionType.ENFORCE_JOB_START_DEADLINE),
null, dagManagementStateStore));
- enforceJobStartDeadlineDagProc.process(dagManagementStateStore);
+ "job0", DagActionStore.DagActionType.ENFORCE_JOB_START_DEADLINE),
null,
+ dagManagementStateStore, mockedDagProcEngineMetrics));
+ enforceJobStartDeadlineDagProc.process(dagManagementStateStore,
mockedDagProcEngineMetrics);
// no job cancelled because we simulated (by not adding) missing dag action
specProducers.forEach(sp -> Mockito.verify(sp,
Mockito.never()).cancelJob(any(), any()));
@@ -186,8 +191,9 @@ public class EnforceDeadlineDagProcsTest {
EnforceFlowFinishDeadlineDagProc enforceFlowFinishDeadlineDagProc = new
EnforceFlowFinishDeadlineDagProc(
new EnforceFlowFinishDeadlineDagTask(new
DagActionStore.DagAction(flowGroup, flowName, flowExecutionId,
- "job0",
DagActionStore.DagActionType.ENFORCE_FLOW_FINISH_DEADLINE), null,
dagManagementStateStore));
- enforceFlowFinishDeadlineDagProc.process(dagManagementStateStore);
+ "job0",
DagActionStore.DagActionType.ENFORCE_FLOW_FINISH_DEADLINE), null,
+ dagManagementStateStore, mockedDagProcEngineMetrics));
+ enforceFlowFinishDeadlineDagProc.process(dagManagementStateStore,
mockedDagProcEngineMetrics);
specProducers.forEach(sp -> Mockito.verify(sp,
Mockito.times(1)).cancelJob(any(), any()));
}
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProcTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProcTest.java
index dbe17d3362..bc53139b31 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProcTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProcTest.java
@@ -25,6 +25,7 @@ import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.ImmutablePair;
+import
org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
@@ -63,12 +64,14 @@ import static org.mockito.Mockito.spy;
public class KillDagProcTest {
private MySqlDagManagementStateStore dagManagementStateStore;
private ITestMetastoreDatabase testDb;
+ private DagProcessingEngineMetrics mockedDagProcEngineMetrics;
@BeforeClass
public void setUp() throws Exception {
this.testDb = TestMetastoreDatabaseFactory.get();
this.dagManagementStateStore =
spy(MySqlDagManagementStateStoreTest.getDummyDMSS(this.testDb));
LaunchDagProcTest.mockDMSSCommonBehavior(this.dagManagementStateStore);
+ this.mockedDagProcEngineMetrics =
Mockito.mock(DagProcessingEngineMetrics.class);
}
@AfterClass(alwaysRun = true)
@@ -93,8 +96,8 @@ public class KillDagProcTest {
LaunchDagProc launchDagProc = new LaunchDagProc(new LaunchDagTask(new
DagActionStore.DagAction("fg", "flow1",
flowExecutionId, MysqlDagActionStore.NO_JOB_NAME_DEFAULT,
DagActionStore.DagActionType.LAUNCH),
- null, this.dagManagementStateStore), flowCompilationValidationHelper);
- launchDagProc.process(this.dagManagementStateStore);
+ null, this.dagManagementStateStore, mockedDagProcEngineMetrics),
flowCompilationValidationHelper);
+ launchDagProc.process(this.dagManagementStateStore,
this.mockedDagProcEngineMetrics);
List<SpecProducer<Spec>> specProducers = dag.getNodes().stream().map(n -> {
try {
@@ -106,8 +109,8 @@ public class KillDagProcTest {
KillDagProc killDagProc = new KillDagProc(new KillDagTask(new
DagActionStore.DagAction("fg", "flow1",
flowExecutionId, MysqlDagActionStore.NO_JOB_NAME_DEFAULT,
DagActionStore.DagActionType.KILL),
- null, this.dagManagementStateStore));
- killDagProc.process(this.dagManagementStateStore);
+ null, this.dagManagementStateStore, mockedDagProcEngineMetrics));
+ killDagProc.process(this.dagManagementStateStore,
this.mockedDagProcEngineMetrics);
long cancelJobCount = specProducers.stream()
.mapToLong(p -> Mockito.mockingDetails(p)
@@ -139,8 +142,8 @@ public class KillDagProcTest {
LaunchDagProc launchDagProc = new LaunchDagProc(new LaunchDagTask(new
DagActionStore.DagAction("fg", "flow2",
flowExecutionId, MysqlDagActionStore.NO_JOB_NAME_DEFAULT,
DagActionStore.DagActionType.LAUNCH),
- null, this.dagManagementStateStore), flowCompilationValidationHelper);
- launchDagProc.process(this.dagManagementStateStore);
+ null, this.dagManagementStateStore, this.mockedDagProcEngineMetrics),
flowCompilationValidationHelper);
+ launchDagProc.process(this.dagManagementStateStore,
this.mockedDagProcEngineMetrics);
List<SpecProducer<Spec>> specProducers = dag.getNodes().stream().map(n -> {
try {
@@ -152,8 +155,8 @@ public class KillDagProcTest {
KillDagProc killDagProc = new KillDagProc(new KillDagTask(new
DagActionStore.DagAction("fg", "flow2",
flowExecutionId, "job2", DagActionStore.DagActionType.KILL),
- null, this.dagManagementStateStore));
- killDagProc.process(this.dagManagementStateStore);
+ null, this.dagManagementStateStore, this.mockedDagProcEngineMetrics));
+ killDagProc.process(this.dagManagementStateStore,
this.mockedDagProcEngineMetrics);
long cancelJobCount = specProducers.stream()
.mapToLong(p -> Mockito.mockingDetails(p)
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
index 16c1bf8ffe..4027a11c7e 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
@@ -23,6 +23,7 @@ import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
+import
org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
import org.apache.hadoop.fs.Path;
import org.mockito.Mockito;
import org.testng.annotations.AfterClass;
@@ -70,6 +71,7 @@ import static org.mockito.Mockito.spy;
public class LaunchDagProcTest {
private ITestMetastoreDatabase testMetastoreDatabase;
private MySqlDagManagementStateStore dagManagementStateStore;
+ private DagProcessingEngineMetrics mockedDagProcEngineMetrics;
@BeforeClass
public void setUp() throws Exception {
@@ -83,6 +85,7 @@ public class LaunchDagProcTest {
public void resetDMSS() throws Exception {
this.dagManagementStateStore =
spy(MySqlDagManagementStateStoreTest.getDummyDMSS(this.testMetastoreDatabase));
mockDMSSCommonBehavior(this.dagManagementStateStore);
+ this.mockedDagProcEngineMetrics =
Mockito.mock(DagProcessingEngineMetrics.class);
}
@AfterClass(alwaysRun = true)
@@ -107,10 +110,11 @@ public class LaunchDagProcTest {
List<SpecProducer<Spec>> specProducers =
ReevaluateDagProcTest.getDagSpecProducers(dag);
LaunchDagProc launchDagProc = new LaunchDagProc(
new LaunchDagTask(new DagActionStore.DagAction(flowGroup, flowName,
flowExecutionId, "job0",
- DagActionStore.DagActionType.LAUNCH), null,
this.dagManagementStateStore),
+ DagActionStore.DagActionType.LAUNCH), null,
this.dagManagementStateStore,
+ this.mockedDagProcEngineMetrics),
flowCompilationValidationHelper);
- launchDagProc.process(this.dagManagementStateStore);
+ launchDagProc.process(this.dagManagementStateStore,
mockedDagProcEngineMetrics);
int numOfLaunchedJobs = 1; // = number of start nodes
Mockito.verify(specProducers.get(0), Mockito.times(1)).addSpec(any());
@@ -137,10 +141,11 @@ public class LaunchDagProcTest {
doReturn(com.google.common.base.Optional.of(dag)).when(flowCompilationValidationHelper).createExecutionPlanIfValid(any());
LaunchDagProc launchDagProc = new LaunchDagProc(
new LaunchDagTask(new DagActionStore.DagAction(flowGroup, flowName,
flowExecutionId,
- "jn", DagActionStore.DagActionType.LAUNCH), null,
this.dagManagementStateStore),
+ "jn", DagActionStore.DagActionType.LAUNCH), null,
this.dagManagementStateStore,
+ this.mockedDagProcEngineMetrics),
flowCompilationValidationHelper);
- launchDagProc.process(this.dagManagementStateStore);
+ launchDagProc.process(this.dagManagementStateStore,
mockedDagProcEngineMetrics);
int numOfLaunchedJobs = 3; // = number of start nodes
// parallel jobs are launched through reevaluate dag action
Mockito.verify(this.dagManagementStateStore,
Mockito.times(numOfLaunchedJobs))
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java
index 93f31d1410..9673381b26 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java
@@ -23,6 +23,7 @@ import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.ImmutablePair;
+import
org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.testng.Assert;
@@ -65,6 +66,7 @@ public class ReevaluateDagProcTest {
private ITestMetastoreDatabase testMetastoreDatabase;
private DagManagementStateStore dagManagementStateStore;
private MockedStatic<GobblinServiceManager> mockedGobblinServiceManager;
+ private DagProcessingEngineMetrics mockedDagProcEngineMetrics;
@BeforeClass
public void setUpClass() throws Exception {
@@ -76,6 +78,7 @@ public class ReevaluateDagProcTest {
public void setUp() throws Exception {
this.dagManagementStateStore =
spy(MySqlDagManagementStateStoreTest.getDummyDMSS(this.testMetastoreDatabase));
LaunchDagProcTest.mockDMSSCommonBehavior(dagManagementStateStore);
+ this.mockedDagProcEngineMetrics =
Mockito.mock(DagProcessingEngineMetrics.class);
}
@AfterClass(alwaysRun = true)
@@ -114,8 +117,9 @@ public class ReevaluateDagProcTest {
ReevaluateDagProc
reEvaluateDagProc = new ReevaluateDagProc(new ReevaluateDagTask(new
DagActionStore.DagAction(flowGroup, flowName,
- flowExecutionId, "job0", DagActionStore.DagActionType.REEVALUATE),
null, dagManagementStateStore));
- reEvaluateDagProc.process(dagManagementStateStore);
+ flowExecutionId, "job0", DagActionStore.DagActionType.REEVALUATE),
null,
+ dagManagementStateStore, mockedDagProcEngineMetrics));
+ reEvaluateDagProc.process(dagManagementStateStore,
mockedDagProcEngineMetrics);
// next job is sent to spec producer
Mockito.verify(specProducers.get(1), Mockito.times(1)).addSpec(any());
// there are two invocations, one after setting status and other after
sending new job to specProducer
@@ -163,8 +167,9 @@ public class ReevaluateDagProcTest {
ReevaluateDagProc
reEvaluateDagProc = new ReevaluateDagProc(new ReevaluateDagTask(new
DagActionStore.DagAction(flowGroup, flowName,
- flowExecutionId, "job0", DagActionStore.DagActionType.REEVALUATE),
null, dagManagementStateStore));
- reEvaluateDagProc.process(dagManagementStateStore);
+ flowExecutionId, "job0", DagActionStore.DagActionType.REEVALUATE),
null,
+ dagManagementStateStore, mockedDagProcEngineMetrics));
+ reEvaluateDagProc.process(dagManagementStateStore,
mockedDagProcEngineMetrics);
// no new job to launch for this one job flow
specProducers.forEach(sp -> Mockito.verify(sp,
Mockito.never()).addSpec(any()));
@@ -199,8 +204,8 @@ public class ReevaluateDagProcTest {
ReevaluateDagProc
reEvaluateDagProc = new ReevaluateDagProc(new ReevaluateDagTask(new
DagActionStore.DagAction(flowGroup, flowName,
flowExecutionId, "job0", DagActionStore.DagActionType.REEVALUATE),
null,
- dagManagementStateStore));
- reEvaluateDagProc.process(dagManagementStateStore);
+ dagManagementStateStore, mockedDagProcEngineMetrics));
+ reEvaluateDagProc.process(dagManagementStateStore,
mockedDagProcEngineMetrics);
int numOfLaunchedJobs = 1; // only the current job
// only the current job should have run
@@ -243,10 +248,11 @@ public class ReevaluateDagProcTest {
ReevaluateDagProc
reEvaluateDagProc = new ReevaluateDagProc(new ReevaluateDagTask(new
DagActionStore.DagAction(flowGroup, flowName,
- flowExecutionId, "job3", DagActionStore.DagActionType.REEVALUATE),
null, dagManagementStateStore));
+ flowExecutionId, "job3", DagActionStore.DagActionType.REEVALUATE),
null,
+ dagManagementStateStore, mockedDagProcEngineMetrics));
List<SpecProducer<Spec>> specProducers = getDagSpecProducers(dag);
// process 4th job
- reEvaluateDagProc.process(dagManagementStateStore);
+ reEvaluateDagProc.process(dagManagementStateStore,
mockedDagProcEngineMetrics);
int numOfLaunchedJobs = 2; // = number of jobs that should launch when 4th
job passes, i.e. 5th and 6th job
// parallel jobs are launched through reevaluate dag action
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProcTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProcTest.java
index e342b29e77..4de37fad54 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProcTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProcTest.java
@@ -20,6 +20,7 @@ package org.apache.gobblin.service.modules.orchestration.proc;
import java.io.IOException;
import java.net.URISyntaxException;
+import
org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
import org.mockito.Mockito;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -50,12 +51,14 @@ import static org.mockito.Mockito.spy;
public class ResumeDagProcTest {
private MySqlDagManagementStateStore dagManagementStateStore;
private ITestMetastoreDatabase testDb;
+ private DagProcessingEngineMetrics mockedDagProcEngineMetrics;
@BeforeClass
public void setUp() throws Exception {
testDb = TestMetastoreDatabaseFactory.get();
this.dagManagementStateStore =
spy(MySqlDagManagementStateStoreTest.getDummyDMSS(testDb));
LaunchDagProcTest.mockDMSSCommonBehavior(this.dagManagementStateStore);
+ this.mockedDagProcEngineMetrics =
Mockito.mock(DagProcessingEngineMetrics.class);
}
@AfterClass(alwaysRun = true)
@@ -91,8 +94,8 @@ public class ResumeDagProcTest {
ResumeDagProc resumeDagProc = new ResumeDagProc(new ResumeDagTask(new
DagActionStore.DagAction(flowGroup, flowName,
flowExecutionId, MysqlDagActionStore.NO_JOB_NAME_DEFAULT,
DagActionStore.DagActionType.RESUME),
- null, this.dagManagementStateStore));
- resumeDagProc.process(this.dagManagementStateStore);
+ null, this.dagManagementStateStore, mockedDagProcEngineMetrics));
+ resumeDagProc.process(this.dagManagementStateStore,
mockedDagProcEngineMetrics);
int expectedNumOfResumedJobs = 1; // = number of resumed nodes
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/task/LaunchDagTaskTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/task/LaunchDagTaskTest.java
index 0a057b7457..3965f26dd5 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/task/LaunchDagTaskTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/task/LaunchDagTaskTest.java
@@ -65,7 +65,8 @@ public class LaunchDagTaskTest {
*/
@Test
public void concludeRemovesAdhocFlowSpec() throws IOException {
- LaunchDagTask dagTask = new LaunchDagTask(dagAction, leaseObtainedStatus,
dagManagementStateStore);
+ LaunchDagTask dagTask = new LaunchDagTask(dagAction, leaseObtainedStatus,
dagManagementStateStore,
+ Mockito.mock(DagProcessingEngineMetrics.class));
dagTask.conclude();
Mockito.verify(dagManagementStateStore,
Mockito.times(1)).deleteDagAction(any());
Mockito.verify(dagManagementStateStore,
Mockito.times(1)).removeFlowSpec(any(), any(), anyBoolean());