This is an automated email from the ASF dual-hosted git repository.
arjun4084346 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 318c66180 [GOBBLIN-1910] part one of the changes that dag refactoring will require (#3853)
318c66180 is described below
commit 318c661808d201f72090088aa975f39a06f375eb
Author: Arjun Singh Bora <[email protected]>
AuthorDate: Thu Jan 18 10:02:30 2024 -0800
[GOBBLIN-1910] part one of the changes that dag refactoring will require (#3853)
* part one of the changes that dag refactoring will require
* address review comments
---------
Co-authored-by: Meeth Gala <[email protected]>
Co-authored-by: Arjun Singh Bora <[email protected]>
Co-authored-by: Arjun Singh Bora <[email protected]>
---
.../service/monitoring/JobStatusRetriever.java | 67 +++++-----
.../gobblin/service/modules/flowgraph/Dag.java | 9 +-
.../service/modules/orchestration/DagManager.java | 101 ++++----------
.../modules/orchestration/DagManagerUtils.java | 148 ++++++++++++++++++---
.../modules/orchestration/TimingEventUtils.java | 2 +-
.../service/modules/spec/JobExecutionPlan.java | 3 +
.../monitoring/MysqlJobStatusRetriever.java | 5 +-
7 files changed, 197 insertions(+), 138 deletions(-)
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
index 750834092..bcbcc15fa 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
@@ -103,6 +103,26 @@ public abstract class JobStatusRetriever implements LatestFlowExecutionIdTracker
* @return deserialize {@link State} into a {@link JobStatus}.
*/
protected JobStatus getJobStatus(State jobState) {
+ JobStatus.JobStatusBuilder jobStatusBuilder =
createJobStatusBuilderFromState(jobState);
+
+ String contextId =
TroubleshooterUtils.getContextIdForJob(jobState.getProperties());
+
+ Supplier<List<Issue>> jobIssues = Suppliers.memoize(() -> {
+ List<Issue> issues;
+ try {
+ issues = issueRepository.getAll(contextId);
+ } catch (TroubleshooterException e) {
+ log.warn("Cannot retrieve job issues", e);
+ issues = Collections.emptyList();
+ }
+ return issues;
+ });
+
+ jobStatusBuilder.issues(jobIssues);
+ return jobStatusBuilder.build();
+ }
+
+ public static JobStatus.JobStatusBuilder
createJobStatusBuilderFromState(State jobState) {
String flowGroup = getFlowGroup(jobState);
String flowName = getFlowName(jobState);
long flowExecutionId = getFlowExecutionId(jobState);
@@ -125,48 +145,35 @@ public abstract class JobStatusRetriever implements LatestFlowExecutionIdTracker
int progressPercentage =
jobState.getPropAsInt(TimingEvent.JOB_COMPLETION_PERCENTAGE, 0);
long lastProgressEventTime =
jobState.getPropAsLong(TimingEvent.JOB_LAST_PROGRESS_EVENT_TIME, 0);
- String contextId =
TroubleshooterUtils.getContextIdForJob(jobState.getProperties());
-
- Supplier<List<Issue>> jobIssues = Suppliers.memoize(() -> {
- List<Issue> issues;
- try {
- issues = issueRepository.getAll(contextId);
- } catch (TroubleshooterException e) {
- log.warn("Cannot retrieve job issues", e);
- issues = Collections.emptyList();
- }
- return issues;
- });
-
- return
JobStatus.builder().flowName(flowName).flowGroup(flowGroup).flowExecutionId(flowExecutionId).
-
jobName(jobName).jobGroup(jobGroup).jobTag(jobTag).jobExecutionId(jobExecutionId).eventName(eventName).
-
lowWatermark(lowWatermark).highWatermark(highWatermark).orchestratedTime(orchestratedTime).startTime(startTime).endTime(endTime).
-
message(message).processedCount(processedCount).maxAttempts(maxAttempts).currentAttempts(currentAttempts).currentGeneration(currentGeneration).
-
shouldRetry(shouldRetry).progressPercentage(progressPercentage).lastProgressEventTime(lastProgressEventTime).
- issues(jobIssues).build();
+ return
JobStatus.builder().flowName(flowName).flowGroup(flowGroup).flowExecutionId(flowExecutionId).jobName(jobName)
+
.jobGroup(jobGroup).jobTag(jobTag).jobExecutionId(jobExecutionId).eventName(eventName).lowWatermark(lowWatermark)
+
.highWatermark(highWatermark).orchestratedTime(orchestratedTime).startTime(startTime).endTime(endTime)
+
.message(message).processedCount(processedCount).maxAttempts(maxAttempts).currentAttempts(currentAttempts)
+
.currentGeneration(currentGeneration).shouldRetry(shouldRetry).progressPercentage(progressPercentage)
+ .lastProgressEventTime(lastProgressEventTime);
}
- protected final String getFlowGroup(State jobState) {
+ protected static final String getFlowGroup(State jobState) {
return jobState.getProp(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD);
}
- protected final String getFlowName(State jobState) {
+ protected static final String getFlowName(State jobState) {
return jobState.getProp(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD);
}
- protected final long getFlowExecutionId(State jobState) {
+ protected static final long getFlowExecutionId(State jobState) {
return
Long.parseLong(jobState.getProp(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD));
}
- protected final String getJobGroup(State jobState) {
+ protected static final String getJobGroup(State jobState) {
return jobState.getProp(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD);
}
- protected final String getJobName(State jobState) {
+ protected static final String getJobName(State jobState) {
return jobState.getProp(TimingEvent.FlowEventConstants.JOB_NAME_FIELD);
}
- protected final long getJobExecutionId(State jobState) {
+ protected static final long getJobExecutionId(State jobState) {
return
Long.parseLong(jobState.getProp(TimingEvent.FlowEventConstants.JOB_EXECUTION_ID_FIELD,
"0"));
}
@@ -178,7 +185,9 @@ public abstract class JobStatusRetriever implements LatestFlowExecutionIdTracker
return flowExecutionGroupings.stream().map(exec -> {
List<JobStatus> jobStatuses =
ImmutableList.copyOf(asJobStatuses(exec.getJobStates().stream().sorted(
// rationalized order, to facilitate test assertions
-
Comparator.comparing(this::getJobGroup).thenComparing(this::getJobName).thenComparing(this::getJobExecutionId)
+ Comparator.comparing(JobStatusRetriever::getJobGroup)
+ .thenComparing(JobStatusRetriever::getJobName)
+ .thenComparing(JobStatusRetriever::getJobExecutionId)
).collect(Collectors.toList())));
return new FlowStatus(exec.getFlowName(), exec.getFlowGroup(),
exec.getFlowExecutionId(), jobStatuses.iterator(),
getFlowStatusFromJobStatuses(jobStatuses.iterator()));
@@ -196,10 +205,8 @@ public abstract class JobStatusRetriever implements LatestFlowExecutionIdTracker
protected List<FlowExecutionJobStateGrouping>
groupByFlowExecutionAndRetainLatest(
String flowGroup, List<State> jobStatusStates, int maxCountPerFlowName) {
- Map<String, Map<Long, List<State>>> statesByFlowExecutionIdByName =
- jobStatusStates.stream().collect(Collectors.groupingBy(
- this::getFlowName,
- Collectors.groupingBy(this::getFlowExecutionId)));
+ Map<String, Map<Long, List<State>>> statesByFlowExecutionIdByName =
jobStatusStates.stream().collect(
+ Collectors.groupingBy(JobStatusRetriever::getFlowName,
Collectors.groupingBy(JobStatusRetriever::getFlowExecutionId)));
return
statesByFlowExecutionIdByName.entrySet().stream().sorted(Map.Entry.comparingByKey()).flatMap(flowNameEntry
-> {
String flowName = flowNameEntry.getKey();
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java
index edb19d8d1..388d943aa 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java
@@ -17,6 +17,8 @@
package org.apache.gobblin.service.modules.flowgraph;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -26,15 +28,11 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
import lombok.Getter;
import lombok.Setter;
-
import org.apache.gobblin.annotation.Alpha;
+
/**
* An implementation of Dag. Assumes that nodes have unique values. Nodes with
duplicate values will produce
* unpredictable behavior.
@@ -261,6 +259,7 @@ public class Dag<T> {
this.value = value;
}
+
public void addParentNode(DagNode<T> node) {
if (parentNodes == null) {
parentNodes = Lists.newArrayList(node);
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index ca38eda0c..9e345d71a 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -271,7 +271,7 @@ public class DagManager extends AbstractIdleService {
}
// Initializes and returns an array of Queue of size numThreads
- private static LinkedBlockingDeque<?>[] initializeDagQueue(int numThreads) {
+ static LinkedBlockingDeque<?>[] initializeDagQueue(int numThreads) {
LinkedBlockingDeque<?>[] queue = new LinkedBlockingDeque[numThreads];
for (int i=0; i< numThreads; i++) {
@@ -324,16 +324,7 @@ public class DagManager extends AbstractIdleService {
throw new IOException("Could not add dag" + dagId + "to queue");
}
if (setStatus) {
- submitEventsAndSetStatus(dag);
- }
- }
-
- private void submitEventsAndSetStatus(Dag<JobExecutionPlan> dag) {
- for (DagNode<JobExecutionPlan> dagNode : dag.getNodes()) {
- JobExecutionPlan jobExecutionPlan =
DagManagerUtils.getJobExecutionPlan(dagNode);
- Map<String, String> jobMetadata =
TimingEventUtils.getJobMetadata(Maps.newHashMap(), jobExecutionPlan);
- new TimingEvent(eventSubmitter,
TimingEvent.LauncherTimings.JOB_PENDING).stop(jobMetadata);
- jobExecutionPlan.setExecutionStatus(PENDING);
+ DagManagerUtils.submitPendingExecStatus(dag, this.eventSubmitter);
}
}
@@ -642,15 +633,18 @@ public class DagManager extends AbstractIdleService {
*/
private void finishResumingDags() throws IOException {
for (Map.Entry<String, Dag<JobExecutionPlan>> dag :
this.resumingDags.entrySet()) {
- JobStatus flowStatus = pollFlowStatus(dag.getValue());
- if (flowStatus == null ||
!flowStatus.getEventName().equals(PENDING_RESUME.name())) {
+ java.util.Optional<JobStatus> flowStatus =
DagManagerUtils.pollFlowStatus(dag.getValue(), this.jobStatusRetriever,
this.jobStatusPolledTimer);
+ if (!flowStatus.filter(fs ->
fs.getEventName().equals(PENDING_RESUME.name())).isPresent()) {
continue;
}
boolean dagReady = true;
for (DagNode<JobExecutionPlan> node : dag.getValue().getNodes()) {
- JobStatus jobStatus = pollJobStatus(node);
- if (jobStatus == null ||
jobStatus.getEventName().equals(FAILED.name()) ||
jobStatus.getEventName().equals(CANCELLED.name())) {
+ java.util.Optional<JobStatus> jobStatus =
DagManagerUtils.pollJobStatus(node, this.jobStatusRetriever,
this.jobStatusPolledTimer);
+ if (jobStatus.filter(js -> {
+ String jobName = js.getEventName();
+ return jobName.equals(FAILED.name()) ||
jobName.equals(CANCELLED.name());
+ }).isPresent()) {
dagReady = false;
break;
}
@@ -772,7 +766,7 @@ public class DagManager extends AbstractIdleService {
/**
* Proceed the execution of each dag node based on job status.
*/
- private void pollAndAdvanceDag() throws IOException, ExecutionException,
InterruptedException {
+ private void pollAndAdvanceDag() {
Map<String, Set<DagNode<JobExecutionPlan>>> nextSubmitted =
Maps.newHashMap();
List<DagNode<JobExecutionPlan>> nodesToCleanUp = Lists.newArrayList();
@@ -780,7 +774,7 @@ public class DagManager extends AbstractIdleService {
try {
boolean slaKilled = slaKillIfNeeded(node);
- JobStatus jobStatus = pollJobStatus(node);
+ java.util.Optional<JobStatus> jobStatus =
DagManagerUtils.pollJobStatus(node, this.jobStatusRetriever,
this.jobStatusPolledTimer);
boolean killOrphanFlow = killJobIfOrphaned(node, jobStatus);
@@ -815,13 +809,13 @@ public class DagManager extends AbstractIdleService {
break;
}
- if (jobStatus != null && jobStatus.isShouldRetry()) {
+ if (jobStatus.filter(JobStatus::isShouldRetry).isPresent()) {
log.info("Retrying job: {}, current attempts: {}, max attempts:
{}", DagManagerUtils.getFullyQualifiedJobName(node),
- jobStatus.getCurrentAttempts(), jobStatus.getMaxAttempts());
+ jobStatus.get().getCurrentAttempts(),
jobStatus.get().getMaxAttempts());
this.jobToDag.get(node).setFlowEvent(null);
submitJob(node);
}
- } catch (Exception e) {
+ } catch (InterruptedException | IOException | ExecutionException e) {
// Error occurred while processing dag, continue processing other
dags assigned to this thread
log.error(String.format("Exception caught in DagManager while
processing dag %s due to ",
DagManagerUtils.getFullyQualifiedDagName(node)), e);
@@ -850,14 +844,14 @@ public class DagManager extends AbstractIdleService {
* @return true if the total time that the job remains in the ORCHESTRATED
state exceeds
* {@value ConfigurationKeys#GOBBLIN_JOB_START_SLA_TIME}.
*/
- private boolean killJobIfOrphaned(DagNode<JobExecutionPlan> node,
JobStatus jobStatus)
+ private boolean killJobIfOrphaned(DagNode<JobExecutionPlan> node,
java.util.Optional<JobStatus> jobStatus)
throws ExecutionException, InterruptedException {
- if (jobStatus == null) {
+ if (!jobStatus.isPresent()) {
return false;
}
- ExecutionStatus executionStatus = valueOf(jobStatus.getEventName());
+ ExecutionStatus executionStatus =
valueOf(jobStatus.get().getEventName());
long timeOutForJobStart = DagManagerUtils.getJobStartSla(node,
this.defaultJobStartSlaTimeMillis);
- long jobOrchestratedTime = jobStatus.getOrchestratedTime();
+ long jobOrchestratedTime = jobStatus.get().getOrchestratedTime();
if (executionStatus == ORCHESTRATED && System.currentTimeMillis() -
jobOrchestratedTime > timeOutForJobStart) {
log.info("Job {} of flow {} exceeded the job start SLA of {} ms.
Killing the job now...",
DagManagerUtils.getJobName(node),
@@ -875,15 +869,11 @@ public class DagManager extends AbstractIdleService {
}
}
- private ExecutionStatus getJobExecutionStatus(boolean slaKilled, boolean
killOrphanFlow, JobStatus jobStatus) {
+ private ExecutionStatus getJobExecutionStatus(boolean slaKilled, boolean
killOrphanFlow, java.util.Optional<JobStatus> jobStatus) {
if (slaKilled || killOrphanFlow) {
return CANCELLED;
} else {
- if (jobStatus == null) {
- return PENDING;
- } else {
- return valueOf(jobStatus.getEventName());
- }
+ return jobStatus.map(status ->
valueOf(status.getEventName())).orElse(PENDING);
}
}
@@ -932,47 +922,7 @@ public class DagManager extends AbstractIdleService {
return false;
}
- /**
- * Retrieve the {@link JobStatus} from the {@link JobExecutionPlan}.
- */
- private JobStatus pollJobStatus(DagNode<JobExecutionPlan> dagNode) {
- Config jobConfig = dagNode.getValue().getJobSpec().getConfig();
- String flowGroup = jobConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
- String flowName = jobConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
- long flowExecutionId =
jobConfig.getLong(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
- String jobGroup = jobConfig.getString(ConfigurationKeys.JOB_GROUP_KEY);
- String jobName = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY);
-
- return pollStatus(flowGroup, flowName, flowExecutionId, jobGroup,
jobName);
- }
-
- /**
- * Retrieve the flow's {@link JobStatus} (i.e. job status with {@link
JobStatusRetriever#NA_KEY} as job name/group) from a dag
- */
- private JobStatus pollFlowStatus(Dag<JobExecutionPlan> dag) {
- if (dag == null || dag.isEmpty()) {
- return null;
- }
- Config jobConfig =
dag.getNodes().get(0).getValue().getJobSpec().getConfig();
- String flowGroup = jobConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
- String flowName = jobConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
- long flowExecutionId =
jobConfig.getLong(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
- return pollStatus(flowGroup, flowName, flowExecutionId,
JobStatusRetriever.NA_KEY, JobStatusRetriever.NA_KEY);
- }
-
- private JobStatus pollStatus(String flowGroup, String flowName, long
flowExecutionId, String jobGroup, String jobName) {
- long pollStartTime = System.nanoTime();
- Iterator<JobStatus> jobStatusIterator =
- this.jobStatusRetriever.getJobStatusesForFlowExecution(flowName,
flowGroup, flowExecutionId, jobName, jobGroup);
- Instrumented.updateTimer(this.jobStatusPolledTimer, System.nanoTime() -
pollStartTime, TimeUnit.NANOSECONDS);
-
- if (jobStatusIterator.hasNext()) {
- return jobStatusIterator.next();
- } else {
- return null;
- }
- }
/**
* Submit next set of Dag nodes in the Dag identified by the provided dagId
@@ -1107,11 +1057,6 @@ public class DagManager extends AbstractIdleService {
}
}
- private boolean hasRunningJobs(String dagId) {
- List<DagNode<JobExecutionPlan>> dagNodes = this.dagToJobs.get(dagId);
- return dagNodes != null && !dagNodes.isEmpty();
- }
-
/**
* Perform clean up. Remove a dag from the dagstore if the dag is complete
and update internal state.
*/
@@ -1136,7 +1081,7 @@ public class DagManager extends AbstractIdleService {
deleteJobState(dagId, dagNode);
}
}
- if (!hasRunningJobs(dagId)) {
+ if (!DagManagerUtils.hasRunningJobs(dagId, this.dagToJobs)) {
// Collect all the dagIds that are finished
this.dagIdstoClean.add(dagId);
if (dag.getFlowEvent() == null) {
@@ -1155,8 +1100,8 @@ public class DagManager extends AbstractIdleService {
for (Iterator<String> dagIdIterator = this.dagIdstoClean.iterator();
dagIdIterator.hasNext();) {
String dagId = dagIdIterator.next();
Dag<JobExecutionPlan> dag = this.dags.get(dagId);
- JobStatus flowStatus = pollFlowStatus(dag);
- if (flowStatus != null &&
FlowStatusGenerator.FINISHED_STATUSES.contains(flowStatus.getEventName())) {
+ java.util.Optional<JobStatus> flowStatus =
DagManagerUtils.pollFlowStatus(dag, this.jobStatusRetriever,
this.jobStatusPolledTimer);
+ if (flowStatus.filter(fs ->
FlowStatusGenerator.FINISHED_STATUSES.contains(fs.getEventName())).isPresent())
{
FlowId flowId = DagManagerUtils.getFlowId(dag);
switch(dag.getFlowEvent()) {
case TimingEvent.FlowTimings.FLOW_SUCCEEDED:
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
index 97201a4aa..06ac24aa7 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
@@ -16,9 +16,11 @@
*/
package org.apache.gobblin.service.modules.orchestration;
+import com.google.common.base.Joiner;
import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -28,14 +30,18 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
+import com.codahale.metrics.Timer;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.DagActionStore;
import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.Spec;
import org.apache.gobblin.runtime.api.SpecProducer;
@@ -47,17 +53,34 @@ import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.flowgraph.Dag.DagNode;
import
org.apache.gobblin.service.modules.orchestration.DagManager.FailureOption;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.JobStatus;
+import org.apache.gobblin.service.monitoring.JobStatusRetriever;
import org.apache.gobblin.util.ConfigUtils;
+import static org.apache.gobblin.service.ExecutionStatus.*;
+
public class DagManagerUtils {
static long DEFAULT_FLOW_SLA_MILLIS = TimeUnit.HOURS.toMillis(24);
static String QUOTA_KEY_SEPERATOR = ",";
- static FlowId getFlowId(Dag<JobExecutionPlan> dag) {
+ public static FlowId getFlowId(Dag<JobExecutionPlan> dag) {
return getFlowId(dag.getStartNodes().get(0));
}
+ public static DagActionStore.DagAction
createDagActionFromDag(Dag<JobExecutionPlan> dag, DagActionStore.FlowActionType
flowActionType) {
+ return createDagActionFromDagNode(dag.getStartNodes().get(0),
flowActionType);
+ }
+
+ // todo - dag action object does not have any identifier to tell if it is
for a complete dag or just for one dag node
+ public static DagActionStore.DagAction
createDagActionFromDagNode(DagNode<JobExecutionPlan> dagNode,
DagActionStore.FlowActionType flowActionType) {
+ Config jobConfig = dagNode.getValue().getJobSpec().getConfig();
+ String flowGroup = jobConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
+ String flowName = jobConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
+ String flowExecutionId =
jobConfig.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
+ return new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId,
flowActionType);
+ }
+
static FlowId getFlowId(DagNode<JobExecutionPlan> dagNode) {
Config jobConfig = dagNode.getValue().getJobSpec().getConfig();
String flowGroup = jobConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
@@ -65,7 +88,7 @@ public class DagManagerUtils {
return new FlowId().setFlowGroup(flowGroup).setFlowName(flowName);
}
- static long getFlowExecId(Dag<JobExecutionPlan> dag) {
+ public static long getFlowExecId(Dag<JobExecutionPlan> dag) {
return getFlowExecId(dag.getStartNodes().get(0));
}
@@ -94,20 +117,20 @@ public class DagManagerUtils {
private static DagManager.DagId generateDagId(Config jobConfig) {
String flowGroup = jobConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
String flowName = jobConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
- long flowExecutionId =
jobConfig.getLong(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
+ String flowExecutionId =
jobConfig.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
- return new DagManager.DagId(flowGroup, flowName,
String.valueOf(flowExecutionId));
+ return new DagManager.DagId(flowGroup, flowName, flowExecutionId);
}
- static DagManager.DagId generateDagId(Dag.DagNode<JobExecutionPlan> dagNode)
{
+ public static DagManager.DagId generateDagId(DagNode<JobExecutionPlan>
dagNode) {
return generateDagId(dagNode.getValue().getJobSpec().getConfig());
}
- static DagManager.DagId generateDagId(String flowGroup, String flowName,
long flowExecutionId) {
+ public static DagManager.DagId generateDagId(String flowGroup, String
flowName, long flowExecutionId) {
return generateDagId(flowGroup, flowName, String.valueOf(flowExecutionId));
}
- static DagManager.DagId generateDagId(String flowGroup, String flowName,
String flowExecutionId) {
+ public static DagManager.DagId generateDagId(String flowGroup, String
flowName, String flowExecutionId) {
return new DagManager.DagId(flowGroup, flowName, flowExecutionId);
}
@@ -116,7 +139,7 @@ public class DagManagerUtils {
* @param dag
* @return fully qualified name of the underlying {@link Dag}.
*/
- static String getFullyQualifiedDagName(Dag<JobExecutionPlan> dag) {
+ public static String getFullyQualifiedDagName(Dag<JobExecutionPlan> dag) {
FlowId flowid = getFlowId(dag);
long flowExecutionId = getFlowExecId(dag);
return "(flowGroup: " + flowid.getFlowGroup() + ", flowName: " +
flowid.getFlowName() + ", flowExecutionId: " + flowExecutionId + ")";
@@ -133,7 +156,7 @@ public class DagManagerUtils {
return "(flowGroup: " + flowid.getFlowGroup() + ", flowName: " +
flowid.getFlowName() + ", flowExecutionId: " + flowExecutionId + ")";
}
- static String getJobName(DagNode<JobExecutionPlan> dagNode) {
+ public static String getJobName(DagNode<JobExecutionPlan> dagNode) {
return
dagNode.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.JOB_NAME_KEY);
}
@@ -142,7 +165,7 @@ public class DagManagerUtils {
* @param dagNode
* @return a fully qualified name of the underlying job.
*/
- static String getFullyQualifiedJobName(DagNode<JobExecutionPlan> dagNode) {
+ public static String getFullyQualifiedJobName(DagNode<JobExecutionPlan>
dagNode) {
Config jobConfig = dagNode.getValue().getJobSpec().getConfig();
String flowGroup = ConfigUtils.getString(jobConfig,
ConfigurationKeys.FLOW_GROUP_KEY, "");
@@ -153,7 +176,7 @@ public class DagManagerUtils {
return "(flowGroup: " + flowGroup + ", flowName: " + flowName + ",
flowExecutionId: " + flowExecutionId + ", jobName: " + jobName + ")";
}
- static JobExecutionPlan getJobExecutionPlan(DagNode<JobExecutionPlan>
dagNode) {
+ public static JobExecutionPlan getJobExecutionPlan(DagNode<JobExecutionPlan>
dagNode) {
return dagNode.getValue();
}
@@ -172,12 +195,12 @@ public class DagManagerUtils {
return dagNode.getValue().getJobSpec().getConfig();
}
- static SpecProducer<Spec> getSpecProducer(DagNode<JobExecutionPlan> dagNode)
+ public static SpecProducer<Spec> getSpecProducer(DagNode<JobExecutionPlan>
dagNode)
throws ExecutionException, InterruptedException {
return dagNode.getValue().getSpecExecutor().getProducer().get();
}
- static ExecutionStatus getExecutionStatus(DagNode<JobExecutionPlan> dagNode)
{
+ public static ExecutionStatus getExecutionStatus(DagNode<JobExecutionPlan>
dagNode) {
return dagNode.getValue().getExecutionStatus();
}
@@ -186,7 +209,7 @@ public class DagManagerUtils {
* identifies each node yet to be executed and for which each of its parent
nodes is in the {@link ExecutionStatus#COMPLETE}
* state.
*/
- static Set<DagNode<JobExecutionPlan>> getNext(Dag<JobExecutionPlan> dag) {
+ public static Set<DagNode<JobExecutionPlan>> getNext(Dag<JobExecutionPlan>
dag) {
Set<DagNode<JobExecutionPlan>> nextNodesToExecute = new HashSet<>();
LinkedList<DagNode<JobExecutionPlan>> nodesToExpand =
Lists.newLinkedList(dag.getStartNodes());
FailureOption failureOption = getFailureOption(dag);
@@ -195,12 +218,11 @@ public class DagManagerUtils {
DagNode<JobExecutionPlan> node = nodesToExpand.poll();
ExecutionStatus executionStatus = getExecutionStatus(node);
boolean addFlag = true;
- if (executionStatus == ExecutionStatus.PENDING || executionStatus ==
ExecutionStatus.PENDING_RETRY
- || executionStatus == ExecutionStatus.PENDING_RESUME) {
+ if (executionStatus == PENDING || executionStatus == PENDING_RETRY ||
executionStatus == PENDING_RESUME) {
//Add a node to be executed next, only if all of its parent nodes are
COMPLETE.
List<DagNode<JobExecutionPlan>> parentNodes = dag.getParents(node);
for (DagNode<JobExecutionPlan> parentNode : parentNodes) {
- if (getExecutionStatus(parentNode) != ExecutionStatus.COMPLETE) {
+ if (getExecutionStatus(parentNode) != COMPLETE) {
addFlag = false;
break;
}
@@ -208,10 +230,10 @@ public class DagManagerUtils {
if (addFlag) {
nextNodesToExecute.add(node);
}
- } else if (executionStatus == ExecutionStatus.COMPLETE) {
+ } else if (executionStatus == COMPLETE) {
//Explore the children of COMPLETED node as next candidates for
execution.
nodesToExpand.addAll(dag.getChildren(node));
- } else if ((executionStatus == ExecutionStatus.FAILED) ||
(executionStatus == ExecutionStatus.CANCELLED)) {
+ } else if ((executionStatus == FAILED) || (executionStatus ==
CANCELLED)) {
switch (failureOption) {
case FINISH_RUNNING:
return new HashSet<>();
@@ -234,7 +256,7 @@ public class DagManagerUtils {
return FailureOption.valueOf(failureOption);
}
- static String getSpecExecutorUri(DagNode<JobExecutionPlan> dagNode) {
+ public static String getSpecExecutorUri(DagNode<JobExecutionPlan> dagNode) {
return dagNode.getValue().getSpecExecutor().getUri().toString();
}
@@ -252,7 +274,7 @@ public class DagManagerUtils {
/**
* Increment the value of {@link JobExecutionPlan#currentAttempts}
*/
- static void incrementJobAttempt(DagNode<JobExecutionPlan> dagNode) {
+ public static void incrementJobAttempt(DagNode<JobExecutionPlan> dagNode) {
dagNode.getValue().setCurrentAttempts(dagNode.getValue().getCurrentAttempts() +
1);
}
@@ -261,7 +283,7 @@ public class DagManagerUtils {
* This method is not thread safe, we achieve correctness by making sure
* one dag will only be handled in the same DagManagerThread
*/
- static void incrementJobGeneration(DagNode<JobExecutionPlan> dagNode) {
+ public static void incrementJobGeneration(DagNode<JobExecutionPlan> dagNode)
{
dagNode.getValue().setCurrentGeneration(dagNode.getValue().getCurrentGeneration()
+ 1);
}
@@ -365,4 +387,86 @@ public class DagManagerUtils {
throw new RuntimeException("Could not process requesters due to ", e);
}
}
+
+ /**
+ * Set the execution status of all the {@link DagNode}s of the provided dag.
+ * Also emits a {@link TimingEvent.LauncherTimings#JOB_PENDING} for each of
those dag nodes.
+ * @param dag dag whose status is to change.
+ * @param eventSubmitter event submitter that will send the event.
+ */
+ public static void submitPendingExecStatus(Dag<JobExecutionPlan> dag,
EventSubmitter eventSubmitter) {
+ for (DagNode<JobExecutionPlan> dagNode : dag.getNodes()) {
+ JobExecutionPlan jobExecutionPlan =
DagManagerUtils.getJobExecutionPlan(dagNode);
+ Map<String, String> jobMetadata =
TimingEventUtils.getJobMetadata(Maps.newHashMap(), jobExecutionPlan);
+
eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_PENDING).stop(jobMetadata);
+ jobExecutionPlan.setExecutionStatus(PENDING);
+ }
+ }
+
+ /**
+ * Retrieve the {@link JobStatus} from the {@link JobExecutionPlan}.
+ */
+ public static java.util.Optional<JobStatus>
pollJobStatus(DagNode<JobExecutionPlan> dagNode, JobStatusRetriever
jobStatusRetriever, Timer jobStatusPolledTimer) {
+ Config jobConfig = dagNode.getValue().getJobSpec().getConfig();
+ String flowGroup = jobConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
+ String flowName = jobConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
+ long flowExecutionId =
jobConfig.getLong(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
+ String jobGroup = jobConfig.getString(ConfigurationKeys.JOB_GROUP_KEY);
+ String jobName = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY);
+
+ return pollStatus(flowGroup, flowName, flowExecutionId, jobGroup, jobName,
jobStatusRetriever, jobStatusPolledTimer);
+ }
+
+ /**
+ * Retrieve the flow's {@link JobStatus} (i.e. job status with {@link
JobStatusRetriever#NA_KEY} as job name/group) from a dag
+ * Returns empty optional if dag is null/empty or job status is not found.
+ */
+ public static java.util.Optional<JobStatus>
pollFlowStatus(Dag<JobExecutionPlan> dag, JobStatusRetriever
jobStatusRetriever, Timer jobStatusPolledTimer) {
+ if (dag == null || dag.isEmpty()) {
+ return java.util.Optional.empty();
+ }
+ Config jobConfig =
dag.getNodes().get(0).getValue().getJobSpec().getConfig();
+ String flowGroup = jobConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
+ String flowName = jobConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
+ long flowExecutionId =
jobConfig.getLong(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
+
+ return pollStatus(flowGroup, flowName, flowExecutionId,
JobStatusRetriever.NA_KEY, JobStatusRetriever.NA_KEY, jobStatusRetriever,
jobStatusPolledTimer);
+ }
+
+ /**
+ * Retrieve the flow's {@link JobStatus} and update the timer if
jobStatusPolledTimer is present.
+ * Returns empty optional if job status is not found.
+ */
+ public static java.util.Optional<JobStatus> pollStatus(String flowGroup,
String flowName, long flowExecutionId, String jobGroup, String jobName,
+ JobStatusRetriever
jobStatusRetriever, Timer jobStatusPolledTimer) {
+ long pollStartTime = System.nanoTime();
+ Iterator<JobStatus> jobStatusIterator =
+ jobStatusRetriever.getJobStatusesForFlowExecution(flowName, flowGroup,
flowExecutionId, jobName, jobGroup);
+ Instrumented.updateTimer(jobStatusPolledTimer, System.nanoTime() -
pollStartTime, TimeUnit.NANOSECONDS);
+
+ if (jobStatusIterator.hasNext()) {
+ return java.util.Optional.of(jobStatusIterator.next());
+ } else {
+ return java.util.Optional.empty();
+ }
+ }
+
+ public static boolean hasRunningJobs(String dagId, Map<String,
LinkedList<DagNode<JobExecutionPlan>>> dagToJobs) {
+ List<DagNode<JobExecutionPlan>> dagNodes = dagToJobs.get(dagId);
+ return dagNodes != null && !dagNodes.isEmpty();
+ }
+
+ public static String calcJobId(Config jobConfig) {
+ String flowGroup = jobConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
+ String flowName =jobConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
+ long flowExecutionId = ConfigUtils.getLong(jobConfig,
ConfigurationKeys.FLOW_EXECUTION_ID_KEY, 0L);
+ String jobGroup = ConfigUtils.getString(jobConfig,
ConfigurationKeys.JOB_GROUP_KEY, "");
+ String jobName = ConfigUtils.getString(jobConfig,
ConfigurationKeys.JOB_NAME_KEY, "");
+
+ return calcJobId(flowGroup, flowName, flowExecutionId, jobGroup, jobName);
+ }
+
+ public static String calcJobId(String flowGroup, String flowName, long
flowExecutionId, String jobGroup, String jobName) {
+ return Joiner.on("_").join(flowGroup, flowName, flowExecutionId, jobGroup,
jobName);
+ }
}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/TimingEventUtils.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/TimingEventUtils.java
index 7947ed0ae..cddd9bb84 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/TimingEventUtils.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/TimingEventUtils.java
@@ -55,7 +55,7 @@ public class TimingEventUtils {
return
Optional.ofNullable(flowMetadata.get(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD));
}
- static Map<String, String> getJobMetadata(Map<String, String> flowMetadata,
JobExecutionPlan jobExecutionPlan) {
+ public static Map<String, String> getJobMetadata(Map<String, String>
flowMetadata, JobExecutionPlan jobExecutionPlan) {
Map<String, String> jobMetadata = Maps.newHashMap();
JobSpec jobSpec = jobExecutionPlan.getJobSpec();
SpecExecutor specExecutor = jobExecutionPlan.getSpecExecutor();
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
index 2d05c93af..535179ee8 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
@@ -47,6 +47,7 @@ import org.apache.gobblin.service.ExecutionStatus;
import
org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys;
import org.apache.gobblin.service.modules.flowgraph.FlowGraphConfigurationKeys;
import org.apache.gobblin.service.modules.orchestration.DagManager;
+import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
import org.apache.gobblin.util.ConfigUtils;
import static
org.apache.gobblin.runtime.AbstractJobLauncher.GOBBLIN_JOB_TEMPLATE_KEY;
@@ -72,6 +73,7 @@ public class JobExecutionPlan {
private int currentAttempts = 0;
private Optional<Future> jobFuture = Optional.absent();
private long flowStartTime = 0L;
+ private final String id;
public static class Factory {
public static final String JOB_NAME_COMPONENT_SEPARATION_CHAR = "_";
@@ -233,6 +235,7 @@ public class JobExecutionPlan {
this.jobSpec = jobSpec;
this.specExecutor = specExecutor;
this.maxAttempts = ConfigUtils.getInt(jobSpec.getConfig(),
JOB_MAX_ATTEMPTS, 1);
+ this.id = DagManagerUtils.calcJobId(jobSpec.getConfig());
}
/**
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetriever.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetriever.java
index 6b8d1e251..d323d710c 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetriever.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetriever.java
@@ -112,8 +112,9 @@ public class MysqlJobStatusRetriever extends
JobStatusRetriever {
}
+ /**
+  * Returns up to {@code count} of the largest distinct flow execution ids found in {@code jobStatusStates}.
+  * Guava's {@code Ordering#greatestOf} returns them greatest-first. "Latest" assumes that larger ids belong to newer
+  * executions -- confirm.
+  */
private List<Long> getLatestExecutionIds(List<State> jobStatusStates, int
count) {
- // `distinct()`, to avoid each flow execution ID replicating as many times
as it has child jobs
- Iterator<Long> flowExecutionIds =
jobStatusStates.stream().map(this::getFlowExecutionId).distinct().iterator();
+ // `distinct()` makes each flow execution ID count once. Without it, an ID would appear once per
+ // child-job status belonging to that execution.
+ Iterator<Long> flowExecutionIds =
jobStatusStates.stream().map(JobStatusRetriever::getFlowExecutionId).distinct().iterator();
return Ordering.<Long>natural().greatestOf(flowExecutionIds, count);
}
}