phet commented on code in PR #3853:
URL: https://github.com/apache/gobblin/pull/3853#discussion_r1444999117


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java:
##########
@@ -255,10 +261,14 @@ public static class DagNode<T> {
     private T value;
     //List of parent Nodes that are dependencies of this Node.
     private List<DagNode<T>> parentNodes;
+    private String id;
 
     //Constructor
     public DagNode(T value) {
       this.value = value;
+      if (this.getValue() instanceof JobExecutionPlan) {
+        this.id = createId(((JobExecutionPlan) this.getValue()).getJobSpec().getConfig());
+      }

Review Comment:
   looks like you're allowing this to default to `null` when the value is not `instanceof JobExecutionPlan`.
   
   given that the check is not on this instance's type, but on that of the generic param it encloses, the ID seems not essential to the abstraction. as such, it doesn't belong as a field (property) of class instances.
   
   alternatively, it could live self-contained, calculated on demand within a `static` method (e.g. in `DagManagerUtils`):
   ```
   public static <T> Optional<String> extractJobId(DagNode<T> dagNode)
   ```
   
   less good, though still preferable to putting code in the ctor, would be a stand-alone accessor:
   ```
   public Optional<String> getJobId()
   ```
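   a minimal sketch of the `static` option, assuming simplified stand-ins for `DagNode` and `JobExecutionPlan`. the config keys and the underscore-joined ID scheme are hypothetical, since the PR's `createId` body isn't quoted here. the ID is computed on demand and its absence is explicit in the return type, instead of a `null` field on every node:
   ```java
   import java.util.Map;
   import java.util.Optional;

   public class ExtractJobIdSketch {

     // Hypothetical stand-in for Dag.DagNode<T>: only the generic value matters here.
     public static class DagNode<T> {
       private final T value;
       public DagNode(T value) { this.value = value; }
       public T getValue() { return value; }
     }

     // Hypothetical stand-in for JobExecutionPlan, reduced to a flat job config.
     public static class JobExecutionPlan {
       private final Map<String, String> jobConfig;
       public JobExecutionPlan(Map<String, String> jobConfig) { this.jobConfig = jobConfig; }
       public Map<String, String> getJobConfig() { return jobConfig; }
     }

     // Hypothetical ID scheme; the keys and the "_" separator are illustrative only.
     public static String calcJobId(Map<String, String> jobConfig) {
       return String.join("_", jobConfig.get("flow.group"), jobConfig.get("flow.name"),
           jobConfig.get("flow.executionId"), jobConfig.get("job.name"));
     }

     // Computed on demand; empty when the node's value is not a JobExecutionPlan.
     public static <T> Optional<String> extractJobId(DagNode<T> dagNode) {
       T value = dagNode.getValue();
       if (value instanceof JobExecutionPlan) {
         return Optional.of(calcJobId(((JobExecutionPlan) value).getJobConfig()));
       }
       return Optional.empty();
     }
   }
   ```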



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java:
##########
@@ -285,6 +295,20 @@ public boolean equals(Object o) {
     public int hashCode() {
       return this.getValue().hashCode();
     }
+
+    public static String createId(Config jobConfig) {

Review Comment:
   nit: this isn't creating (i.e. constructing) an ID, but calculating a value. `calcJobId`?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java:
##########
@@ -366,4 +396,63 @@ static List<String> getDistinctUniqueRequesters(String serializedRequesters) {
       throw new RuntimeException("Could not process requesters due to ", e);
     }
   }
+
+  public static void submitInitializationEventsAndSetStatus(Dag<JobExecutionPlan> dag, Optional<EventSubmitter> eventSubmitter) {
+    if (eventSubmitter.isPresent()) {
+      for (DagNode<JobExecutionPlan> dagNode : dag.getNodes()) {
+        JobExecutionPlan jobExecutionPlan = DagManagerUtils.getJobExecutionPlan(dagNode);
+        Map<String, String> jobMetadata = TimingEventUtils.getJobMetadata(Maps.newHashMap(), jobExecutionPlan);
+        eventSubmitter.get().getTimingEvent(TimingEvent.LauncherTimings.JOB_PENDING).stop(jobMetadata);
+        jobExecutionPlan.setExecutionStatus(PENDING);
+      }
+    }
+  }
+
+  /**
+   * Retrieve the {@link JobStatus} from the {@link JobExecutionPlan}.
+   */
+  public static JobStatus pollJobStatus(DagNode<JobExecutionPlan> dagNode, JobStatusRetriever jobStatusRetriever, Optional<Timer> jobStatusPolledTimer) {
+    Config jobConfig = dagNode.getValue().getJobSpec().getConfig();
+    String flowGroup = jobConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
+    String flowName = jobConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
+    long flowExecutionId = jobConfig.getLong(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
+    String jobGroup = jobConfig.getString(ConfigurationKeys.JOB_GROUP_KEY);
+    String jobName = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY);
+
+    return pollStatus(flowGroup, flowName, flowExecutionId, jobGroup, jobName, jobStatusRetriever, jobStatusPolledTimer);
+  }
+
+  /**
+   * Retrieve the flow's {@link JobStatus} (i.e. job status with {@link JobStatusRetriever#NA_KEY} as job name/group) from a dag
+   */
+  public static  JobStatus pollFlowStatus(Dag<JobExecutionPlan> dag, JobStatusRetriever jobStatusRetriever, Optional<Timer> jobStatusPolledTimer) {
+    if (dag == null || dag.isEmpty()) {
+      return null;
+    }
+    Config jobConfig = dag.getNodes().get(0).getValue().getJobSpec().getConfig();
+    String flowGroup = jobConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
+    String flowName = jobConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
+    long flowExecutionId = jobConfig.getLong(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
+
+    return pollStatus(flowGroup, flowName, flowExecutionId, JobStatusRetriever.NA_KEY, JobStatusRetriever.NA_KEY, jobStatusRetriever, jobStatusPolledTimer);
+  }
+
+  public static  JobStatus pollStatus(String flowGroup, String flowName, long flowExecutionId, String jobGroup, String jobName,

Review Comment:
   since it's non-obvious, the javadoc ought to describe the semantics of the timer, e.g. what does it mean for it to be absent?
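   for instance, something along these lines: a sketch using `java.util.Optional` with minimal hypothetical `Timer` and `JobStatusRetriever` stand-ins (statuses reduced to plain strings), which assumes an absent timer means only that the poll's latency goes unrecorded:
   ```java
   import java.util.Iterator;
   import java.util.Optional;
   import java.util.concurrent.TimeUnit;

   public class PollStatusSketch {

     // Hypothetical stand-in for a metrics timer.
     public interface Timer {
       void update(long duration, TimeUnit unit);
     }

     // Hypothetical stand-in for JobStatusRetriever; statuses are plain strings here.
     public interface JobStatusRetriever {
       Iterator<String> getJobStatusesForFlowExecution(String flowName, String flowGroup,
           long flowExecutionId, String jobName, String jobGroup);
     }

     /**
      * Polls the first status reported for the given flow execution and job.
      *
      * @param jobStatusPolledTimer when present, records the latency of the retriever call;
      *     when absent, the poll proceeds the same way and the latency is simply not recorded
      * @return the first status from the retriever, or {@code null} if it reported none
      */
     public static String pollStatus(String flowGroup, String flowName, long flowExecutionId,
         String jobGroup, String jobName, JobStatusRetriever jobStatusRetriever,
         Optional<Timer> jobStatusPolledTimer) {
       long pollStartTime = System.nanoTime();
       Iterator<String> statuses = jobStatusRetriever.getJobStatusesForFlowExecution(
           flowName, flowGroup, flowExecutionId, jobName, jobGroup);
       long elapsedNanos = System.nanoTime() - pollStartTime;
       jobStatusPolledTimer.ifPresent(timer -> timer.update(elapsedNanos, TimeUnit.NANOSECONDS));
       return statuses.hasNext() ? statuses.next() : null;
     }
   }
   ```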



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java:
##########
@@ -48,25 +53,46 @@
 import org.apache.gobblin.service.modules.flowgraph.Dag.DagNode;
 import org.apache.gobblin.service.modules.orchestration.DagManager.FailureOption;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.JobStatus;
+import org.apache.gobblin.service.monitoring.JobStatusRetriever;
 import org.apache.gobblin.util.ConfigUtils;
 
+import static org.apache.gobblin.service.ExecutionStatus.PENDING;
+
 
 public class DagManagerUtils {
   static long DEFAULT_FLOW_SLA_MILLIS = TimeUnit.HOURS.toMillis(24);
   static String QUOTA_KEY_SEPERATOR = ",";
 
-  static FlowId getFlowId(Dag<JobExecutionPlan> dag) {
+  public static FlowId getFlowId(Dag<JobExecutionPlan> dag) {
     return getFlowId(dag.getStartNodes().get(0));
   }
 
+  public static DagActionStore.DagAction createDagAction(Dag<JobExecutionPlan> dag, DagActionStore.FlowActionType flowActionType) {
+    return createDagAction(dag.getStartNodes().get(0), flowActionType);
+  }
+
+  public static DagActionStore.DagAction createDagAction(String flowGroup, String flowName, String flowExecutionId, DagActionStore.FlowActionType flowActionType) {
+    return new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, flowActionType);
+  }

Review Comment:
   I don't see this adding value (i.e. `createDagAction` vs `new DagAction`). do we really need it?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java:
##########
@@ -48,25 +53,46 @@
 import org.apache.gobblin.service.modules.flowgraph.Dag.DagNode;
 import org.apache.gobblin.service.modules.orchestration.DagManager.FailureOption;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.JobStatus;
+import org.apache.gobblin.service.monitoring.JobStatusRetriever;
 import org.apache.gobblin.util.ConfigUtils;
 
+import static org.apache.gobblin.service.ExecutionStatus.PENDING;
+
 
 public class DagManagerUtils {
   static long DEFAULT_FLOW_SLA_MILLIS = TimeUnit.HOURS.toMillis(24);
   static String QUOTA_KEY_SEPERATOR = ",";
 
-  static FlowId getFlowId(Dag<JobExecutionPlan> dag) {
+  public static FlowId getFlowId(Dag<JobExecutionPlan> dag) {
     return getFlowId(dag.getStartNodes().get(0));
   }
 
+  public static DagActionStore.DagAction createDagAction(Dag<JobExecutionPlan> dag, DagActionStore.FlowActionType flowActionType) {
+    return createDagAction(dag.getStartNodes().get(0), flowActionType);
+  }
+
+  public static DagActionStore.DagAction createDagAction(String flowGroup, String flowName, String flowExecutionId, DagActionStore.FlowActionType flowActionType) {
+    return new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, flowActionType);
+  }
+
+  // todo - dag action object does not have any identifier to tell if it is for a complete dag or just for one dag node
+  public static DagActionStore.DagAction createDagAction(DagNode<JobExecutionPlan> dagNode, DagActionStore.FlowActionType flowActionType) {

Review Comment:
   given the potential for confusion, should we be more explicit?
   ```
   createDagActionFromDag
   createDagActionFromDagNode
   ```
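   to illustrate, a sketch with hypothetical stand-ins (flat-map configs, made-up config keys and `FlowActionType` values; the node-level body is assumed, since it isn't quoted above). with explicit names each call site states its intent, even though, per the todo, the resulting `DagAction` still can't tell the two cases apart:
   ```java
   import java.util.List;
   import java.util.Map;

   public class DagActionNamingSketch {

     // Hypothetical subset of DagActionStore.FlowActionType.
     public enum FlowActionType { LAUNCH, KILL }

     // Stand-in for DagActionStore.DagAction, mirroring the four-argument constructor in the diff.
     public static final class DagAction {
       public final String flowGroup;
       public final String flowName;
       public final String flowExecutionId;
       public final FlowActionType flowActionType;

       public DagAction(String flowGroup, String flowName, String flowExecutionId, FlowActionType flowActionType) {
         this.flowGroup = flowGroup;
         this.flowName = flowName;
         this.flowExecutionId = flowExecutionId;
         this.flowActionType = flowActionType;
       }
     }

     // Stand-in for DagNode<JobExecutionPlan>, holding just the job config as a flat map.
     public static final class DagNode {
       public final Map<String, String> jobConfig;
       public DagNode(Map<String, String> jobConfig) { this.jobConfig = jobConfig; }
     }

     // Stand-in for Dag<JobExecutionPlan>, keeping only its start nodes.
     public static final class Dag {
       public final List<DagNode> startNodes;
       public Dag(List<DagNode> startNodes) { this.startNodes = startNodes; }
     }

     // Action meant for the whole dag; as in the diff, it is derived from the first start node.
     public static DagAction createDagActionFromDag(Dag dag, FlowActionType flowActionType) {
       return createDagActionFromDagNode(dag.startNodes.get(0), flowActionType);
     }

     // Action derived from one node's config; nothing in it identifies the node itself.
     public static DagAction createDagActionFromDagNode(DagNode dagNode, FlowActionType flowActionType) {
       Map<String, String> config = dagNode.jobConfig;
       return new DagAction(config.get("flow.group"), config.get("flow.name"),
           config.get("flow.executionId"), flowActionType);
     }
   }
   ```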



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java:
##########
@@ -366,4 +396,63 @@ static List<String> getDistinctUniqueRequesters(String serializedRequesters) {
       throw new RuntimeException("Could not process requesters due to ", e);
     }
   }
+
+  public static void submitInitializationEventsAndSetStatus(Dag<JobExecutionPlan> dag, Optional<EventSubmitter> eventSubmitter) {
+    if (eventSubmitter.isPresent()) {
+      for (DagNode<JobExecutionPlan> dagNode : dag.getNodes()) {
+        JobExecutionPlan jobExecutionPlan = DagManagerUtils.getJobExecutionPlan(dagNode);
+        Map<String, String> jobMetadata = TimingEventUtils.getJobMetadata(Maps.newHashMap(), jobExecutionPlan);
+        eventSubmitter.get().getTimingEvent(TimingEvent.LauncherTimings.JOB_PENDING).stop(jobMetadata);
+        jobExecutionPlan.setExecutionStatus(PENDING);
+      }
+    }
+  }
+
+  /**
+   * Retrieve the {@link JobStatus} from the {@link JobExecutionPlan}.
+   */
+  public static JobStatus pollJobStatus(DagNode<JobExecutionPlan> dagNode, JobStatusRetriever jobStatusRetriever, Optional<Timer> jobStatusPolledTimer) {
+    Config jobConfig = dagNode.getValue().getJobSpec().getConfig();
+    String flowGroup = jobConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
+    String flowName = jobConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
+    long flowExecutionId = jobConfig.getLong(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
+    String jobGroup = jobConfig.getString(ConfigurationKeys.JOB_GROUP_KEY);
+    String jobName = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY);
+
+    return pollStatus(flowGroup, flowName, flowExecutionId, jobGroup, jobName, jobStatusRetriever, jobStatusPolledTimer);
+  }
+
+  /**
+   * Retrieve the flow's {@link JobStatus} (i.e. job status with {@link JobStatusRetriever#NA_KEY} as job name/group) from a dag
+   */
+  public static  JobStatus pollFlowStatus(Dag<JobExecutionPlan> dag, JobStatusRetriever jobStatusRetriever, Optional<Timer> jobStatusPolledTimer) {
+    if (dag == null || dag.isEmpty()) {
+      return null;
+    }
+    Config jobConfig = dag.getNodes().get(0).getValue().getJobSpec().getConfig();
+    String flowGroup = jobConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
+    String flowName = jobConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
+    long flowExecutionId = jobConfig.getLong(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
+
+    return pollStatus(flowGroup, flowName, flowExecutionId, JobStatusRetriever.NA_KEY, JobStatusRetriever.NA_KEY, jobStatusRetriever, jobStatusPolledTimer);
+  }
+
+  public static  JobStatus pollStatus(String flowGroup, String flowName, long flowExecutionId, String jobGroup, String jobName,
+    JobStatusRetriever jobStatusRetriever, Optional<Timer> jobStatusPolledTimer) {
+    long pollStartTime = System.nanoTime();
+    Iterator<JobStatus> jobStatusIterator =
+        jobStatusRetriever.getJobStatusesForFlowExecution(flowName, flowGroup, flowExecutionId, jobName, jobGroup);
+    Instrumented.updateTimer(jobStatusPolledTimer, System.nanoTime() - pollStartTime, TimeUnit.NANOSECONDS);
+
+    if (jobStatusIterator.hasNext()) {
+      return jobStatusIterator.next();
+    } else {
+      return null;
+    }
+  }
+
+  public static boolean hasRunningJobs(String dagId, Map<String, LinkedList<DagNode<JobExecutionPlan>>> dagToJobs) {
+    List<DagNode<JobExecutionPlan>> dagNodes = dagToJobs.get(dagId);
+    return dagNodes != null && !dagNodes.isEmpty();
+  }

Review Comment:
   this seems fine as an expedient, short-term abstraction, but for anything beyond that, let's strive for better encapsulation, such as a class that maintains the `Map` internally and provides a method:
   ```
   public boolean hasAnyRunning(String dagId)
   ```
   of course, even better would be:
   ```
   public boolean hasAnyRunning(Dag<JEP> dag)
   ```
   or:
   ```
   Dag<JEP> dag = ...
   DagId dagId = getDagId(dag);
   boolean hasRunningJobs = jobRunningStatus.hasAnyRunning(dagId);
   ```
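   as a rough sketch of that encapsulation: the `JobRunningStatus` name comes from the snippet above, while `addRunning`/`removeRunning` are hypothetical, and the type parameter `N` stands in for `DagNode<JobExecutionPlan>`:
   ```java
   import java.util.HashMap;
   import java.util.LinkedList;
   import java.util.List;
   import java.util.Map;

   // Hypothetical tracker that owns the dagId -> running-jobs map instead of exposing it to callers.
   public class JobRunningStatus<N> {
     private final Map<String, LinkedList<N>> dagToJobs = new HashMap<>();

     // Records a job node as running under the given dag.
     public void addRunning(String dagId, N jobNode) {
       dagToJobs.computeIfAbsent(dagId, k -> new LinkedList<>()).add(jobNode);
     }

     // Removes a job node from the dag's running set, dropping the entry once it is empty.
     public void removeRunning(String dagId, N jobNode) {
       LinkedList<N> jobs = dagToJobs.get(dagId);
       if (jobs != null) {
         jobs.remove(jobNode);
         if (jobs.isEmpty()) {
           dagToJobs.remove(dagId);
         }
       }
     }

     // Same check as hasRunningJobs in the diff, without handing the Map to every caller.
     public boolean hasAnyRunning(String dagId) {
       List<N> jobs = dagToJobs.get(dagId);
       return jobs != null && !jobs.isEmpty();
     }
   }
   ```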



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java:
##########
@@ -275,7 +278,7 @@ DagStateStore createDagStateStore(Config config, Map<URI, TopologySpec> topology
   }
 
   // Initializes and returns an array of Queue of size numThreads
-  private static LinkedBlockingDeque<?>[] initializeDagQueue(int numThreads) {
+  static LinkedBlockingDeque<?>[] initializeDagQueue(int numThreads) {

Review Comment:
   why package-private? is there a clear reason not to use `public`?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java:
##########
@@ -366,4 +396,63 @@ static List<String> getDistinctUniqueRequesters(String serializedRequesters) {
       throw new RuntimeException("Could not process requesters due to ", e);
     }
   }
+
+  public static void submitInitializationEventsAndSetStatus(Dag<JobExecutionPlan> dag, Optional<EventSubmitter> eventSubmitter) {

Review Comment:
   given the entire body is skipped when absent (e.g. you don't even set status when not submitting), it may be clearer to phrase as:
   ```
   submitAndSet(Dag<JEP> dag, EventSubmitter es)
   ```
   for use as:
   ```
   eventSubmitter.ifPresent(es -> submitAndSet(dag, es));
   ```
   
   to promote that, let's rid ourselves of the antiquated guava `Optional` (which has no `ifPresent`) in favor of the java8 `java.util.Optional`
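   a minimal sketch of the resulting shape with `java.util.Optional#ifPresent`. `EventSubmitter` here is a hypothetical stand-in collapsed to a single JOB_PENDING callback, and the dag is reduced to a list of plans:
   ```java
   import java.util.List;
   import java.util.Optional;

   public class SubmitAndSetSketch {

     // Hypothetical subset of ExecutionStatus.
     public enum ExecutionStatus { PENDING }

     // Hypothetical stand-in for EventSubmitter, collapsed to the JOB_PENDING timing event.
     public interface EventSubmitter {
       void submitJobPending(String jobName);
     }

     // Hypothetical stand-in for JobExecutionPlan: a job name plus a mutable status.
     public static final class JobExecutionPlan {
       public final String jobName;
       public ExecutionStatus executionStatus; // stays null until submitAndSet runs
       public JobExecutionPlan(String jobName) { this.jobName = jobName; }
     }

     // Takes a non-optional submitter: handling the absent case is left to the caller.
     public static void submitAndSet(List<JobExecutionPlan> dag, EventSubmitter es) {
       for (JobExecutionPlan plan : dag) {
         es.submitJobPending(plan.jobName);
         plan.executionStatus = ExecutionStatus.PENDING;
       }
     }

     // Call site: ifPresent makes the "do nothing at all when absent" behavior explicit.
     public static void initialize(List<JobExecutionPlan> dag, Optional<EventSubmitter> eventSubmitter) {
       eventSubmitter.ifPresent(es -> submitAndSet(dag, es));
     }
   }
   ```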



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
