umustafi commented on code in PR #3853:
URL: https://github.com/apache/gobblin/pull/3853#discussion_r1446774978
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java:
##########
Review Comment:
can we avoid some of these changes for now if no code is changed?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java:
##########
@@ -17,23 +17,26 @@
package org.apache.gobblin.service.monitoring;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Collection;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigValueFactory;
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.Collection;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
+
Review Comment:
same comment as above for cleaning up this pr
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -17,12 +17,6 @@
package org.apache.gobblin.service.modules.orchestration;
Review Comment:
same here let's remove some of these changes to keep PR shorter
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java:
##########
@@ -366,4 +389,80 @@ static List<String> getDistinctUniqueRequesters(String
serializedRequesters) {
throw new RuntimeException("Could not process requesters due to ", e);
}
}
+
+ public static void submitAndSet(Dag<JobExecutionPlan> dag,
Optional<EventSubmitter> eventSubmitter) {
+ eventSubmitter.toJavaUtil().ifPresent(es -> {
+ for (DagNode<JobExecutionPlan> dagNode : dag.getNodes()) {
+ JobExecutionPlan jobExecutionPlan =
DagManagerUtils.getJobExecutionPlan(dagNode);
+ Map<String, String> jobMetadata =
TimingEventUtils.getJobMetadata(Maps.newHashMap(), jobExecutionPlan);
+
eventSubmitter.get().getTimingEvent(TimingEvent.LauncherTimings.JOB_PENDING).stop(jobMetadata);
+ jobExecutionPlan.setExecutionStatus(PENDING);
+ }
+ });
+ }
+
+ /**
+ * Retrieve the {@link JobStatus} from the {@link JobExecutionPlan}.
+ */
+ public static JobStatus pollJobStatus(DagNode<JobExecutionPlan> dagNode,
JobStatusRetriever jobStatusRetriever, Optional<Timer> jobStatusPolledTimer) {
+ Config jobConfig = dagNode.getValue().getJobSpec().getConfig();
+ String flowGroup = jobConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
+ String flowName = jobConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
+ long flowExecutionId =
jobConfig.getLong(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
+ String jobGroup = jobConfig.getString(ConfigurationKeys.JOB_GROUP_KEY);
+ String jobName = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY);
+
+ return pollStatus(flowGroup, flowName, flowExecutionId, jobGroup, jobName,
jobStatusRetriever, jobStatusPolledTimer);
+ }
+
+ /**
+ * Retrieve the flow's {@link JobStatus} (i.e. job status with {@link
JobStatusRetriever#NA_KEY} as job name/group) from a dag
+ */
+ public static JobStatus pollFlowStatus(Dag<JobExecutionPlan> dag,
JobStatusRetriever jobStatusRetriever, Optional<Timer> jobStatusPolledTimer) {
+ if (dag == null || dag.isEmpty()) {
+ return null;
+ }
+ Config jobConfig =
dag.getNodes().get(0).getValue().getJobSpec().getConfig();
+ String flowGroup = jobConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
+ String flowName = jobConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
+ long flowExecutionId =
jobConfig.getLong(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
+
+ return pollStatus(flowGroup, flowName, flowExecutionId,
JobStatusRetriever.NA_KEY, JobStatusRetriever.NA_KEY, jobStatusRetriever,
jobStatusPolledTimer);
+ }
+
+ /**
+ * Retrieve the flow's {@link JobStatus} and update the timer if
jobStatusPolledTimer is present.
+ */
+ public static JobStatus pollStatus(String flowGroup, String flowName, long
flowExecutionId, String jobGroup, String jobName,
+ JobStatusRetriever jobStatusRetriever, Optional<Timer>
jobStatusPolledTimer) {
+ long pollStartTime = System.nanoTime();
+ Iterator<JobStatus> jobStatusIterator =
+ jobStatusRetriever.getJobStatusesForFlowExecution(flowName, flowGroup,
flowExecutionId, jobName, jobGroup);
+ Instrumented.updateTimer(jobStatusPolledTimer, System.nanoTime() -
pollStartTime, TimeUnit.NANOSECONDS);
+
+ if (jobStatusIterator.hasNext()) {
+ return jobStatusIterator.next();
+ } else {
+ return null;
Review Comment:
can we return an Optional here instead of null to make it obvious there may
be no status returned?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java:
##########
@@ -48,25 +54,42 @@
import org.apache.gobblin.service.modules.flowgraph.Dag.DagNode;
import
org.apache.gobblin.service.modules.orchestration.DagManager.FailureOption;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.JobStatus;
+import org.apache.gobblin.service.monitoring.JobStatusRetriever;
import org.apache.gobblin.util.ConfigUtils;
+import static org.apache.gobblin.service.ExecutionStatus.PENDING;
+
public class DagManagerUtils {
static long DEFAULT_FLOW_SLA_MILLIS = TimeUnit.HOURS.toMillis(24);
static String QUOTA_KEY_SEPERATOR = ",";
- static FlowId getFlowId(Dag<JobExecutionPlan> dag) {
+ public static FlowId getFlowId(Dag<JobExecutionPlan> dag) {
return getFlowId(dag.getStartNodes().get(0));
}
+ public static DagActionStore.DagAction
createDagActionFromDag(Dag<JobExecutionPlan> dag, DagActionStore.FlowActionType
flowActionType) {
+ return createDagActionFromDagNode(dag.getStartNodes().get(0),
flowActionType);
+ }
+
+ // todo - dag action object does not have any identifier to tell if it is
for a complete dag or just for one dag node
Review Comment:
make this `TODO:` so recognized by IntelliJ
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]