[ 
https://issues.apache.org/jira/browse/GOBBLIN-2015?focusedWorklogId=909733&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-909733
 ]

ASF GitHub Bot logged work on GOBBLIN-2015:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 13/Mar/24 20:24
            Start Date: 13/Mar/24 20:24
    Worklog Time Spent: 10m 
      Work Description: phet commented on code in PR #3893:
URL: https://github.com/apache/gobblin/pull/3893#discussion_r1523832314


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java:
##########
@@ -68,16 +69,18 @@ public class MostlyMySqlDagManagementStateStore implements 
DagManagementStateSto
   private static final String FAILED_DAG_STATESTORE_PREFIX = 
"failedDagStateStore";
   public static final String DAG_STATESTORE_CLASS_KEY = 
DagManager.DAG_MANAGER_PREFIX + "dagStateStoreClass";
   FlowCatalog flowCatalog;
+  @Getter private final DagManagerMetrics dagManagerMetrics = new 
DagManagerMetrics();

Review Comment:
   nit: aren't @nnotations supposed to go on the line above?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java:
##########
@@ -231,4 +234,9 @@ public void 
tryAcquireQuota(Collection<Dag.DagNode<JobExecutionPlan>> dagNodes)
   public boolean releaseQuota(Dag.DagNode<JobExecutionPlan> dagNode) throws 
IOException {
     return this.quotaManager.releaseQuota(dagNode);
   }
+
+//  @Override
+//  public DagManagerMetrics getMetrics() {
+//    return this.dagManagerMetrics;
+//  }

