[ https://issues.apache.org/jira/browse/GOBBLIN-2015?focusedWorklogId=909498&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-909498 ]

ASF GitHub Bot logged work on GOBBLIN-2015:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 12/Mar/24 20:44
            Start Date: 12/Mar/24 20:44
    Worklog Time Spent: 10m 
      Work Description: umustafi commented on code in PR #3893:
URL: https://github.com/apache/gobblin/pull/3893#discussion_r1522074443


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java:
##########
@@ -40,36 +65,127 @@
 @Alpha
 public class LaunchDagProc extends DagProc<Optional<Dag<JobExecutionPlan>>, Optional<Dag<JobExecutionPlan>>> {
   private final LaunchDagTask launchDagTask;
-  private final AtomicLong orchestrationDelayCounter;
+  FlowCompilationValidationHelper flowCompilationValidationHelper;
 
-  public LaunchDagProc(LaunchDagTask launchDagTask) {
+  public LaunchDagProc(LaunchDagTask launchDagTask, FlowCompilationValidationHelper flowCompilationValidationHelper) {
     this.launchDagTask = launchDagTask;
-    this.orchestrationDelayCounter = new AtomicLong(0);
+    AtomicLong orchestrationDelayCounter = new AtomicLong(0);

Review Comment:
   The name `orchestrationDelayMetric`/`orchestrationDelayCounter` is not accurate if we use it here. The "orchestration" step we are talking about has already occurred for the flow in the `Orchestrator`, and that is where we measured how long compilation takes. We may, however, want to emit a separate metric if that seems helpful.
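   For illustration only, a separately named metric could look like the plain-Java sketch below. It does not use Gobblin's metrics API. `LaunchDelayTracker`, `recordLaunch` and `gaugeSupplier` are hypothetical names, and the sketch assumes the flow execution id is an epoch-millis trigger timestamp. It also keeps the counter as a field. In the diff, the `AtomicLong` is a constructor local that nothing else updates, so the registered gauge would always read 0.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

/**
 * Hypothetical tracker for a "launch delay" value, kept separate from the
 * compilation-time "orchestration delay" measured in the Orchestrator.
 */
final class LaunchDelayTracker {
  // Held as a field so the launch path can update it after the gauge is registered.
  private final AtomicLong lastLaunchDelayMillis = new AtomicLong(0);

  /** Records the delay between the flow execution being triggered and this launch. */
  void recordLaunch(long flowExecutionIdMillis, long nowMillis) {
    // Assumption: flowExecutionId is the epoch-millis trigger time; clamp clock skew to 0.
    lastLaunchDelayMillis.set(Math.max(0L, nowMillis - flowExecutionIdMillis));
  }

  /** What a gauge would poll, analogous to orchestrationDelayCounter::get in the diff. */
  Supplier<Long> gaugeSupplier() {
    return lastLaunchDelayMillis::get;
  }
}
```

   On each launch, the DagProc would call something like `recordLaunch(flowExecutionId, System.currentTimeMillis())`.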



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java:
##########
@@ -40,36 +65,126 @@
     ContextAwareGauge<Long> orchestrationDelayMetric = metricContext.newContextAwareGauge
         (ServiceMetricNames.FLOW_ORCHESTRATION_DELAY, orchestrationDelayCounter::get);
     metricContext.register(orchestrationDelayMetric);
+    this.flowCompilationValidationHelper = flowCompilationValidationHelper;
   }
 
   @Override
   protected Optional<Dag<JobExecutionPlan>> initialize(DagManagementStateStore dagManagementStateStore)
       throws IOException {
-    throw new UnsupportedOperationException("Not yet implemented");
+    try {
+      DagActionStore.DagAction dagAction = this.launchDagTask.getDagAction();
+      URI flowUri = FlowSpec.Utils.createFlowSpecUri(dagAction.getFlowId());
+      FlowSpec flowSpec = dagManagementStateStore.getFlowSpec(flowUri);
+      flowSpec.addProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, dagAction.getFlowExecutionId());
+      return this.flowCompilationValidationHelper.createExecutionPlanIfValid(flowSpec).toJavaUtil();
+    } catch (URISyntaxException | SpecNotFoundException | InterruptedException e) {
+      throw new RuntimeException(e);
+    }
   }
 
   @Override
   protected Optional<Dag<JobExecutionPlan>> act(DagManagementStateStore dagManagementStateStore, Optional<Dag<JobExecutionPlan>> dag)
       throws IOException {
-    throw new UnsupportedOperationException("Not yet implemented");
+    if (!dag.isPresent()) {
+      log.warn("No dag with id " + this.launchDagTask.getDagId() + " found to launch");
+      return Optional.empty();

Review Comment:
   `DagManagerMetrics` currently holds flow-group-, executor-, and service-level metrics. The entire metrics instance should be passed to the DagProc so it can increment them accordingly.
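   A minimal plain-Java sketch of that injection. `DagProcMetrics`, `InMemoryDagProcMetrics` and `LaunchProcSketch` are hypothetical stand-ins and do not mirror the real `DagManagerMetrics` API. The point is that the DagProc receives the metrics object through its constructor instead of calling the static `DagManagementTaskStreamImpl.getDagManagerMetrics()`, which also lets tests pass in a fake.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical subset of the metrics a DagProc would update. */
interface DagProcMetrics {
  void incrementRunningJobMetrics(String flowGroup, String executor);
  void incrementJobsSentToExecutor(String flowGroup, String executor);
}

/** In-memory implementation keeping service-, flow-group- and executor-level counts. */
final class InMemoryDagProcMetrics implements DagProcMetrics {
  private final Map<String, AtomicLong> counts = new ConcurrentHashMap<>();

  private void inc(String key) {
    counts.computeIfAbsent(key, k -> new AtomicLong()).incrementAndGet();
  }

  @Override
  public void incrementRunningJobMetrics(String flowGroup, String executor) {
    inc("running.service");
    inc("running.flowGroup." + flowGroup);
    inc("running.executor." + executor);
  }

  @Override
  public void incrementJobsSentToExecutor(String flowGroup, String executor) {
    inc("sentToExecutor." + executor);
  }

  long get(String key) {
    AtomicLong value = counts.get(key);
    return value == null ? 0L : value.get();
  }
}

/** Hypothetical DagProc-like class that is handed its metrics instead of reaching for a static. */
final class LaunchProcSketch {
  private final DagProcMetrics metrics;

  LaunchProcSketch(DagProcMetrics metrics) {
    this.metrics = metrics;
  }

  void onJobSubmitted(String flowGroup, String executor, int currentAttempt) {
    // Same guard as the diff: count a running job only on its first attempt.
    if (currentAttempt == 1) {
      metrics.incrementRunningJobMetrics(flowGroup, executor);
    }
    metrics.incrementJobsSentToExecutor(flowGroup, executor);
  }
}
```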



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java:
##########
@@ -40,36 +65,126 @@
   @Override
   protected Optional<Dag<JobExecutionPlan>> act(DagManagementStateStore dagManagementStateStore, Optional<Dag<JobExecutionPlan>> dag)
       throws IOException {
-    throw new UnsupportedOperationException("Not yet implemented");
+    if (!dag.isPresent()) {
+      log.warn("No dag with id " + this.launchDagTask.getDagId() + " found to launch");
+      return Optional.empty();
+    }
+    DagManager.DagId dagId = DagManagerUtils.generateDagId(dag.get());
+    Set<Dag.DagNode<JobExecutionPlan>> nextSubmitted = submitNext(dagManagementStateStore, dag.get());
+    for (Dag.DagNode<JobExecutionPlan> dagNode : nextSubmitted) {
+      dagManagementStateStore.addDagNodeState(dagNode, dagId);  // compare this - arjun1
+    }
+
+    log.info("Dag {} processed.", dagId);
+    return dag;
   }
 
-  @Override
-  protected void sendNotification(Optional<Dag<JobExecutionPlan>> result, EventSubmitter eventSubmitter)
-      throws IOException {
-    throw new UnsupportedOperationException("Not yet implemented");
+  /**
+   * Submit next set of Dag nodes in the Dag identified by the provided dagId
+   */
+   private Set<Dag.DagNode<JobExecutionPlan>> submitNext(DagManagementStateStore dagManagementStateStore,
+       Dag<JobExecutionPlan> dag) throws IOException {
+     DagManager.DagId dagId = DagManagerUtils.generateDagId(dag);
+     Set<Dag.DagNode<JobExecutionPlan>> nextNodes = DagManagerUtils.getNext(dag);
+     List<String> nextJobNames = new ArrayList<>();
+
+     //Submit jobs from the dag ready for execution.
+     for (Dag.DagNode<JobExecutionPlan> dagNode : nextNodes) {
+       submitJob(dagManagementStateStore, dagNode);
+       nextJobNames.add(DagManagerUtils.getJobName(dagNode));
+     }
+
+     log.info("Submitting next nodes for dagId {}, where next jobs to be submitted are {}", dagId, nextJobNames);
+
+     //Checkpoint the dag state, it should have an updated value of dag nodes
+     dagManagementStateStore.checkpointDag(dag);
+
+     return nextNodes;
+  }
+
+  /**
+   * Submits a {@link JobSpec} to a {@link SpecExecutor}.
+   */
+  private void submitJob(DagManagementStateStore dagManagementStateStore, Dag.DagNode<JobExecutionPlan> dagNode) {

Review Comment:
   Rename to `submitJobToExecutor`?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java:
##########
@@ -40,36 +65,126 @@
   @Override
   protected Optional<Dag<JobExecutionPlan>> initialize(DagManagementStateStore dagManagementStateStore)
       throws IOException {
-    throw new UnsupportedOperationException("Not yet implemented");
+    try {
+      DagActionStore.DagAction dagAction = this.launchDagTask.getDagAction();
+      URI flowUri = FlowSpec.Utils.createFlowSpecUri(dagAction.getFlowId());
+      FlowSpec flowSpec = dagManagementStateStore.getFlowSpec(flowUri);
+      flowSpec.addProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, dagAction.getFlowExecutionId());
+      return this.flowCompilationValidationHelper.createExecutionPlanIfValid(flowSpec).toJavaUtil();
+    } catch (URISyntaxException | SpecNotFoundException | InterruptedException e) {
+      throw new RuntimeException(e);

Review Comment:
   I agree; let's start handling retries for errors, or at least plan for them, in this PR. `Process` should be able to distinguish retryable errors from non-retryable ones.
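   One possible shape for that distinction, sketched in plain Java: `initialize` would wrap each failure in one of two marker exceptions that `process` can branch on. The exception names and the mapping are assumptions for discussion. They are not existing Gobblin classes or an agreed policy. The sketch treats interrupts and I/O errors as retryable and everything else, such as a malformed flow URI, as non-retryable.

```java
import java.io.IOException;

/** Hypothetical marker: the failure is transient, so the DagAction may be retried. */
class RetryableDagProcException extends RuntimeException {
  RetryableDagProcException(Throwable cause) {
    super(cause);
  }
}

/** Hypothetical marker: retrying will not help, e.g. a malformed flow URI. */
class NonRetryableDagProcException extends RuntimeException {
  NonRetryableDagProcException(Throwable cause) {
    super(cause);
  }
}

final class DagProcErrors {
  private DagProcErrors() {}

  /** Maps a checked exception from initialize() to a marker exception; the mapping is illustrative. */
  static RuntimeException classify(Exception e) {
    if (e instanceof InterruptedException) {
      // Restore the interrupt flag that catching InterruptedException cleared.
      Thread.currentThread().interrupt();
      return new RetryableDagProcException(e);
    }
    if (e instanceof IOException) {
      return new RetryableDagProcException(e);
    }
    // URISyntaxException and anything unrecognized: fail fast instead of retrying forever.
    return new NonRetryableDagProcException(e);
  }
}
```

   In `initialize`, the existing `throw new RuntimeException(e);` would then become `throw DagProcErrors.classify(e);`.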



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java:
##########
@@ -40,36 +65,126 @@
+  /**
+   * Submits a {@link JobSpec} to a {@link SpecExecutor}.
+   */
+  private void submitJob(DagManagementStateStore dagManagementStateStore, Dag.DagNode<JobExecutionPlan> dagNode) {
+    DagManagerUtils.incrementJobAttempt(dagNode);
+    JobExecutionPlan jobExecutionPlan = DagManagerUtils.getJobExecutionPlan(dagNode);
+    jobExecutionPlan.setExecutionStatus(ExecutionStatus.RUNNING);
+    JobSpec jobSpec = DagManagerUtils.getJobSpec(dagNode);
+    Map<String, String> jobMetadata = TimingEventUtils.getJobMetadata(Maps.newHashMap(), jobExecutionPlan);
+
+    String specExecutorUri = DagManagerUtils.getSpecExecutorUri(dagNode);
+
+    // Run this spec on selected executor
+    SpecProducer<Spec> producer;
+    try {
+      dagManagementStateStore.tryAcquireQuota(Collections.singleton(dagNode));
+      producer = DagManagerUtils.getSpecProducer(dagNode);
+      TimingEvent jobOrchestrationTimer = eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_ORCHESTRATED);
+
+      // Increment job count before submitting the job onto the spec producer, in case that throws an exception.
+      // By this point the quota is allocated, so it's imperative to increment as missing would introduce the potential to decrement below zero upon quota release.
+      // Quota release is guaranteed, despite failure, because exception handling within would mark the job FAILED.
+      // When the ensuing kafka message spurs DagManager processing, the quota is released and the counts decremented
+      // Ensure that we do not double increment for flows that are retried
+      if (dagNode.getValue().getCurrentAttempts() == 1) {
+        DagManagementTaskStreamImpl.getDagManagerMetrics().incrementRunningJobMetrics(dagNode);
+      }
+      // Submit the job to the SpecProducer, which in turn performs the actual job submission to the SpecExecutor instance.
+      // The SpecProducer implementations submit the job to the underlying executor and return when the submission is complete,
+      // either successfully or unsuccessfully. To catch any exceptions in the job submission, the DagManagerThread
+      // blocks (by calling Future#get()) until the submission is completed.
+      Future<?> addSpecFuture = producer.addSpec(jobSpec);
+      dagNode.getValue().setJobFuture(com.google.common.base.Optional.of(addSpecFuture));
+
+      addSpecFuture.get();
+
+      jobMetadata.put(TimingEvent.METADATA_MESSAGE, producer.getExecutionLink(addSpecFuture, specExecutorUri));
+      // Add serialized job properties as part of the orchestrated job event metadata
+      jobMetadata.put(JobExecutionPlan.JOB_PROPS_KEY, dagNode.getValue().toString());
+      jobOrchestrationTimer.stop(jobMetadata);
+      log.info("Orchestrated job: {} on Executor: {}", DagManagerUtils.getFullyQualifiedJobName(dagNode), specExecutorUri);
+      DagManagementTaskStreamImpl.getDagManagerMetrics().incrementJobsSentToExecutor(dagNode);
+    } catch (Exception e) {
+      TimingEvent jobFailedTimer = eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_FAILED);
+      String message = "Cannot submit job " + DagManagerUtils.getFullyQualifiedJobName(dagNode) + " on executor " + specExecutorUri;
+      log.error(message, e);
+      jobMetadata.put(TimingEvent.METADATA_MESSAGE, message + " due to " + e.getMessage());
+      if (jobFailedTimer != null) {
+        jobFailedTimer.stop(jobMetadata);

Review Comment:
   No error is thrown here, so how does `DagProc::process` know whether to complete the lease? This looks like a retryable error.
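   A minimal sketch of the behavior being asked about. It assumes, as the comment implies, that a lease left uncompleted lets the DagAction be reattempted later. `Lease`, `ProcessSketch.process` and `submitJobToExecutor` are hypothetical plain-Java stand-ins. The submit step still emits its failure event, but it then rethrows, so `process` skips `complete()` whenever the submission fails.

```java
import java.util.function.Supplier;

/** Hypothetical lease handle; complete() marks the DagAction as done. */
interface Lease {
  void complete();
}

final class ProcessSketch {
  private ProcessSketch() {}

  /**
   * Runs the action and completes the lease only if it returns normally.
   * If the action throws, the lease is not completed and the exception propagates.
   */
  static <T> T process(Lease lease, Supplier<T> action) {
    T result = action.get();
    lease.complete();
    return result;
  }

  /** Records the failure (e.g. a JOB_FAILED timing event) and rethrows instead of swallowing it. */
  static void submitJobToExecutor(Runnable submit, Runnable emitJobFailedEvent) {
    try {
      submit.run();
    } catch (RuntimeException e) {
      emitJobFailedEvent.run();
      throw e;
    }
  }
}
```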



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java:
##########
@@ -40,36 +65,126 @@
   @Override
   protected Optional<Dag<JobExecutionPlan>> act(DagManagementStateStore dagManagementStateStore, Optional<Dag<JobExecutionPlan>> dag)
       throws IOException {
-    throw new UnsupportedOperationException("Not yet implemented");
+    if (!dag.isPresent()) {
+      log.warn("No dag with id " + this.launchDagTask.getDagId() + " found to launch");
+      return Optional.empty();
+    }
+    DagManager.DagId dagId = DagManagerUtils.generateDagId(dag.get());
+    Set<Dag.DagNode<JobExecutionPlan>> nextSubmitted = submitNext(dagManagementStateStore, dag.get());
+    for (Dag.DagNode<JobExecutionPlan> dagNode : nextSubmitted) {
+      dagManagementStateStore.addDagNodeState(dagNode, dagId);  // compare this - arjun1
+    }
+
+    log.info("Dag {} processed.", dagId);

Review Comment:
   This log message is a bit ambiguous. Let's be specific, e.g. `Launch dagProc concluded actions for dagID: DagID {}`, and use a similar message for each type of DagProc. Do we want a log after initialization as well? Let's think it through carefully so we don't flood the logs.
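   To keep the wording uniform without adding log lines, the concluding message could be built in one place and emitted once per processed DagAction. The sketch uses `java.util.logging` so it is self-contained; the diff itself logs through a `log` field with `{}` placeholders. `DagProcLogSketch`, `procType()` and the exact message text are hypothetical. The message text is adapted from the suggestion in this comment.

```java
import java.util.logging.Logger;

/** Hypothetical base class: builds the "concluded" message once for every DagProc type. */
abstract class DagProcLogSketch {
  private static final Logger LOG = Logger.getLogger(DagProcLogSketch.class.getName());

  /** Short type name used as the message prefix, e.g. "Launch". */
  protected abstract String procType();

  /** Subclass-specific work. */
  protected abstract void act(String dagId);

  /** Single place that defines the wording, so every DagProc type logs consistently. */
  final String concludedMessage(String dagId) {
    return procType() + " dagProc concluded actions for dagId: " + dagId;
  }

  final void process(String dagId) {
    act(dagId);
    // One INFO line per processed DagAction keeps log volume predictable.
    LOG.info(concludedMessage(dagId));
  }
}

final class LaunchDagProcLogSketch extends DagProcLogSketch {
  @Override
  protected String procType() {
    return "Launch";
  }

  @Override
  protected void act(String dagId) {
    // Job submission would happen here.
  }
}
```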





Issue Time Tracking
-------------------

            Worklog Id:     (was: 909498)
    Remaining Estimate: 0h
            Time Spent: 10m

> implement LaunchDagProc
> -----------------------
>
>                 Key: GOBBLIN-2015
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-2015
>             Project: Apache Gobblin
>          Issue Type: Task
>            Reporter: Arjun Singh Bora
>            Priority: Major
>          Time Spent: 10m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)
