umustafi commented on code in PR #3893:
URL: https://github.com/apache/gobblin/pull/3893#discussion_r1523989117


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java:
##########
@@ -18,58 +18,165 @@
 package org.apache.gobblin.service.modules.orchestration.proc;
 
 import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Collections;
+import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicLong;
 
+import com.google.common.collect.Maps;
+
+import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 
-import org.apache.gobblin.annotation.Alpha;
-import org.apache.gobblin.metrics.ContextAwareGauge;
+import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.metrics.ServiceMetricNames;
 import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.runtime.api.SpecProducer;
+import org.apache.gobblin.service.ExecutionStatus;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
 import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.DagManager;
+import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
+import org.apache.gobblin.service.modules.orchestration.TimingEventUtils;
 import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import 
org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
 
 
 /**
- * An implementation for {@link LaunchDagTask}
+ * An implementation for {@link DagProc} that launches a new job.
  */
 @Slf4j
-@Alpha
+@RequiredArgsConstructor
 public class LaunchDagProc extends DagProc<Optional<Dag<JobExecutionPlan>>, 
Optional<Dag<JobExecutionPlan>>> {
   private final LaunchDagTask launchDagTask;
-  private final AtomicLong orchestrationDelayCounter;
-
-  public LaunchDagProc(LaunchDagTask launchDagTask) {
-    this.launchDagTask = launchDagTask;
-    this.orchestrationDelayCounter = new AtomicLong(0);
-    ContextAwareGauge<Long> orchestrationDelayMetric = 
metricContext.newContextAwareGauge
-        (ServiceMetricNames.FLOW_ORCHESTRATION_DELAY, 
orchestrationDelayCounter::get);
-    metricContext.register(orchestrationDelayMetric);
+  private final FlowCompilationValidationHelper 
flowCompilationValidationHelper;
+  private static final AtomicLong orchestrationDelayCounter = new 
AtomicLong(0);
+  static {
+    metricContext.register(
+        
metricContext.newContextAwareGauge(ServiceMetricNames.FLOW_ORCHESTRATION_DELAY, 
orchestrationDelayCounter::get));
+  }
+
+  @Override
+  protected DagManager.DagId getDagId() {
+    return this.launchDagTask.getDagId();
   }
 
   @Override
   protected Optional<Dag<JobExecutionPlan>> initialize(DagManagementStateStore 
dagManagementStateStore)
       throws IOException {
-    throw new UnsupportedOperationException("Not yet implemented");
+    try {
+      DagActionStore.DagAction dagAction = this.launchDagTask.getDagAction();
+      URI flowUri = FlowSpec.Utils.createFlowSpecUri(dagAction.getFlowId());
+      FlowSpec flowSpec = dagManagementStateStore.getFlowSpec(flowUri);
+      flowSpec.addProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, 
dagAction.getFlowExecutionId());
+      return 
this.flowCompilationValidationHelper.createExecutionPlanIfValid(flowSpec).toJavaUtil();
+    } catch (URISyntaxException | SpecNotFoundException | InterruptedException 
e) {
+      throw new RuntimeException(e);
+    }
   }
 
   @Override
   protected Optional<Dag<JobExecutionPlan>> act(DagManagementStateStore 
dagManagementStateStore, Optional<Dag<JobExecutionPlan>> dag)
       throws IOException {
-    throw new UnsupportedOperationException("Not yet implemented");
+    if (!dag.isPresent()) {
+      log.warn("Dag with id " + getDagId() + " could not be compiled.");
+      // todo - add metrics

Review Comment:
   I want to distinguish here that `Orchestrator.orchestrate` will always be 
occurring before this step (its where multi-active scheduling occurs) in 
response to the GobblinServiceJobScheduler callback for a adhoc/scheduled flow 
trigger. This metric was named incorrectly before we can rename in a future PR 
if you are planning on fixing these metrics later. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to