Review Comment:
   remove



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java:
##########
@@ -58,6 +58,7 @@ public class DagProcessingEngineTest {
   DagProcessingEngine dagProcessingEngine;
   DagTaskStream dagTaskStream;
   DagProcFactory dagProcFactory;
+  MostlyMySqlDagManagementStateStore dagManagementStateStore;

Review Comment:
   shouldn't all of these be `private`?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java:
##########
@@ -18,58 +18,165 @@
 package org.apache.gobblin.service.modules.orchestration.proc;
 
 import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Collections;
+import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicLong;
 
+import com.google.common.collect.Maps;
+
+import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 
-import org.apache.gobblin.annotation.Alpha;
-import org.apache.gobblin.metrics.ContextAwareGauge;
+import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.metrics.ServiceMetricNames;
 import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.runtime.api.SpecProducer;
+import org.apache.gobblin.service.ExecutionStatus;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
 import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.DagManager;
+import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
+import org.apache.gobblin.service.modules.orchestration.TimingEventUtils;
 import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import 
org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
 
 
 /**
- * An implementation for {@link LaunchDagTask}
+ * An implementation for {@link DagProc} that launches a new job.
  */
 @Slf4j
-@Alpha
+@RequiredArgsConstructor
 public class LaunchDagProc extends DagProc<Optional<Dag<JobExecutionPlan>>, 
Optional<Dag<JobExecutionPlan>>> {
   private final LaunchDagTask launchDagTask;
-  private final AtomicLong orchestrationDelayCounter;
-
-  public LaunchDagProc(LaunchDagTask launchDagTask) {
-    this.launchDagTask = launchDagTask;
-    this.orchestrationDelayCounter = new AtomicLong(0);
-    ContextAwareGauge<Long> orchestrationDelayMetric = 
metricContext.newContextAwareGauge
-        (ServiceMetricNames.FLOW_ORCHESTRATION_DELAY, 
orchestrationDelayCounter::get);
-    metricContext.register(orchestrationDelayMetric);
+  private final FlowCompilationValidationHelper 
flowCompilationValidationHelper;
+  private static final AtomicLong orchestrationDelayCounter = new 
AtomicLong(0);
+  static {
+    metricContext.register(
+        
metricContext.newContextAwareGauge(ServiceMetricNames.FLOW_ORCHESTRATION_DELAY, 
orchestrationDelayCounter::get));
+  }

Review Comment:
   I don't see this being used anywhere... where are you planning for it to 
feature?
   
   also, would it belong within `DagManagerMetrics`?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java:
##########
@@ -46,8 +47,11 @@ public final void process(DagManagementStateStore 
dagManagementStateStore) throw
     T result = act(dagManagementStateStore, state);   // todo - retry
     commit(dagManagementStateStore, result);   // todo - retry
     sendNotification(result, eventSubmitter);   // todo - retry
+    log.info("{} concluded actions for dagId : {}", getClass(), getDagId());

Review Comment:
   `getClass().getSimpleName()` is probably the better length than the FQ name



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java:
##########
@@ -18,58 +18,165 @@
 package org.apache.gobblin.service.modules.orchestration.proc;
 
 import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Collections;
+import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicLong;
 
+import com.google.common.collect.Maps;
+
+import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 
-import org.apache.gobblin.annotation.Alpha;
-import org.apache.gobblin.metrics.ContextAwareGauge;
+import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.metrics.ServiceMetricNames;
 import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.runtime.api.SpecProducer;
+import org.apache.gobblin.service.ExecutionStatus;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
 import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.DagManager;
+import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
+import org.apache.gobblin.service.modules.orchestration.TimingEventUtils;
 import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import 
org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
 
 
 /**
- * An implementation for {@link LaunchDagTask}
+ * An implementation for {@link DagProc} that launches a new job.
  */
 @Slf4j
-@Alpha
+@RequiredArgsConstructor
 public class LaunchDagProc extends DagProc<Optional<Dag<JobExecutionPlan>>, 
Optional<Dag<JobExecutionPlan>>> {
   private final LaunchDagTask launchDagTask;
-  private final AtomicLong orchestrationDelayCounter;
-
-  public LaunchDagProc(LaunchDagTask launchDagTask) {
-    this.launchDagTask = launchDagTask;
-    this.orchestrationDelayCounter = new AtomicLong(0);
-    ContextAwareGauge<Long> orchestrationDelayMetric = 
metricContext.newContextAwareGauge
-        (ServiceMetricNames.FLOW_ORCHESTRATION_DELAY, 
orchestrationDelayCounter::get);
-    metricContext.register(orchestrationDelayMetric);
+  private final FlowCompilationValidationHelper 
flowCompilationValidationHelper;
+  private static final AtomicLong orchestrationDelayCounter = new 
AtomicLong(0);
+  static {
+    metricContext.register(
+        
metricContext.newContextAwareGauge(ServiceMetricNames.FLOW_ORCHESTRATION_DELAY, 
orchestrationDelayCounter::get));
+  }
+
+  @Override
+  protected DagManager.DagId getDagId() {
+    return this.launchDagTask.getDagId();
   }
 
   @Override
   protected Optional<Dag<JobExecutionPlan>> initialize(DagManagementStateStore 
dagManagementStateStore)
       throws IOException {
-    throw new UnsupportedOperationException("Not yet implemented");
+    try {
+      DagActionStore.DagAction dagAction = this.launchDagTask.getDagAction();
+      URI flowUri = FlowSpec.Utils.createFlowSpecUri(dagAction.getFlowId());
+      FlowSpec flowSpec = dagManagementStateStore.getFlowSpec(flowUri);
+      flowSpec.addProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, 
dagAction.getFlowExecutionId());
+      return 
this.flowCompilationValidationHelper.createExecutionPlanIfValid(flowSpec).toJavaUtil();
+    } catch (URISyntaxException | SpecNotFoundException | InterruptedException 
e) {
+      throw new RuntimeException(e);
+    }
   }
 
   @Override
   protected Optional<Dag<JobExecutionPlan>> act(DagManagementStateStore 
dagManagementStateStore, Optional<Dag<JobExecutionPlan>> dag)
       throws IOException {
-    throw new UnsupportedOperationException("Not yet implemented");
+    if (!dag.isPresent()) {
+      log.warn("Dag with id " + getDagId() + " could not be compiled.");
+      // todo - add metrics

Review Comment:
   didn't urmi say mention there's already a `DagManagerMetrics` for this?  if 
so, seems easy enough to mark in this pass...



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java:
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.runtime.spec_executorInstance.MockedSpecExecutor;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.AzkabanProjectConfig;
+import org.apache.gobblin.service.modules.orchestration.DagManager;
+import 
org.apache.gobblin.service.modules.orchestration.MostlyMySqlDagManagementStateStore;
+import 
org.apache.gobblin.service.modules.orchestration.MostlyMySqlDagManagementStateStoreTest;
+import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlanDagFactory;
+import 
org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
+import org.apache.gobblin.util.ConfigUtils;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+
+
+public class LaunchDagProcTest {
+  MostlyMySqlDagManagementStateStore dagManagementStateStore;
+  @BeforeClass
+  public void setUp() throws Exception {
+    this.dagManagementStateStore = 
spy(MostlyMySqlDagManagementStateStoreTest.getDummyDMSS(TestMetastoreDatabaseFactory.get()));
+    
doReturn(FlowSpec.builder().build()).when(this.dagManagementStateStore).getFlowSpec(any());
+    doNothing().when(this.dagManagementStateStore).tryAcquireQuota(any());
+    doNothing().when(this.dagManagementStateStore).addDagNodeState(any(), 
any());
+  }
+  @Test

Review Comment:
   newline before



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java:
##########
@@ -18,58 +18,165 @@
 package org.apache.gobblin.service.modules.orchestration.proc;
 
 import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Collections;
+import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicLong;
 
+import com.google.common.collect.Maps;
+
+import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 
-import org.apache.gobblin.annotation.Alpha;
-import org.apache.gobblin.metrics.ContextAwareGauge;
+import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.metrics.ServiceMetricNames;
 import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.runtime.api.SpecProducer;
+import org.apache.gobblin.service.ExecutionStatus;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
 import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.DagManager;
+import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
+import org.apache.gobblin.service.modules.orchestration.TimingEventUtils;
 import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import 
org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
 
 
 /**
- * An implementation for {@link LaunchDagTask}
+ * An implementation for {@link DagProc} that launches a new job.
  */
 @Slf4j
-@Alpha
+@RequiredArgsConstructor
 public class LaunchDagProc extends DagProc<Optional<Dag<JobExecutionPlan>>, 
Optional<Dag<JobExecutionPlan>>> {
   private final LaunchDagTask launchDagTask;
-  private final AtomicLong orchestrationDelayCounter;
-
-  public LaunchDagProc(LaunchDagTask launchDagTask) {
-    this.launchDagTask = launchDagTask;
-    this.orchestrationDelayCounter = new AtomicLong(0);
-    ContextAwareGauge<Long> orchestrationDelayMetric = 
metricContext.newContextAwareGauge
-        (ServiceMetricNames.FLOW_ORCHESTRATION_DELAY, 
orchestrationDelayCounter::get);
-    metricContext.register(orchestrationDelayMetric);
+  private final FlowCompilationValidationHelper 
flowCompilationValidationHelper;
+  private static final AtomicLong orchestrationDelayCounter = new 
AtomicLong(0);
+  static {
+    metricContext.register(
+        
metricContext.newContextAwareGauge(ServiceMetricNames.FLOW_ORCHESTRATION_DELAY, 
orchestrationDelayCounter::get));
+  }
+
+  @Override
+  protected DagManager.DagId getDagId() {
+    return this.launchDagTask.getDagId();
   }
 
   @Override
   protected Optional<Dag<JobExecutionPlan>> initialize(DagManagementStateStore 
dagManagementStateStore)
       throws IOException {
-    throw new UnsupportedOperationException("Not yet implemented");
+    try {
+      DagActionStore.DagAction dagAction = this.launchDagTask.getDagAction();
+      URI flowUri = FlowSpec.Utils.createFlowSpecUri(dagAction.getFlowId());
+      FlowSpec flowSpec = dagManagementStateStore.getFlowSpec(flowUri);

Review Comment:
   I'd personally make the impl even more readable by abstracting this into:
   ```
   private FlowSpec loadFlowSpec() throws URISyntaxException, 
SpecNotFoundException;
   ```



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java:
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.runtime.spec_executorInstance.MockedSpecExecutor;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.AzkabanProjectConfig;
+import org.apache.gobblin.service.modules.orchestration.DagManager;
+import 
org.apache.gobblin.service.modules.orchestration.MostlyMySqlDagManagementStateStore;
+import 
org.apache.gobblin.service.modules.orchestration.MostlyMySqlDagManagementStateStoreTest;
+import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlanDagFactory;
+import 
org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
+import org.apache.gobblin.util.ConfigUtils;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+
+
+public class LaunchDagProcTest {
+  MostlyMySqlDagManagementStateStore dagManagementStateStore;

Review Comment:
   `private`?
   
   also newline before method def



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java:
##########
@@ -18,58 +18,165 @@
 package org.apache.gobblin.service.modules.orchestration.proc;
 
 import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Collections;
+import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicLong;
 
+import com.google.common.collect.Maps;
+
+import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 
-import org.apache.gobblin.annotation.Alpha;
-import org.apache.gobblin.metrics.ContextAwareGauge;
+import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.metrics.ServiceMetricNames;
 import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.runtime.api.SpecProducer;
+import org.apache.gobblin.service.ExecutionStatus;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
 import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.DagManager;
+import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
+import org.apache.gobblin.service.modules.orchestration.TimingEventUtils;
 import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import 
org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
 
 
 /**
- * An implementation for {@link LaunchDagTask}
+ * An implementation for {@link DagProc} that launches a new job.
  */
 @Slf4j
-@Alpha
+@RequiredArgsConstructor
 public class LaunchDagProc extends DagProc<Optional<Dag<JobExecutionPlan>>, 
Optional<Dag<JobExecutionPlan>>> {
   private final LaunchDagTask launchDagTask;
-  private final AtomicLong orchestrationDelayCounter;
-
-  public LaunchDagProc(LaunchDagTask launchDagTask) {
-    this.launchDagTask = launchDagTask;
-    this.orchestrationDelayCounter = new AtomicLong(0);
-    ContextAwareGauge<Long> orchestrationDelayMetric = 
metricContext.newContextAwareGauge
-        (ServiceMetricNames.FLOW_ORCHESTRATION_DELAY, 
orchestrationDelayCounter::get);
-    metricContext.register(orchestrationDelayMetric);
+  private final FlowCompilationValidationHelper 
flowCompilationValidationHelper;
+  private static final AtomicLong orchestrationDelayCounter = new 
AtomicLong(0);
+  static {
+    metricContext.register(
+        
metricContext.newContextAwareGauge(ServiceMetricNames.FLOW_ORCHESTRATION_DELAY, 
orchestrationDelayCounter::get));
+  }
+
+  @Override
+  protected DagManager.DagId getDagId() {
+    return this.launchDagTask.getDagId();
   }
 
   @Override
   protected Optional<Dag<JobExecutionPlan>> initialize(DagManagementStateStore 
dagManagementStateStore)
       throws IOException {
-    throw new UnsupportedOperationException("Not yet implemented");
+    try {
+      DagActionStore.DagAction dagAction = this.launchDagTask.getDagAction();
+      URI flowUri = FlowSpec.Utils.createFlowSpecUri(dagAction.getFlowId());
+      FlowSpec flowSpec = dagManagementStateStore.getFlowSpec(flowUri);
+      flowSpec.addProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, 
dagAction.getFlowExecutionId());
+      return 
this.flowCompilationValidationHelper.createExecutionPlanIfValid(flowSpec).toJavaUtil();
+    } catch (URISyntaxException | SpecNotFoundException | InterruptedException 
e) {
+      throw new RuntimeException(e);
+    }
   }
 
   @Override
   protected Optional<Dag<JobExecutionPlan>> act(DagManagementStateStore 
dagManagementStateStore, Optional<Dag<JobExecutionPlan>> dag)
       throws IOException {
-    throw new UnsupportedOperationException("Not yet implemented");
+    if (!dag.isPresent()) {
+      log.warn("Dag with id " + getDagId() + " could not be compiled.");
+      // todo - add metrics
+      return Optional.empty();
+    }
+    submitNextNodes(dagManagementStateStore, dag.get());
+    return dag;
   }
 
-  @Override
-  protected void sendNotification(Optional<Dag<JobExecutionPlan>> result, 
EventSubmitter eventSubmitter)
-      throws IOException {
-    throw new UnsupportedOperationException("Not yet implemented");
+  /**
+   * Submit next set of Dag nodes in the provided Dag.
+   */
+   private void submitNextNodes(DagManagementStateStore 
dagManagementStateStore,
+       Dag<JobExecutionPlan> dag) throws IOException {
+     Set<Dag.DagNode<JobExecutionPlan>> nextNodes = 
DagManagerUtils.getNext(dag);
+
+     //Submit jobs from the dag ready for execution.
+     for (Dag.DagNode<JobExecutionPlan> dagNode : nextNodes) {
+       submitJobToExecutor(dagManagementStateStore, dagNode);

Review Comment:
   I'm all for deferring on the impl until 
https://issues.apache.org/jira/browse/GOBBLIN-2017 , but given we've aligned on 
handling unbounded jobs separately such that this impl solely cover the case of 
a single job, let's for the moment fail-fast when > 1.
   
   e.g. create a method to do that, say `handleMultipleJobs`.  that way when 
you return to GOBBLIN-2017 that's where to insert the changes.





Issue Time Tracking
-------------------

    Worklog Id:     (was: 909733)
    Time Spent: 2h 50m  (was: 2h 40m)

> implement LaunchDagProc
> -----------------------
>
>                 Key: GOBBLIN-2015
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-2015
>             Project: Apache Gobblin
>          Issue Type: Task
>            Reporter: Arjun Singh Bora
>            Priority: Major
>          Time Spent: 2h 50m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to