phet commented on code in PR #3853:
URL: https://github.com/apache/gobblin/pull/3853#discussion_r1444999117
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java:
##########
@@ -255,10 +261,14 @@ public static class DagNode<T> {
private T value;
//List of parent Nodes that are dependencies of this Node.
private List<DagNode<T>> parentNodes;
+ private String id;
//Constructor
public DagNode(T value) {
this.value = value;
+ if (this.getValue() instanceof JobExecutionPlan) {
+ this.id = createId(((JobExecutionPlan) this.getValue()).getJobSpec().getConfig());
+ }
Review Comment:
looks like you're allowing this to default to `null` when the value is not `instanceof JobExecutionPlan`.
given that the check concerns not this instance's own type but that of the generic param it encloses, the ID doesn't seem essential to the abstraction. as such, it doesn't belong as a field (property) of class instances.
alternatively, it might live self-contained, calculated on each call within a `static` method (e.g. in `DagManagerUtils`):
```
public static <T> Optional<String> extractJobId(DagNode<T> dagNode)
```
less good, though still better than sticking code into the ctor, would be a stand-alone accessor:
```
public Optional<String> getJobId()
```
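A minimal sketch of the `static` alternative, under loud assumptions: `DagNode` and `JobExecutionPlan` below are simplified, hypothetical stand-ins (a plain `Map` replaces the typesafe `Config`), and the `"<jobGroup>.<jobName>"` format in `calcJobId` is invented for illustration, not the scheme the PR's `createId` uses. The point is only that the ID is computed on demand and that nodes not wrapping a `JobExecutionPlan` yield `Optional.empty()` instead of leaving a `null` field behind.

```java
import java.util.Map;
import java.util.Optional;

public class ExtractJobIdSketch {
  // Hypothetical stand-in for Dag.DagNode<T>: only the wrapped value is modelled.
  static class DagNode<T> {
    private final T value;
    DagNode(T value) { this.value = value; }
    T getValue() { return value; }
  }

  // Hypothetical stand-in for JobExecutionPlan; a Map replaces the job's typesafe Config.
  static class JobExecutionPlan {
    private final Map<String, String> jobConfig;
    JobExecutionPlan(Map<String, String> jobConfig) { this.jobConfig = jobConfig; }
    Map<String, String> getJobConfig() { return jobConfig; }
  }

  // Invented ID format for illustration only; NOT the PR's createId logic.
  static String calcJobId(Map<String, String> jobConfig) {
    return jobConfig.get("job.group") + "." + jobConfig.get("job.name");
  }

  // Computes the ID on each call instead of caching it in a DagNode field;
  // nodes that don't wrap a JobExecutionPlan get Optional.empty(), never null.
  static <T> Optional<String> extractJobId(DagNode<T> dagNode) {
    T value = dagNode.getValue();
    if (value instanceof JobExecutionPlan) {
      return Optional.of(calcJobId(((JobExecutionPlan) value).getJobConfig()));
    }
    return Optional.empty();
  }

  public static void main(String[] args) {
    DagNode<JobExecutionPlan> jobNode =
        new DagNode<>(new JobExecutionPlan(Map.of("job.group", "g1", "job.name", "j1")));
    System.out.println(extractJobId(jobNode));                 // Optional[g1.j1]
    System.out.println(extractJobId(new DagNode<>("other")));  // Optional.empty
  }
}
```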
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java:
##########
@@ -285,6 +295,20 @@ public boolean equals(Object o) {
public int hashCode() {
return this.getValue().hashCode();
}
+
+ public static String createId(Config jobConfig) {
Review Comment:
nit: this isn't creating (i.e. constructing) an ID, but calculating a value.
`calcJobId`?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java:
##########
@@ -366,4 +396,63 @@ static List<String> getDistinctUniqueRequesters(String serializedRequesters) {
throw new RuntimeException("Could not process requesters due to ", e);
}
}
+
+ public static void submitInitializationEventsAndSetStatus(Dag<JobExecutionPlan> dag, Optional<EventSubmitter> eventSubmitter) {
+ if (eventSubmitter.isPresent()) {
+ for (DagNode<JobExecutionPlan> dagNode : dag.getNodes()) {
+ JobExecutionPlan jobExecutionPlan = DagManagerUtils.getJobExecutionPlan(dagNode);
+ Map<String, String> jobMetadata = TimingEventUtils.getJobMetadata(Maps.newHashMap(), jobExecutionPlan);
+ eventSubmitter.get().getTimingEvent(TimingEvent.LauncherTimings.JOB_PENDING).stop(jobMetadata);
+ jobExecutionPlan.setExecutionStatus(PENDING);
+ }
+ }
+ }
+
+ /**
+ * Retrieve the {@link JobStatus} from the {@link JobExecutionPlan}.
+ */
+ public static JobStatus pollJobStatus(DagNode<JobExecutionPlan> dagNode, JobStatusRetriever jobStatusRetriever, Optional<Timer> jobStatusPolledTimer) {
+ Config jobConfig = dagNode.getValue().getJobSpec().getConfig();
+ String flowGroup = jobConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
+ String flowName = jobConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
+ long flowExecutionId = jobConfig.getLong(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
+ String jobGroup = jobConfig.getString(ConfigurationKeys.JOB_GROUP_KEY);
+ String jobName = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY);
+
+ return pollStatus(flowGroup, flowName, flowExecutionId, jobGroup, jobName, jobStatusRetriever, jobStatusPolledTimer);
+ }
+
+ /**
+ * Retrieve the flow's {@link JobStatus} (i.e. job status with {@link JobStatusRetriever#NA_KEY} as job name/group) from a dag
+ */
+ public static JobStatus pollFlowStatus(Dag<JobExecutionPlan> dag, JobStatusRetriever jobStatusRetriever, Optional<Timer> jobStatusPolledTimer) {
+ if (dag == null || dag.isEmpty()) {
+ return null;
+ }
+ Config jobConfig = dag.getNodes().get(0).getValue().getJobSpec().getConfig();
+ String flowGroup = jobConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
+ String flowName = jobConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
+ long flowExecutionId = jobConfig.getLong(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
+
+ return pollStatus(flowGroup, flowName, flowExecutionId, JobStatusRetriever.NA_KEY, JobStatusRetriever.NA_KEY, jobStatusRetriever, jobStatusPolledTimer);
+ }
+
+ public static JobStatus pollStatus(String flowGroup, String flowName, long flowExecutionId, String jobGroup, String jobName,
Review Comment:
since the semantics of the timer are non-obvious, the javadoc ought to describe them, e.g. what does it mean for the timer to be absent?
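One possible javadoc, sketched on a simplified, hypothetical `pollStatus`: a `Supplier` stands in for `JobStatusRetriever`, and the `DurationTimer` interface stands in for the metrics `Timer`. It assumes that an absent timer means "skip recording poll latency, change nothing else", which is what the diff's `Instrumented.updateTimer` call appears to intend; that assumption is exactly what the real javadoc should confirm.

```java
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

public class PollStatusSketch {
  // Hypothetical stand-in for a metrics timer (shaped like a Dropwizard Timer's update(long, TimeUnit)).
  interface DurationTimer {
    void update(long duration, TimeUnit unit);
  }

  /**
   * Polls for statuses and returns the first one found.
   *
   * @param retrieval performs the (possibly remote) status lookup
   * @param pollTimer if present, is updated once per call with the lookup's wall-clock
   *     duration in nanoseconds; if absent, no latency is recorded and the poll is
   *     otherwise identical
   * @return the first status retrieved, or {@code null} if there was none
   */
  static <S> S pollStatus(Supplier<Iterator<S>> retrieval, Optional<DurationTimer> pollTimer) {
    long pollStartTime = System.nanoTime();
    Iterator<S> statuses = retrieval.get();
    long elapsedNanos = System.nanoTime() - pollStartTime;
    pollTimer.ifPresent(timer -> timer.update(elapsedNanos, TimeUnit.NANOSECONDS));
    return statuses.hasNext() ? statuses.next() : null;
  }

  public static void main(String[] args) {
    long[] updates = {0};
    DurationTimer countingTimer = (duration, unit) -> updates[0]++;
    String first = pollStatus(() -> List.of("RUNNING").iterator(), Optional.of(countingTimer));
    System.out.println(first);      // RUNNING
    System.out.println(updates[0]); // 1
    String none = pollStatus(() -> List.<String>of().iterator(), Optional.empty());
    System.out.println(none);       // null
  }
}
```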
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java:
##########
@@ -48,25 +53,46 @@
import org.apache.gobblin.service.modules.flowgraph.Dag.DagNode;
import org.apache.gobblin.service.modules.orchestration.DagManager.FailureOption;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.JobStatus;
+import org.apache.gobblin.service.monitoring.JobStatusRetriever;
import org.apache.gobblin.util.ConfigUtils;
+import static org.apache.gobblin.service.ExecutionStatus.PENDING;
+
public class DagManagerUtils {
static long DEFAULT_FLOW_SLA_MILLIS = TimeUnit.HOURS.toMillis(24);
static String QUOTA_KEY_SEPERATOR = ",";
- static FlowId getFlowId(Dag<JobExecutionPlan> dag) {
+ public static FlowId getFlowId(Dag<JobExecutionPlan> dag) {
return getFlowId(dag.getStartNodes().get(0));
}
+ public static DagActionStore.DagAction createDagAction(Dag<JobExecutionPlan> dag, DagActionStore.FlowActionType flowActionType) {
+ return createDagAction(dag.getStartNodes().get(0), flowActionType);
+ }
+
+ public static DagActionStore.DagAction createDagAction(String flowGroup, String flowName, String flowExecutionId, DagActionStore.FlowActionType flowActionType) {
+ return new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, flowActionType);
+ }
Review Comment:
I don't see this adding value (i.e. `createDagAction` vs `new DagAction`).
do we really need it?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java:
##########
@@ -48,25 +53,46 @@
import org.apache.gobblin.service.modules.flowgraph.Dag.DagNode;
import org.apache.gobblin.service.modules.orchestration.DagManager.FailureOption;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.JobStatus;
+import org.apache.gobblin.service.monitoring.JobStatusRetriever;
import org.apache.gobblin.util.ConfigUtils;
+import static org.apache.gobblin.service.ExecutionStatus.PENDING;
+
public class DagManagerUtils {
static long DEFAULT_FLOW_SLA_MILLIS = TimeUnit.HOURS.toMillis(24);
static String QUOTA_KEY_SEPERATOR = ",";
- static FlowId getFlowId(Dag<JobExecutionPlan> dag) {
+ public static FlowId getFlowId(Dag<JobExecutionPlan> dag) {
return getFlowId(dag.getStartNodes().get(0));
}
+ public static DagActionStore.DagAction createDagAction(Dag<JobExecutionPlan> dag, DagActionStore.FlowActionType flowActionType) {
+ return createDagAction(dag.getStartNodes().get(0), flowActionType);
+ }
+
+ public static DagActionStore.DagAction createDagAction(String flowGroup, String flowName, String flowExecutionId, DagActionStore.FlowActionType flowActionType) {
+ return new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, flowActionType);
+ }
+
+ // todo - dag action object does not have any identifier to tell if it is for a complete dag or just for one dag node
+ public static DagActionStore.DagAction createDagAction(DagNode<JobExecutionPlan> dagNode, DagActionStore.FlowActionType flowActionType) {
Review Comment:
given the potential for confusion, should we be more explicit?
```
createDagActionFromDag
createDagActionFromDagNode
```
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java:
##########
@@ -366,4 +396,63 @@ static List<String> getDistinctUniqueRequesters(String serializedRequesters) {
throw new RuntimeException("Could not process requesters due to ", e);
}
}
+
+ public static void submitInitializationEventsAndSetStatus(Dag<JobExecutionPlan> dag, Optional<EventSubmitter> eventSubmitter) {
+ if (eventSubmitter.isPresent()) {
+ for (DagNode<JobExecutionPlan> dagNode : dag.getNodes()) {
+ JobExecutionPlan jobExecutionPlan = DagManagerUtils.getJobExecutionPlan(dagNode);
+ Map<String, String> jobMetadata = TimingEventUtils.getJobMetadata(Maps.newHashMap(), jobExecutionPlan);
+ eventSubmitter.get().getTimingEvent(TimingEvent.LauncherTimings.JOB_PENDING).stop(jobMetadata);
+ jobExecutionPlan.setExecutionStatus(PENDING);
+ }
+ }
+ }
+
+ /**
+ * Retrieve the {@link JobStatus} from the {@link JobExecutionPlan}.
+ */
+ public static JobStatus pollJobStatus(DagNode<JobExecutionPlan> dagNode, JobStatusRetriever jobStatusRetriever, Optional<Timer> jobStatusPolledTimer) {
+ Config jobConfig = dagNode.getValue().getJobSpec().getConfig();
+ String flowGroup = jobConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
+ String flowName = jobConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
+ long flowExecutionId = jobConfig.getLong(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
+ String jobGroup = jobConfig.getString(ConfigurationKeys.JOB_GROUP_KEY);
+ String jobName = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY);
+
+ return pollStatus(flowGroup, flowName, flowExecutionId, jobGroup, jobName, jobStatusRetriever, jobStatusPolledTimer);
+ }
+
+ /**
+ * Retrieve the flow's {@link JobStatus} (i.e. job status with {@link JobStatusRetriever#NA_KEY} as job name/group) from a dag
+ */
+ public static JobStatus pollFlowStatus(Dag<JobExecutionPlan> dag, JobStatusRetriever jobStatusRetriever, Optional<Timer> jobStatusPolledTimer) {
+ if (dag == null || dag.isEmpty()) {
+ return null;
+ }
+ Config jobConfig = dag.getNodes().get(0).getValue().getJobSpec().getConfig();
+ String flowGroup = jobConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
+ String flowName = jobConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
+ long flowExecutionId = jobConfig.getLong(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
+
+ return pollStatus(flowGroup, flowName, flowExecutionId, JobStatusRetriever.NA_KEY, JobStatusRetriever.NA_KEY, jobStatusRetriever, jobStatusPolledTimer);
+ }
+
+ public static JobStatus pollStatus(String flowGroup, String flowName, long flowExecutionId, String jobGroup, String jobName,
+ JobStatusRetriever jobStatusRetriever, Optional<Timer> jobStatusPolledTimer) {
+ long pollStartTime = System.nanoTime();
+ Iterator<JobStatus> jobStatusIterator = jobStatusRetriever.getJobStatusesForFlowExecution(flowName, flowGroup, flowExecutionId, jobName, jobGroup);
+ Instrumented.updateTimer(jobStatusPolledTimer, System.nanoTime() - pollStartTime, TimeUnit.NANOSECONDS);
+
+ if (jobStatusIterator.hasNext()) {
+ return jobStatusIterator.next();
+ } else {
+ return null;
+ }
+ }
+
+ public static boolean hasRunningJobs(String dagId, Map<String, LinkedList<DagNode<JobExecutionPlan>>> dagToJobs) {
+ List<DagNode<JobExecutionPlan>> dagNodes = dagToJobs.get(dagId);
+ return dagNodes != null && !dagNodes.isEmpty();
+ }
Review Comment:
seems fine as an expedient, short-term abstraction, but for anything beyond that, let's strive for better encapsulation, such as a class that maintains the `Map` internally and provides a method:
```
public boolean hasAnyRunning(String dagId)
```
of course, even better would be:
```
public boolean hasAnyRunning(Dag<JEP> dag)
```
or:
```
Dag<JEP> dag = ...
DagId dagId = getDagId(dag);
boolean hasRunningJobs = jobRunningStatus.hasAnyRunning(dagId);
```
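A minimal sketch of such a class, assuming the map keys are dag-ID strings and leaving the node type generic; `JobRunningStatus`, `addRunning`, and `removeRunning` are hypothetical names. Dropping a dag's entry once its list empties keeps `hasAnyRunning` a simple lookup; swapping `String` for a typed `DagId` would be the natural next step toward the preferred form.

```java
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

// Hypothetical encapsulation of the dagId -> running-job-nodes map; N is the node type
// (DagNode<JobExecutionPlan> in the PR).
public class JobRunningStatus<N> {
  private final Map<String, LinkedList<N>> dagToJobs = new HashMap<>();

  void addRunning(String dagId, N jobNode) {
    dagToJobs.computeIfAbsent(dagId, id -> new LinkedList<>()).add(jobNode);
  }

  void removeRunning(String dagId, N jobNode) {
    List<N> jobs = dagToJobs.get(dagId);
    if (jobs != null) {
      jobs.remove(jobNode);
      if (jobs.isEmpty()) {
        dagToJobs.remove(dagId);  // finished dags leave no empty entries behind
      }
    }
  }

  // Callers ask the question; the Map itself never leaks out.
  boolean hasAnyRunning(String dagId) {
    List<N> jobs = dagToJobs.get(dagId);
    return jobs != null && !jobs.isEmpty();
  }

  public static void main(String[] args) {
    JobRunningStatus<String> status = new JobRunningStatus<>();
    status.addRunning("dag-1", "jobA");
    System.out.println(status.hasAnyRunning("dag-1")); // true
    status.removeRunning("dag-1", "jobA");
    System.out.println(status.hasAnyRunning("dag-1")); // false
  }
}
```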
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java:
##########
@@ -275,7 +278,7 @@ DagStateStore createDagStateStore(Config config, Map<URI, TopologySpec> topology
}
// Initializes and returns an array of Queue of size numThreads
- private static LinkedBlockingDeque<?>[] initializeDagQueue(int numThreads) {
+ static LinkedBlockingDeque<?>[] initializeDagQueue(int numThreads) {
Review Comment:
why package-private? is there a clear reason not to use `public`?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java:
##########
@@ -366,4 +396,63 @@ static List<String> getDistinctUniqueRequesters(String
serializedRequesters) {
throw new RuntimeException("Could not process requesters due to ", e);
}
}
+
+ public static void submitInitializationEventsAndSetStatus(Dag<JobExecutionPlan> dag, Optional<EventSubmitter> eventSubmitter) {
Review Comment:
given the entire body is skipped when the submitter is absent (e.g. you don't even set the status when not submitting), it may be clearer to phrase as:
```
submitAndSet(Dag<JEP> dag, EventSubmitter es)
```
for use as:
```
eventSubmitter.ifPresent(es -> submitAndSet(dag, es));
```
to promote that, let's rid ourselves of the antiquated guava `Optional` (which has no `ifPresent`) in favor of the Java 8 one.
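A self-contained sketch of the suggested shape using `java.util.Optional`. `JobExecutionPlan`, `EventSubmitter`, and the two-value `ExecutionStatus` enum are hypothetical stand-ins (the PR's `EventSubmitter` stops a `JOB_PENDING` timing event rather than recording job names), and a `List` plays the role of the `Dag`. Because `submitAndSet` takes a non-optional submitter, the skip-everything-when-absent behavior becomes visible at the call site.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

public class SubmitAndSetSketch {
  // Hypothetical two-value subset of an execution-status enum.
  enum ExecutionStatus { COMPILED, PENDING }

  // Hypothetical stand-in for JobExecutionPlan: just a name and a mutable status.
  static class JobExecutionPlan {
    private final String jobName;
    private ExecutionStatus executionStatus = ExecutionStatus.COMPILED;
    JobExecutionPlan(String jobName) { this.jobName = jobName; }
    String getJobName() { return jobName; }
    ExecutionStatus getExecutionStatus() { return executionStatus; }
    void setExecutionStatus(ExecutionStatus status) { this.executionStatus = status; }
  }

  // Hypothetical stand-in for EventSubmitter: records which jobs got a "pending" event.
  static class EventSubmitter {
    final List<String> pendingEventsFor = new ArrayList<>();
    void submitJobPending(String jobName) { pendingEventsFor.add(jobName); }
  }

  // Non-optional submitter: whether to call this at all is the caller's decision.
  static void submitAndSet(List<JobExecutionPlan> dag, EventSubmitter es) {
    for (JobExecutionPlan plan : dag) {
      es.submitJobPending(plan.getJobName());
      plan.setExecutionStatus(ExecutionStatus.PENDING);
    }
  }

  public static void main(String[] args) {
    List<JobExecutionPlan> dag = List.of(new JobExecutionPlan("jobA"), new JobExecutionPlan("jobB"));
    Optional<EventSubmitter> eventSubmitter = Optional.of(new EventSubmitter());
    // java.util.Optional#ifPresent: nothing at all happens when the submitter is absent
    eventSubmitter.ifPresent(es -> submitAndSet(dag, es));
    System.out.println(eventSubmitter.get().pendingEventsFor); // [jobA, jobB]
    System.out.println(dag.get(0).getExecutionStatus());       // PENDING
  }
}
```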
--
This is an automated message from the Apache Git Service.