arjun4084346 commented on code in PR #3853:
URL: https://github.com/apache/gobblin/pull/3853#discussion_r1446681864
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java:
##########
@@ -366,4 +396,63 @@ static List<String> getDistinctUniqueRequesters(String
serializedRequesters) {
throw new RuntimeException("Could not process requesters due to ", e);
}
}
+
+ public static void
submitInitializationEventsAndSetStatus(Dag<JobExecutionPlan> dag,
Optional<EventSubmitter> eventSubmitter) {
+ if (eventSubmitter.isPresent()) {
+ for (DagNode<JobExecutionPlan> dagNode : dag.getNodes()) {
+ JobExecutionPlan jobExecutionPlan =
DagManagerUtils.getJobExecutionPlan(dagNode);
+ Map<String, String> jobMetadata =
TimingEventUtils.getJobMetadata(Maps.newHashMap(), jobExecutionPlan);
+
eventSubmitter.get().getTimingEvent(TimingEvent.LauncherTimings.JOB_PENDING).stop(jobMetadata);
+ jobExecutionPlan.setExecutionStatus(PENDING);
+ }
+ }
+ }
+
+ /**
+ * Retrieve the {@link JobStatus} from the {@link JobExecutionPlan}.
+ */
+ public static JobStatus pollJobStatus(DagNode<JobExecutionPlan> dagNode,
JobStatusRetriever jobStatusRetriever, Optional<Timer> jobStatusPolledTimer) {
+ Config jobConfig = dagNode.getValue().getJobSpec().getConfig();
+ String flowGroup = jobConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
+ String flowName = jobConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
+ long flowExecutionId =
jobConfig.getLong(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
+ String jobGroup = jobConfig.getString(ConfigurationKeys.JOB_GROUP_KEY);
+ String jobName = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY);
+
+ return pollStatus(flowGroup, flowName, flowExecutionId, jobGroup, jobName,
jobStatusRetriever, jobStatusPolledTimer);
+ }
+
+ /**
+ * Retrieve the flow's {@link JobStatus} (i.e. job status with {@link
JobStatusRetriever#NA_KEY} as job name/group) from a dag
+ */
+ public static JobStatus pollFlowStatus(Dag<JobExecutionPlan> dag,
JobStatusRetriever jobStatusRetriever, Optional<Timer> jobStatusPolledTimer) {
+ if (dag == null || dag.isEmpty()) {
+ return null;
+ }
+ Config jobConfig =
dag.getNodes().get(0).getValue().getJobSpec().getConfig();
+ String flowGroup = jobConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
+ String flowName = jobConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
+ long flowExecutionId =
jobConfig.getLong(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
+
+ return pollStatus(flowGroup, flowName, flowExecutionId,
JobStatusRetriever.NA_KEY, JobStatusRetriever.NA_KEY, jobStatusRetriever,
jobStatusPolledTimer);
+ }
+
+ public static JobStatus pollStatus(String flowGroup, String flowName, long
flowExecutionId, String jobGroup, String jobName,
+ JobStatusRetriever jobStatusRetriever, Optional<Timer>
jobStatusPolledTimer) {
+ long pollStartTime = System.nanoTime();
+ Iterator<JobStatus> jobStatusIterator =
+ jobStatusRetriever.getJobStatusesForFlowExecution(flowName, flowGroup,
flowExecutionId, jobName, jobGroup);
+ Instrumented.updateTimer(jobStatusPolledTimer, System.nanoTime() -
pollStartTime, TimeUnit.NANOSECONDS);
+
+ if (jobStatusIterator.hasNext()) {
+ return jobStatusIterator.next();
+ } else {
+ return null;
+ }
+ }
+
+ public static boolean hasRunningJobs(String dagId, Map<String,
LinkedList<DagNode<JobExecutionPlan>>> dagToJobs) {
+ List<DagNode<JobExecutionPlan>> dagNodes = dagToJobs.get(dagId);
+ return dagNodes != null && !dagNodes.isEmpty();
+ }
Review Comment:
right. It should be in interface `DagManagementStateStore`, but that's not
out yet.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]