This is an automated email from the ASF dual-hosted git repository.

arjun4084346 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 318c66180 [GOBBLIN-1910] part one of the changes that dag refactoring will require (#3853)
318c66180 is described below

commit 318c661808d201f72090088aa975f39a06f375eb
Author: Arjun Singh Bora <[email protected]>
AuthorDate: Thu Jan 18 10:02:30 2024 -0800

    [GOBBLIN-1910] part one of the changes that dag refactoring will require (#3853)
    
    * part one of the changes that dag refactoring will require
    * address review comments
    ---------
    Co-authored-by: Meeth Gala <[email protected]>
    Co-authored-by: Arjun Singh Bora <[email protected]>
    Co-authored-by: Arjun Singh Bora <[email protected]>
---
 .../service/monitoring/JobStatusRetriever.java     |  67 +++++-----
 .../gobblin/service/modules/flowgraph/Dag.java     |   9 +-
 .../service/modules/orchestration/DagManager.java  | 101 ++++----------
 .../modules/orchestration/DagManagerUtils.java     | 148 ++++++++++++++++++---
 .../modules/orchestration/TimingEventUtils.java    |   2 +-
 .../service/modules/spec/JobExecutionPlan.java     |   3 +
 .../monitoring/MysqlJobStatusRetriever.java        |   5 +-
 7 files changed, 197 insertions(+), 138 deletions(-)

diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
index 750834092..bcbcc15fa 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
@@ -103,6 +103,26 @@ public abstract class JobStatusRetriever implements 
LatestFlowExecutionIdTracker
    * @return deserialize {@link State} into a {@link JobStatus}.
    */
   protected JobStatus getJobStatus(State jobState) {
+    JobStatus.JobStatusBuilder jobStatusBuilder = 
createJobStatusBuilderFromState(jobState);
+
+    String contextId = 
TroubleshooterUtils.getContextIdForJob(jobState.getProperties());
+
+    Supplier<List<Issue>> jobIssues = Suppliers.memoize(() -> {
+      List<Issue> issues;
+      try {
+        issues = issueRepository.getAll(contextId);
+      } catch (TroubleshooterException e) {
+        log.warn("Cannot retrieve job issues", e);
+        issues = Collections.emptyList();
+      }
+      return issues;
+    });
+
+    jobStatusBuilder.issues(jobIssues);
+    return jobStatusBuilder.build();
+  }
+
+  public static JobStatus.JobStatusBuilder 
createJobStatusBuilderFromState(State jobState) {
     String flowGroup = getFlowGroup(jobState);
     String flowName = getFlowName(jobState);
     long flowExecutionId = getFlowExecutionId(jobState);
@@ -125,48 +145,35 @@ public abstract class JobStatusRetriever implements 
LatestFlowExecutionIdTracker
     int progressPercentage = 
jobState.getPropAsInt(TimingEvent.JOB_COMPLETION_PERCENTAGE, 0);
     long lastProgressEventTime = 
jobState.getPropAsLong(TimingEvent.JOB_LAST_PROGRESS_EVENT_TIME, 0);
 
-    String contextId = 
TroubleshooterUtils.getContextIdForJob(jobState.getProperties());
-
-    Supplier<List<Issue>> jobIssues = Suppliers.memoize(() -> {
-      List<Issue> issues;
-      try {
-        issues = issueRepository.getAll(contextId);
-      } catch (TroubleshooterException e) {
-        log.warn("Cannot retrieve job issues", e);
-        issues = Collections.emptyList();
-      }
-      return issues;
-    });
-
-    return 
JobStatus.builder().flowName(flowName).flowGroup(flowGroup).flowExecutionId(flowExecutionId).
-        
jobName(jobName).jobGroup(jobGroup).jobTag(jobTag).jobExecutionId(jobExecutionId).eventName(eventName).
-        
lowWatermark(lowWatermark).highWatermark(highWatermark).orchestratedTime(orchestratedTime).startTime(startTime).endTime(endTime).
-        
message(message).processedCount(processedCount).maxAttempts(maxAttempts).currentAttempts(currentAttempts).currentGeneration(currentGeneration).
-        
shouldRetry(shouldRetry).progressPercentage(progressPercentage).lastProgressEventTime(lastProgressEventTime).
-        issues(jobIssues).build();
+    return 
JobStatus.builder().flowName(flowName).flowGroup(flowGroup).flowExecutionId(flowExecutionId).jobName(jobName)
+        
.jobGroup(jobGroup).jobTag(jobTag).jobExecutionId(jobExecutionId).eventName(eventName).lowWatermark(lowWatermark)
+        
.highWatermark(highWatermark).orchestratedTime(orchestratedTime).startTime(startTime).endTime(endTime)
+        
.message(message).processedCount(processedCount).maxAttempts(maxAttempts).currentAttempts(currentAttempts)
+        
.currentGeneration(currentGeneration).shouldRetry(shouldRetry).progressPercentage(progressPercentage)
+        .lastProgressEventTime(lastProgressEventTime);
   }
 
-  protected final String getFlowGroup(State jobState) {
+  protected static final String getFlowGroup(State jobState) {
     return jobState.getProp(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD);
   }
 
-  protected final String getFlowName(State jobState) {
+  protected static final String getFlowName(State jobState) {
     return jobState.getProp(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD);
   }
 
-  protected final long getFlowExecutionId(State jobState) {
+  protected static final long getFlowExecutionId(State jobState) {
     return 
Long.parseLong(jobState.getProp(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD));
   }
 
-  protected final String getJobGroup(State jobState) {
+  protected static final String getJobGroup(State jobState) {
     return jobState.getProp(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD);
   }
 
-  protected final String getJobName(State jobState) {
+  protected static final String getJobName(State jobState) {
     return jobState.getProp(TimingEvent.FlowEventConstants.JOB_NAME_FIELD);
   }
 
-  protected final long getJobExecutionId(State jobState) {
+  protected static final long getJobExecutionId(State jobState) {
     return 
Long.parseLong(jobState.getProp(TimingEvent.FlowEventConstants.JOB_EXECUTION_ID_FIELD,
 "0"));
   }
 
@@ -178,7 +185,9 @@ public abstract class JobStatusRetriever implements 
LatestFlowExecutionIdTracker
     return flowExecutionGroupings.stream().map(exec -> {
       List<JobStatus> jobStatuses = 
ImmutableList.copyOf(asJobStatuses(exec.getJobStates().stream().sorted(
           // rationalized order, to facilitate test assertions
-          
Comparator.comparing(this::getJobGroup).thenComparing(this::getJobName).thenComparing(this::getJobExecutionId)
+          Comparator.comparing(JobStatusRetriever::getJobGroup)
+              .thenComparing(JobStatusRetriever::getJobName)
+              .thenComparing(JobStatusRetriever::getJobExecutionId)
       ).collect(Collectors.toList())));
       return new FlowStatus(exec.getFlowName(), exec.getFlowGroup(), 
exec.getFlowExecutionId(), jobStatuses.iterator(),
             getFlowStatusFromJobStatuses(jobStatuses.iterator()));
@@ -196,10 +205,8 @@ public abstract class JobStatusRetriever implements 
LatestFlowExecutionIdTracker
 
   protected List<FlowExecutionJobStateGrouping> 
groupByFlowExecutionAndRetainLatest(
       String flowGroup, List<State> jobStatusStates, int maxCountPerFlowName) {
-    Map<String, Map<Long, List<State>>> statesByFlowExecutionIdByName =
-        jobStatusStates.stream().collect(Collectors.groupingBy(
-            this::getFlowName,
-            Collectors.groupingBy(this::getFlowExecutionId)));
+    Map<String, Map<Long, List<State>>> statesByFlowExecutionIdByName = 
jobStatusStates.stream().collect(
+        Collectors.groupingBy(JobStatusRetriever::getFlowName, 
Collectors.groupingBy(JobStatusRetriever::getFlowExecutionId)));
 
     return 
statesByFlowExecutionIdByName.entrySet().stream().sorted(Map.Entry.comparingByKey()).flatMap(flowNameEntry
 -> {
       String flowName = flowNameEntry.getKey();
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java
index edb19d8d1..388d943aa 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java
@@ -17,6 +17,8 @@
 
 package org.apache.gobblin.service.modules.flowgraph;
 
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -26,15 +28,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
 import lombok.Getter;
 import lombok.Setter;
-
 import org.apache.gobblin.annotation.Alpha;
 
+
 /**
  * An implementation of Dag. Assumes that nodes have unique values. Nodes with 
duplicate values will produce
  * unpredictable behavior.
@@ -261,6 +259,7 @@ public class Dag<T> {
       this.value = value;
     }
 
+
     public void addParentNode(DagNode<T> node) {
       if (parentNodes == null) {
         parentNodes = Lists.newArrayList(node);
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index ca38eda0c..9e345d71a 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -271,7 +271,7 @@ public class DagManager extends AbstractIdleService {
   }
 
   // Initializes and returns an array of Queue of size numThreads
-  private static LinkedBlockingDeque<?>[] initializeDagQueue(int numThreads) {
+  static LinkedBlockingDeque<?>[] initializeDagQueue(int numThreads) {
     LinkedBlockingDeque<?>[] queue = new LinkedBlockingDeque[numThreads];
 
     for (int i=0; i< numThreads; i++) {
@@ -324,16 +324,7 @@ public class DagManager extends AbstractIdleService {
       throw new IOException("Could not add dag" + dagId + "to queue");
     }
     if (setStatus) {
-      submitEventsAndSetStatus(dag);
-    }
-  }
-
-  private void submitEventsAndSetStatus(Dag<JobExecutionPlan> dag) {
-    for (DagNode<JobExecutionPlan> dagNode : dag.getNodes()) {
-      JobExecutionPlan jobExecutionPlan = 
DagManagerUtils.getJobExecutionPlan(dagNode);
-      Map<String, String> jobMetadata = 
TimingEventUtils.getJobMetadata(Maps.newHashMap(), jobExecutionPlan);
-      new TimingEvent(eventSubmitter, 
TimingEvent.LauncherTimings.JOB_PENDING).stop(jobMetadata);
-      jobExecutionPlan.setExecutionStatus(PENDING);
+      DagManagerUtils.submitPendingExecStatus(dag, this.eventSubmitter);
     }
   }
 
@@ -642,15 +633,18 @@ public class DagManager extends AbstractIdleService {
      */
     private void finishResumingDags() throws IOException {
       for (Map.Entry<String, Dag<JobExecutionPlan>> dag : 
this.resumingDags.entrySet()) {
-        JobStatus flowStatus = pollFlowStatus(dag.getValue());
-        if (flowStatus == null || 
!flowStatus.getEventName().equals(PENDING_RESUME.name())) {
+        java.util.Optional<JobStatus> flowStatus = 
DagManagerUtils.pollFlowStatus(dag.getValue(), this.jobStatusRetriever, 
this.jobStatusPolledTimer);
+        if (!flowStatus.filter(fs -> 
fs.getEventName().equals(PENDING_RESUME.name())).isPresent()) {
           continue;
         }
 
         boolean dagReady = true;
         for (DagNode<JobExecutionPlan> node : dag.getValue().getNodes()) {
-          JobStatus jobStatus = pollJobStatus(node);
-          if (jobStatus == null || 
jobStatus.getEventName().equals(FAILED.name()) || 
jobStatus.getEventName().equals(CANCELLED.name())) {
+          java.util.Optional<JobStatus> jobStatus = 
DagManagerUtils.pollJobStatus(node, this.jobStatusRetriever, 
this.jobStatusPolledTimer);
+          if (jobStatus.filter(js -> {
+            String jobName = js.getEventName();
+            return jobName.equals(FAILED.name()) || 
jobName.equals(CANCELLED.name());
+          }).isPresent()) {
             dagReady = false;
             break;
           }
@@ -772,7 +766,7 @@ public class DagManager extends AbstractIdleService {
     /**
      * Proceed the execution of each dag node based on job status.
      */
-    private void pollAndAdvanceDag() throws IOException, ExecutionException, 
InterruptedException {
+    private void pollAndAdvanceDag() {
       Map<String, Set<DagNode<JobExecutionPlan>>> nextSubmitted = 
Maps.newHashMap();
       List<DagNode<JobExecutionPlan>> nodesToCleanUp = Lists.newArrayList();
 
@@ -780,7 +774,7 @@ public class DagManager extends AbstractIdleService {
         try {
           boolean slaKilled = slaKillIfNeeded(node);
 
-          JobStatus jobStatus = pollJobStatus(node);
+          java.util.Optional<JobStatus> jobStatus = 
DagManagerUtils.pollJobStatus(node, this.jobStatusRetriever, 
this.jobStatusPolledTimer);
 
           boolean killOrphanFlow = killJobIfOrphaned(node, jobStatus);
 
@@ -815,13 +809,13 @@ public class DagManager extends AbstractIdleService {
               break;
           }
 
-          if (jobStatus != null && jobStatus.isShouldRetry()) {
+          if (jobStatus.filter(JobStatus::isShouldRetry).isPresent()) {
             log.info("Retrying job: {}, current attempts: {}, max attempts: 
{}", DagManagerUtils.getFullyQualifiedJobName(node),
-                jobStatus.getCurrentAttempts(), jobStatus.getMaxAttempts());
+                jobStatus.get().getCurrentAttempts(), 
jobStatus.get().getMaxAttempts());
             this.jobToDag.get(node).setFlowEvent(null);
             submitJob(node);
           }
-        } catch (Exception e) {
+        } catch (InterruptedException | IOException | ExecutionException e) {
           // Error occurred while processing dag, continue processing other 
dags assigned to this thread
           log.error(String.format("Exception caught in DagManager while 
processing dag %s due to ",
               DagManagerUtils.getFullyQualifiedDagName(node)), e);
@@ -850,14 +844,14 @@ public class DagManager extends AbstractIdleService {
      * @return true if the total time that the job remains in the ORCHESTRATED 
state exceeds
      * {@value ConfigurationKeys#GOBBLIN_JOB_START_SLA_TIME}.
      */
-    private boolean killJobIfOrphaned(DagNode<JobExecutionPlan> node, 
JobStatus jobStatus)
+    private boolean killJobIfOrphaned(DagNode<JobExecutionPlan> node, 
java.util.Optional<JobStatus> jobStatus)
         throws ExecutionException, InterruptedException {
-      if (jobStatus == null) {
+      if (!jobStatus.isPresent()) {
         return false;
       }
-      ExecutionStatus executionStatus = valueOf(jobStatus.getEventName());
+      ExecutionStatus executionStatus = 
valueOf(jobStatus.get().getEventName());
       long timeOutForJobStart = DagManagerUtils.getJobStartSla(node, 
this.defaultJobStartSlaTimeMillis);
-      long jobOrchestratedTime = jobStatus.getOrchestratedTime();
+      long jobOrchestratedTime = jobStatus.get().getOrchestratedTime();
       if (executionStatus == ORCHESTRATED && System.currentTimeMillis() - 
jobOrchestratedTime > timeOutForJobStart) {
         log.info("Job {} of flow {} exceeded the job start SLA of {} ms. 
Killing the job now...",
             DagManagerUtils.getJobName(node),
@@ -875,15 +869,11 @@ public class DagManager extends AbstractIdleService {
       }
     }
 
-    private ExecutionStatus getJobExecutionStatus(boolean slaKilled, boolean 
killOrphanFlow, JobStatus jobStatus) {
+    private ExecutionStatus getJobExecutionStatus(boolean slaKilled, boolean 
killOrphanFlow, java.util.Optional<JobStatus> jobStatus) {
       if (slaKilled || killOrphanFlow) {
         return CANCELLED;
       } else {
-        if (jobStatus == null) {
-          return PENDING;
-        } else {
-          return valueOf(jobStatus.getEventName());
-        }
+        return jobStatus.map(status -> 
valueOf(status.getEventName())).orElse(PENDING);
       }
     }
 
@@ -932,47 +922,7 @@ public class DagManager extends AbstractIdleService {
       return false;
     }
 
-    /**
-     * Retrieve the {@link JobStatus} from the {@link JobExecutionPlan}.
-     */
-    private JobStatus pollJobStatus(DagNode<JobExecutionPlan> dagNode) {
-      Config jobConfig = dagNode.getValue().getJobSpec().getConfig();
-      String flowGroup = jobConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
-      String flowName = jobConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
-      long flowExecutionId = 
jobConfig.getLong(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
-      String jobGroup = jobConfig.getString(ConfigurationKeys.JOB_GROUP_KEY);
-      String jobName = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY);
-
-      return pollStatus(flowGroup, flowName, flowExecutionId, jobGroup, 
jobName);
-    }
-
-    /**
-     * Retrieve the flow's {@link JobStatus} (i.e. job status with {@link 
JobStatusRetriever#NA_KEY} as job name/group) from a dag
-     */
-    private JobStatus pollFlowStatus(Dag<JobExecutionPlan> dag) {
-      if (dag == null || dag.isEmpty()) {
-        return null;
-      }
-      Config jobConfig = 
dag.getNodes().get(0).getValue().getJobSpec().getConfig();
-      String flowGroup = jobConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
-      String flowName = jobConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
-      long flowExecutionId = 
jobConfig.getLong(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
 
-      return pollStatus(flowGroup, flowName, flowExecutionId, 
JobStatusRetriever.NA_KEY, JobStatusRetriever.NA_KEY);
-    }
-
-    private JobStatus pollStatus(String flowGroup, String flowName, long 
flowExecutionId, String jobGroup, String jobName) {
-      long pollStartTime = System.nanoTime();
-      Iterator<JobStatus> jobStatusIterator =
-          this.jobStatusRetriever.getJobStatusesForFlowExecution(flowName, 
flowGroup, flowExecutionId, jobName, jobGroup);
-      Instrumented.updateTimer(this.jobStatusPolledTimer, System.nanoTime() - 
pollStartTime, TimeUnit.NANOSECONDS);
-
-      if (jobStatusIterator.hasNext()) {
-        return jobStatusIterator.next();
-      } else {
-        return null;
-      }
-    }
 
     /**
      * Submit next set of Dag nodes in the Dag identified by the provided dagId
@@ -1107,11 +1057,6 @@ public class DagManager extends AbstractIdleService {
       }
     }
 
-    private boolean hasRunningJobs(String dagId) {
-      List<DagNode<JobExecutionPlan>> dagNodes = this.dagToJobs.get(dagId);
-      return dagNodes != null && !dagNodes.isEmpty();
-    }
-
     /**
      * Perform clean up. Remove a dag from the dagstore if the dag is complete 
and update internal state.
      */
@@ -1136,7 +1081,7 @@ public class DagManager extends AbstractIdleService {
             deleteJobState(dagId, dagNode);
           }
         }
-        if (!hasRunningJobs(dagId)) {
+        if (!DagManagerUtils.hasRunningJobs(dagId, this.dagToJobs)) {
           // Collect all the dagIds that are finished
           this.dagIdstoClean.add(dagId);
           if (dag.getFlowEvent() == null) {
@@ -1155,8 +1100,8 @@ public class DagManager extends AbstractIdleService {
       for (Iterator<String> dagIdIterator = this.dagIdstoClean.iterator(); 
dagIdIterator.hasNext();) {
         String dagId = dagIdIterator.next();
         Dag<JobExecutionPlan> dag = this.dags.get(dagId);
-        JobStatus flowStatus = pollFlowStatus(dag);
-        if (flowStatus != null && 
FlowStatusGenerator.FINISHED_STATUSES.contains(flowStatus.getEventName())) {
+        java.util.Optional<JobStatus> flowStatus = 
DagManagerUtils.pollFlowStatus(dag, this.jobStatusRetriever, 
this.jobStatusPolledTimer);
+        if (flowStatus.filter(fs -> 
FlowStatusGenerator.FINISHED_STATUSES.contains(fs.getEventName())).isPresent()) 
{
           FlowId flowId = DagManagerUtils.getFlowId(dag);
           switch(dag.getFlowEvent()) {
             case TimingEvent.FlowTimings.FLOW_SUCCEEDED:
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
index 97201a4aa..06ac24aa7 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
@@ -16,9 +16,11 @@
  */
 package org.apache.gobblin.service.modules.orchestration;
 
+import com.google.common.base.Joiner;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -28,14 +30,18 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
+import com.codahale.metrics.Timer;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.instrumented.Instrumented;
 import org.apache.gobblin.metrics.event.EventSubmitter;
 import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.DagActionStore;
 import org.apache.gobblin.runtime.api.JobSpec;
 import org.apache.gobblin.runtime.api.Spec;
 import org.apache.gobblin.runtime.api.SpecProducer;
@@ -47,17 +53,34 @@ import org.apache.gobblin.service.modules.flowgraph.Dag;
 import org.apache.gobblin.service.modules.flowgraph.Dag.DagNode;
 import 
org.apache.gobblin.service.modules.orchestration.DagManager.FailureOption;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.JobStatus;
+import org.apache.gobblin.service.monitoring.JobStatusRetriever;
 import org.apache.gobblin.util.ConfigUtils;
 
+import static org.apache.gobblin.service.ExecutionStatus.*;
+
 
 public class DagManagerUtils {
   static long DEFAULT_FLOW_SLA_MILLIS = TimeUnit.HOURS.toMillis(24);
   static String QUOTA_KEY_SEPERATOR = ",";
 
-  static FlowId getFlowId(Dag<JobExecutionPlan> dag) {
+  public static FlowId getFlowId(Dag<JobExecutionPlan> dag) {
     return getFlowId(dag.getStartNodes().get(0));
   }
 
+  public static DagActionStore.DagAction 
createDagActionFromDag(Dag<JobExecutionPlan> dag, DagActionStore.FlowActionType 
flowActionType) {
+    return createDagActionFromDagNode(dag.getStartNodes().get(0), 
flowActionType);
+  }
+
+  // todo - dag action object does not have any identifier to tell if it is 
for a complete dag or just for one dag node
+  public static DagActionStore.DagAction 
createDagActionFromDagNode(DagNode<JobExecutionPlan> dagNode, 
DagActionStore.FlowActionType flowActionType) {
+    Config jobConfig = dagNode.getValue().getJobSpec().getConfig();
+    String flowGroup = jobConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
+    String flowName =  jobConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
+    String flowExecutionId = 
jobConfig.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
+    return new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, 
flowActionType);
+  }
+
   static FlowId getFlowId(DagNode<JobExecutionPlan> dagNode) {
     Config jobConfig = dagNode.getValue().getJobSpec().getConfig();
     String flowGroup = jobConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
@@ -65,7 +88,7 @@ public class DagManagerUtils {
     return new FlowId().setFlowGroup(flowGroup).setFlowName(flowName);
   }
 
-  static long getFlowExecId(Dag<JobExecutionPlan> dag) {
+  public static long getFlowExecId(Dag<JobExecutionPlan> dag) {
     return getFlowExecId(dag.getStartNodes().get(0));
   }
 
@@ -94,20 +117,20 @@ public class DagManagerUtils {
   private static DagManager.DagId generateDagId(Config jobConfig) {
     String flowGroup = jobConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
     String flowName = jobConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
-    long flowExecutionId = 
jobConfig.getLong(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
+    String flowExecutionId = 
jobConfig.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
 
-    return new DagManager.DagId(flowGroup, flowName, 
String.valueOf(flowExecutionId));
+    return new DagManager.DagId(flowGroup, flowName, flowExecutionId);
   }
 
-  static DagManager.DagId generateDagId(Dag.DagNode<JobExecutionPlan> dagNode) 
{
+  public static DagManager.DagId generateDagId(DagNode<JobExecutionPlan> 
dagNode) {
     return generateDagId(dagNode.getValue().getJobSpec().getConfig());
   }
 
-  static DagManager.DagId generateDagId(String flowGroup, String flowName, 
long flowExecutionId) {
+  public static DagManager.DagId generateDagId(String flowGroup, String 
flowName, long flowExecutionId) {
     return generateDagId(flowGroup, flowName, String.valueOf(flowExecutionId));
   }
 
-  static DagManager.DagId generateDagId(String flowGroup, String flowName, 
String flowExecutionId) {
+  public static DagManager.DagId generateDagId(String flowGroup, String 
flowName, String flowExecutionId) {
     return new DagManager.DagId(flowGroup, flowName, flowExecutionId);
   }
 
@@ -116,7 +139,7 @@ public class DagManagerUtils {
    * @param dag
    * @return fully qualified name of the underlying {@link Dag}.
    */
-  static String getFullyQualifiedDagName(Dag<JobExecutionPlan> dag) {
+  public static String getFullyQualifiedDagName(Dag<JobExecutionPlan> dag) {
     FlowId flowid = getFlowId(dag);
     long flowExecutionId = getFlowExecId(dag);
     return "(flowGroup: " + flowid.getFlowGroup() + ", flowName: " + 
flowid.getFlowName() + ", flowExecutionId: " + flowExecutionId + ")";
@@ -133,7 +156,7 @@ public class DagManagerUtils {
     return "(flowGroup: " + flowid.getFlowGroup() + ", flowName: " + 
flowid.getFlowName() + ", flowExecutionId: " + flowExecutionId + ")";
   }
 
-  static String getJobName(DagNode<JobExecutionPlan> dagNode) {
+  public static String getJobName(DagNode<JobExecutionPlan> dagNode) {
     return 
dagNode.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.JOB_NAME_KEY);
   }
 
@@ -142,7 +165,7 @@ public class DagManagerUtils {
    * @param dagNode
    * @return a fully qualified name of the underlying job.
    */
-  static String getFullyQualifiedJobName(DagNode<JobExecutionPlan> dagNode) {
+  public static String getFullyQualifiedJobName(DagNode<JobExecutionPlan> 
dagNode) {
     Config jobConfig = dagNode.getValue().getJobSpec().getConfig();
 
     String flowGroup = ConfigUtils.getString(jobConfig, 
ConfigurationKeys.FLOW_GROUP_KEY, "");
@@ -153,7 +176,7 @@ public class DagManagerUtils {
     return "(flowGroup: " + flowGroup + ", flowName: " + flowName + ", 
flowExecutionId: " + flowExecutionId + ", jobName: " + jobName + ")";
   }
 
-  static JobExecutionPlan getJobExecutionPlan(DagNode<JobExecutionPlan> 
dagNode) {
+  public static JobExecutionPlan getJobExecutionPlan(DagNode<JobExecutionPlan> 
dagNode) {
     return dagNode.getValue();
   }
 
@@ -172,12 +195,12 @@ public class DagManagerUtils {
     return dagNode.getValue().getJobSpec().getConfig();
   }
 
-  static SpecProducer<Spec> getSpecProducer(DagNode<JobExecutionPlan> dagNode)
+  public static SpecProducer<Spec> getSpecProducer(DagNode<JobExecutionPlan> 
dagNode)
       throws ExecutionException, InterruptedException {
     return dagNode.getValue().getSpecExecutor().getProducer().get();
   }
 
-  static ExecutionStatus getExecutionStatus(DagNode<JobExecutionPlan> dagNode) 
{
+  public static ExecutionStatus getExecutionStatus(DagNode<JobExecutionPlan> 
dagNode) {
     return dagNode.getValue().getExecutionStatus();
   }
 
@@ -186,7 +209,7 @@ public class DagManagerUtils {
    * identifies each node yet to be executed and for which each of its parent 
nodes is in the {@link ExecutionStatus#COMPLETE}
    * state.
    */
-  static Set<DagNode<JobExecutionPlan>> getNext(Dag<JobExecutionPlan> dag) {
+  public static Set<DagNode<JobExecutionPlan>> getNext(Dag<JobExecutionPlan> 
dag) {
     Set<DagNode<JobExecutionPlan>> nextNodesToExecute = new HashSet<>();
     LinkedList<DagNode<JobExecutionPlan>> nodesToExpand = 
Lists.newLinkedList(dag.getStartNodes());
     FailureOption failureOption = getFailureOption(dag);
@@ -195,12 +218,11 @@ public class DagManagerUtils {
       DagNode<JobExecutionPlan> node = nodesToExpand.poll();
       ExecutionStatus executionStatus = getExecutionStatus(node);
       boolean addFlag = true;
-      if (executionStatus == ExecutionStatus.PENDING || executionStatus == 
ExecutionStatus.PENDING_RETRY
-          || executionStatus == ExecutionStatus.PENDING_RESUME) {
+      if (executionStatus == PENDING || executionStatus == PENDING_RETRY || 
executionStatus == PENDING_RESUME) {
         //Add a node to be executed next, only if all of its parent nodes are 
COMPLETE.
         List<DagNode<JobExecutionPlan>> parentNodes = dag.getParents(node);
         for (DagNode<JobExecutionPlan> parentNode : parentNodes) {
-          if (getExecutionStatus(parentNode) != ExecutionStatus.COMPLETE) {
+          if (getExecutionStatus(parentNode) != COMPLETE) {
             addFlag = false;
             break;
           }
@@ -208,10 +230,10 @@ public class DagManagerUtils {
         if (addFlag) {
           nextNodesToExecute.add(node);
         }
-      } else if (executionStatus == ExecutionStatus.COMPLETE) {
+      } else if (executionStatus == COMPLETE) {
         //Explore the children of COMPLETED node as next candidates for 
execution.
         nodesToExpand.addAll(dag.getChildren(node));
-      } else if ((executionStatus == ExecutionStatus.FAILED) || 
(executionStatus == ExecutionStatus.CANCELLED)) {
+      } else if ((executionStatus == FAILED) || (executionStatus == 
CANCELLED)) {
         switch (failureOption) {
           case FINISH_RUNNING:
             return new HashSet<>();
@@ -234,7 +256,7 @@ public class DagManagerUtils {
     return FailureOption.valueOf(failureOption);
   }
 
-  static String getSpecExecutorUri(DagNode<JobExecutionPlan> dagNode) {
+  public static String getSpecExecutorUri(DagNode<JobExecutionPlan> dagNode) {
     return dagNode.getValue().getSpecExecutor().getUri().toString();
   }
 
@@ -252,7 +274,7 @@ public class DagManagerUtils {
   /**
    * Increment the value of {@link JobExecutionPlan#currentAttempts}
    */
-  static void incrementJobAttempt(DagNode<JobExecutionPlan> dagNode) {
+  public static void incrementJobAttempt(DagNode<JobExecutionPlan> dagNode) {
     
dagNode.getValue().setCurrentAttempts(dagNode.getValue().getCurrentAttempts() + 
1);
   }
 
@@ -261,7 +283,7 @@ public class DagManagerUtils {
    * This method is not thread safe, we achieve correctness by making sure
    * one dag will only be handled in the same DagManagerThread
    */
-  static void incrementJobGeneration(DagNode<JobExecutionPlan> dagNode) {
+  public static void incrementJobGeneration(DagNode<JobExecutionPlan> dagNode) 
{
     
dagNode.getValue().setCurrentGeneration(dagNode.getValue().getCurrentGeneration()
 + 1);
   }
 
@@ -365,4 +387,86 @@ public class DagManagerUtils {
       throw new RuntimeException("Could not process requesters due to ", e);
     }
   }
+
+  /**
+   * Set the execution status of all the {@link DagNode}s of the provided dag.
+   * Also emits a {@link TimingEvent.LauncherTimings#JOB_PENDING} for each of 
those dag nodes.
+   * @param dag dag whose status is to change.
+   * @param eventSubmitter event submitter that will send the event.
+   */
+  public static void submitPendingExecStatus(Dag<JobExecutionPlan> dag, 
EventSubmitter eventSubmitter) {
+    for (DagNode<JobExecutionPlan> dagNode : dag.getNodes()) {
+      JobExecutionPlan jobExecutionPlan = 
DagManagerUtils.getJobExecutionPlan(dagNode);
+      Map<String, String> jobMetadata = 
TimingEventUtils.getJobMetadata(Maps.newHashMap(), jobExecutionPlan);
+      
eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_PENDING).stop(jobMetadata);
+      jobExecutionPlan.setExecutionStatus(PENDING);
+    }
+  }
+
+  /**
+   * Retrieve the {@link JobStatus} from the {@link JobExecutionPlan}.
+   */
+  public static java.util.Optional<JobStatus> 
pollJobStatus(DagNode<JobExecutionPlan> dagNode, JobStatusRetriever 
jobStatusRetriever, Timer jobStatusPolledTimer) {
+    Config jobConfig = dagNode.getValue().getJobSpec().getConfig();
+    String flowGroup = jobConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
+    String flowName = jobConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
+    long flowExecutionId = 
jobConfig.getLong(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
+    String jobGroup = jobConfig.getString(ConfigurationKeys.JOB_GROUP_KEY);
+    String jobName = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY);
+
+    return pollStatus(flowGroup, flowName, flowExecutionId, jobGroup, jobName, 
jobStatusRetriever, jobStatusPolledTimer);
+  }
+
+  /**
+   * Retrieve the flow's {@link JobStatus} (i.e. job status with {@link 
JobStatusRetriever#NA_KEY} as job name/group) from a dag
+   * Returns empty optional if dag is null/empty or job status is not found.
+   */
+  public static java.util.Optional<JobStatus> 
pollFlowStatus(Dag<JobExecutionPlan> dag, JobStatusRetriever 
jobStatusRetriever, Timer jobStatusPolledTimer) {
+    if (dag == null || dag.isEmpty()) {
+      return java.util.Optional.empty();
+    }
+    Config jobConfig = 
dag.getNodes().get(0).getValue().getJobSpec().getConfig();
+    String flowGroup = jobConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
+    String flowName = jobConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
+    long flowExecutionId = 
jobConfig.getLong(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
+
+    return pollStatus(flowGroup, flowName, flowExecutionId, 
JobStatusRetriever.NA_KEY, JobStatusRetriever.NA_KEY, jobStatusRetriever, 
jobStatusPolledTimer);
+  }
+
+  /**
+   * Retrieve the flow's {@link JobStatus} and update the timer if 
jobStatusPolledTimer is present.
+   * Returns empty optional if job status is not found.
+   */
+  public static java.util.Optional<JobStatus> pollStatus(String flowGroup, 
String flowName, long flowExecutionId, String jobGroup, String jobName,
+                                               JobStatusRetriever 
jobStatusRetriever, Timer jobStatusPolledTimer) {
+    long pollStartTime = System.nanoTime();
+    Iterator<JobStatus> jobStatusIterator =
+        jobStatusRetriever.getJobStatusesForFlowExecution(flowName, flowGroup, 
flowExecutionId, jobName, jobGroup);
+    Instrumented.updateTimer(jobStatusPolledTimer, System.nanoTime() - 
pollStartTime, TimeUnit.NANOSECONDS);
+
+    if (jobStatusIterator.hasNext()) {
+      return java.util.Optional.of(jobStatusIterator.next());
+    } else {
+      return java.util.Optional.empty();
+    }
+  }
+
+  public static boolean hasRunningJobs(String dagId, Map<String, 
LinkedList<DagNode<JobExecutionPlan>>> dagToJobs) {
+    List<DagNode<JobExecutionPlan>> dagNodes = dagToJobs.get(dagId);
+    return dagNodes != null && !dagNodes.isEmpty();
+  }
+
+  public static String calcJobId(Config jobConfig) {
+    String flowGroup = jobConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
+    String flowName =jobConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
+    long flowExecutionId = ConfigUtils.getLong(jobConfig, 
ConfigurationKeys.FLOW_EXECUTION_ID_KEY, 0L);
+    String jobGroup = ConfigUtils.getString(jobConfig, 
ConfigurationKeys.JOB_GROUP_KEY, "");
+    String jobName = ConfigUtils.getString(jobConfig, 
ConfigurationKeys.JOB_NAME_KEY, "");
+
+    return calcJobId(flowGroup, flowName, flowExecutionId, jobGroup, jobName);
+  }
+
+  public static String calcJobId(String flowGroup, String flowName, long 
flowExecutionId, String jobGroup, String jobName) {
+    return Joiner.on("_").join(flowGroup, flowName, flowExecutionId, jobGroup, 
jobName);
+  }
 }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/TimingEventUtils.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/TimingEventUtils.java
index 7947ed0ae..cddd9bb84 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/TimingEventUtils.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/TimingEventUtils.java
@@ -55,7 +55,7 @@ public class TimingEventUtils {
     return 
Optional.ofNullable(flowMetadata.get(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD));
   }
 
-  static Map<String, String> getJobMetadata(Map<String, String> flowMetadata, 
JobExecutionPlan jobExecutionPlan) {
+  public static Map<String, String> getJobMetadata(Map<String, String> 
flowMetadata, JobExecutionPlan jobExecutionPlan) {
     Map<String, String> jobMetadata = Maps.newHashMap();
     JobSpec jobSpec = jobExecutionPlan.getJobSpec();
     SpecExecutor specExecutor = jobExecutionPlan.getSpecExecutor();
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
index 2d05c93af..535179ee8 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
@@ -47,6 +47,7 @@ import org.apache.gobblin.service.ExecutionStatus;
 import 
org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys;
 import org.apache.gobblin.service.modules.flowgraph.FlowGraphConfigurationKeys;
 import org.apache.gobblin.service.modules.orchestration.DagManager;
+import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
 import org.apache.gobblin.util.ConfigUtils;
 
 import static 
org.apache.gobblin.runtime.AbstractJobLauncher.GOBBLIN_JOB_TEMPLATE_KEY;
@@ -72,6 +73,7 @@ public class JobExecutionPlan {
   private int currentAttempts = 0;
   private Optional<Future> jobFuture = Optional.absent();
   private long flowStartTime = 0L;
+  private final String id;
 
   public static class Factory {
     public static final String JOB_NAME_COMPONENT_SEPARATION_CHAR = "_";
@@ -233,6 +235,7 @@ public class JobExecutionPlan {
     this.jobSpec = jobSpec;
     this.specExecutor = specExecutor;
     this.maxAttempts = ConfigUtils.getInt(jobSpec.getConfig(), 
JOB_MAX_ATTEMPTS, 1);
+    this.id = DagManagerUtils.calcJobId(jobSpec.getConfig());
   }
 
   /**
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetriever.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetriever.java
index 6b8d1e251..d323d710c 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetriever.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetriever.java
@@ -112,8 +112,9 @@ public class MysqlJobStatusRetriever extends 
JobStatusRetriever {
   }
 
   private List<Long> getLatestExecutionIds(List<State> jobStatusStates, int 
count) {
-    // `distinct()`, to avoid each flow execution ID replicating as many times 
as it has child jobs
-    Iterator<Long> flowExecutionIds = 
jobStatusStates.stream().map(this::getFlowExecutionId).distinct().iterator();
+    // `distinct()`, to avoid each flow execution ID replicating as many times 
as it
+    // has child jobs
+    Iterator<Long> flowExecutionIds = 
jobStatusStates.stream().map(JobStatusRetriever::getFlowExecutionId).distinct().iterator();
     return Ordering.<Long>natural().greatestOf(flowExecutionIds, count);
   }
 }


Reply via email to