[ 
https://issues.apache.org/jira/browse/GOBBLIN-1910?focusedWorklogId=898799&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-898799
 ]

ASF GitHub Bot logged work on GOBBLIN-1910:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 09/Jan/24 22:21
            Start Date: 09/Jan/24 22:21
    Worklog Time Spent: 10m 
      Work Description: arjun4084346 commented on code in PR #3853:
URL: https://github.com/apache/gobblin/pull/3853#discussion_r1446681864


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java:
##########
@@ -366,4 +396,63 @@ static List<String> getDistinctUniqueRequesters(String serializedRequesters) {
       throw new RuntimeException("Could not process requesters due to ", e);
     }
   }
+
+  public static void submitInitializationEventsAndSetStatus(Dag<JobExecutionPlan> dag, Optional<EventSubmitter> eventSubmitter) {
+    if (eventSubmitter.isPresent()) {
+      for (DagNode<JobExecutionPlan> dagNode : dag.getNodes()) {
+        JobExecutionPlan jobExecutionPlan = DagManagerUtils.getJobExecutionPlan(dagNode);
+        Map<String, String> jobMetadata = TimingEventUtils.getJobMetadata(Maps.newHashMap(), jobExecutionPlan);
+        eventSubmitter.get().getTimingEvent(TimingEvent.LauncherTimings.JOB_PENDING).stop(jobMetadata);
+        jobExecutionPlan.setExecutionStatus(PENDING);
+      }
+    }
+  }
+
+  /**
+   * Retrieve the {@link JobStatus} from the {@link JobExecutionPlan}.
+   */
+  public static JobStatus pollJobStatus(DagNode<JobExecutionPlan> dagNode, JobStatusRetriever jobStatusRetriever, Optional<Timer> jobStatusPolledTimer) {
+    Config jobConfig = dagNode.getValue().getJobSpec().getConfig();
+    String flowGroup = jobConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
+    String flowName = jobConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
+    long flowExecutionId = jobConfig.getLong(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
+    String jobGroup = jobConfig.getString(ConfigurationKeys.JOB_GROUP_KEY);
+    String jobName = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY);
+
+    return pollStatus(flowGroup, flowName, flowExecutionId, jobGroup, jobName, jobStatusRetriever, jobStatusPolledTimer);
+  }
+
+  /**
+   * Retrieve the flow's {@link JobStatus} (i.e. job status with {@link JobStatusRetriever#NA_KEY} as job name/group) from a dag
+   */
+  public static  JobStatus pollFlowStatus(Dag<JobExecutionPlan> dag, JobStatusRetriever jobStatusRetriever, Optional<Timer> jobStatusPolledTimer) {
+    if (dag == null || dag.isEmpty()) {
+      return null;
+    }
+    Config jobConfig = dag.getNodes().get(0).getValue().getJobSpec().getConfig();
+    String flowGroup = jobConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
+    String flowName = jobConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
+    long flowExecutionId = jobConfig.getLong(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
+
+    return pollStatus(flowGroup, flowName, flowExecutionId, JobStatusRetriever.NA_KEY, JobStatusRetriever.NA_KEY, jobStatusRetriever, jobStatusPolledTimer);
+  }
+
+  public static  JobStatus pollStatus(String flowGroup, String flowName, long flowExecutionId, String jobGroup, String jobName,
+    JobStatusRetriever jobStatusRetriever, Optional<Timer> jobStatusPolledTimer) {
+    long pollStartTime = System.nanoTime();
+    Iterator<JobStatus> jobStatusIterator =
+        jobStatusRetriever.getJobStatusesForFlowExecution(flowName, flowGroup, flowExecutionId, jobName, jobGroup);
+    Instrumented.updateTimer(jobStatusPolledTimer, System.nanoTime() - pollStartTime, TimeUnit.NANOSECONDS);
+
+    if (jobStatusIterator.hasNext()) {
+      return jobStatusIterator.next();
+    } else {
+      return null;
+    }
+  }
+
+  public static boolean hasRunningJobs(String dagId, Map<String, LinkedList<DagNode<JobExecutionPlan>>> dagToJobs) {
+    List<DagNode<JobExecutionPlan>> dagNodes = dagToJobs.get(dagId);
+    return dagNodes != null && !dagNodes.isEmpty();
+  }

Review Comment:
   Right. It should be in the `DagManagementStateStore` interface, but that isn't out yet.
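
   Assuming the comment refers to the `hasRunningJobs` helper at the end of the hunk, here is a minimal sketch of what moving that check behind an interface could look like, so callers no longer pass the `dagToJobs` map around. `RunningJobsView`, `InMemoryRunningJobsView`, `addJob` and `removeJob` are hypothetical names used only for illustration; this is not the `DagManagementStateStore` API, and jobs are plain strings here instead of `DagNode<JobExecutionPlan>`.

```java
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

// Hypothetical interface for illustration only; NOT the real DagManagementStateStore.
interface RunningJobsView {
  boolean hasRunningJobs(String dagId);
}

// Minimal in-memory implementation that owns the dag-to-jobs map,
// mirroring the null/empty check of the static helper in the diff.
final class InMemoryRunningJobsView implements RunningJobsView {
  private final Map<String, LinkedList<String>> dagToJobs = new HashMap<>();

  // Record a job (a plain string in this sketch) as running under the given dag.
  void addJob(String dagId, String jobName) {
    dagToJobs.computeIfAbsent(dagId, k -> new LinkedList<>()).add(jobName);
  }

  // Remove a job from the given dag; a no-op if the dag is unknown.
  void removeJob(String dagId, String jobName) {
    List<String> jobs = dagToJobs.get(dagId);
    if (jobs != null) {
      jobs.remove(jobName);
    }
  }

  @Override
  public boolean hasRunningJobs(String dagId) {
    List<String> jobs = dagToJobs.get(dagId);
    return jobs != null && !jobs.isEmpty();
  }
}
```

   With this shape, a caller would ask `view.hasRunningJobs(dagId)` and the map stays an implementation detail of the store.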





Issue Time Tracking
-------------------

    Worklog Id:     (was: 898799)
    Time Spent: 15h 40m  (was: 15.5h)

> Refactor code to move current in-memory references to new design for REST 
> calls: Launch, Resume and Kill
> --------------------------------------------------------------------------------------------------------
>
>                 Key: GOBBLIN-1910
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1910
>             Project: Apache Gobblin
>          Issue Type: New Feature
>            Reporter: Meeth Gala
>            Priority: Major
>          Time Spent: 15h 40m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)
