arjun4084346 commented on code in PR #3853:
URL: https://github.com/apache/gobblin/pull/3853#discussion_r1446681864


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java:
##########
@@ -366,4 +396,63 @@ static List<String> getDistinctUniqueRequesters(String 
serializedRequesters) {
       throw new RuntimeException("Could not process requesters due to ", e);
     }
   }
+
+  public static void 
submitInitializationEventsAndSetStatus(Dag<JobExecutionPlan> dag, 
Optional<EventSubmitter> eventSubmitter) {
+    if (eventSubmitter.isPresent()) {
+      for (DagNode<JobExecutionPlan> dagNode : dag.getNodes()) {
+        JobExecutionPlan jobExecutionPlan = 
DagManagerUtils.getJobExecutionPlan(dagNode);
+        Map<String, String> jobMetadata = 
TimingEventUtils.getJobMetadata(Maps.newHashMap(), jobExecutionPlan);
+        
eventSubmitter.get().getTimingEvent(TimingEvent.LauncherTimings.JOB_PENDING).stop(jobMetadata);
+        jobExecutionPlan.setExecutionStatus(PENDING);
+      }
+    }
+  }
+
+  /**
+   * Retrieve the {@link JobStatus} from the {@link JobExecutionPlan}.
+   */
+  public static JobStatus pollJobStatus(DagNode<JobExecutionPlan> dagNode, 
JobStatusRetriever jobStatusRetriever, Optional<Timer> jobStatusPolledTimer) {
+    Config jobConfig = dagNode.getValue().getJobSpec().getConfig();
+    String flowGroup = jobConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
+    String flowName = jobConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
+    long flowExecutionId = 
jobConfig.getLong(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
+    String jobGroup = jobConfig.getString(ConfigurationKeys.JOB_GROUP_KEY);
+    String jobName = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY);
+
+    return pollStatus(flowGroup, flowName, flowExecutionId, jobGroup, jobName, 
jobStatusRetriever, jobStatusPolledTimer);
+  }
+
+  /**
+   * Retrieve the flow's {@link JobStatus} (i.e. job status with {@link 
JobStatusRetriever#NA_KEY} as job name/group) from a dag
+   */
+  public static  JobStatus pollFlowStatus(Dag<JobExecutionPlan> dag, 
JobStatusRetriever jobStatusRetriever, Optional<Timer> jobStatusPolledTimer) {
+    if (dag == null || dag.isEmpty()) {
+      return null;
+    }
+    Config jobConfig = 
dag.getNodes().get(0).getValue().getJobSpec().getConfig();
+    String flowGroup = jobConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
+    String flowName = jobConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
+    long flowExecutionId = 
jobConfig.getLong(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
+
+    return pollStatus(flowGroup, flowName, flowExecutionId, 
JobStatusRetriever.NA_KEY, JobStatusRetriever.NA_KEY, jobStatusRetriever, 
jobStatusPolledTimer);
+  }
+
+  public static  JobStatus pollStatus(String flowGroup, String flowName, long 
flowExecutionId, String jobGroup, String jobName,
+    JobStatusRetriever jobStatusRetriever, Optional<Timer> 
jobStatusPolledTimer) {
+    long pollStartTime = System.nanoTime();
+    Iterator<JobStatus> jobStatusIterator =
+        jobStatusRetriever.getJobStatusesForFlowExecution(flowName, flowGroup, 
flowExecutionId, jobName, jobGroup);
+    Instrumented.updateTimer(jobStatusPolledTimer, System.nanoTime() - 
pollStartTime, TimeUnit.NANOSECONDS);
+
+    if (jobStatusIterator.hasNext()) {
+      return jobStatusIterator.next();
+    } else {
+      return null;
+    }
+  }
+
+  public static boolean hasRunningJobs(String dagId, Map<String, 
LinkedList<DagNode<JobExecutionPlan>>> dagToJobs) {
+    List<DagNode<JobExecutionPlan>> dagNodes = dagToJobs.get(dagId);
+    return dagNodes != null && !dagNodes.isEmpty();
+  }

Review Comment:
   right. It should be in interface `DagManagementStateStore`, but that's not 
out yet. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to