[ 
https://issues.apache.org/jira/browse/GOBBLIN-2015?focusedWorklogId=909746&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-909746
 ]

ASF GitHub Bot logged work on GOBBLIN-2015:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 13/Mar/24 21:21
            Start Date: 13/Mar/24 21:21
    Worklog Time Spent: 10m 
      Work Description: umustafi commented on code in PR #3893:
URL: https://github.com/apache/gobblin/pull/3893#discussion_r1523932275


##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java:
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.runtime.spec_executorInstance.MockedSpecExecutor;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.AzkabanProjectConfig;
+import org.apache.gobblin.service.modules.orchestration.DagManager;
+import 
org.apache.gobblin.service.modules.orchestration.MostlyMySqlDagManagementStateStore;
+import 
org.apache.gobblin.service.modules.orchestration.MostlyMySqlDagManagementStateStoreTest;
+import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlanDagFactory;
+import 
org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
+import org.apache.gobblin.util.ConfigUtils;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+
+
+public class LaunchDagProcTest {
+  MostlyMySqlDagManagementStateStore dagManagementStateStore;
+  @BeforeClass
+  public void setUp() throws Exception {
+    this.dagManagementStateStore = 
spy(MostlyMySqlDagManagementStateStoreTest.getDummyDMSS(TestMetastoreDatabaseFactory.get()));
+    
doReturn(FlowSpec.builder().build()).when(this.dagManagementStateStore).getFlowSpec(any());
+    doNothing().when(this.dagManagementStateStore).tryAcquireQuota(any());
+    doNothing().when(this.dagManagementStateStore).addDagNodeState(any(), 
any());
+  }
+  @Test
+  public void launchDag()
+      throws IOException, InterruptedException, URISyntaxException {
+    // this creates a dag with 3 start nodes
+    Dag<JobExecutionPlan> dag1 = 
buildDagWithMultipleNodesAtDifferentLevels("1", System.currentTimeMillis(), 
DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(),
+        "user5", 
ConfigFactory.empty().withValue(ConfigurationKeys.FLOW_GROUP_KEY, 
ConfigValueFactory.fromAnyRef("group2")));
+    FlowCompilationValidationHelper flowCompilationValidationHelper = 
mock(FlowCompilationValidationHelper.class);
+    
doReturn(com.google.common.base.Optional.of(dag1)).when(flowCompilationValidationHelper).createExecutionPlanIfValid(any());
+    LaunchDagProc launchDagProc = new LaunchDagProc(new LaunchDagTask(new 
DagActionStore.DagAction("fg", "fn",
+        "12345", DagActionStore.FlowActionType.LAUNCH), null), 
flowCompilationValidationHelper);
+
+    launchDagProc.process(this.dagManagementStateStore);
+    int expectedNumOfSavingDagNodeStates = 3; // = number of start nodes
+    Assert.assertEquals(expectedNumOfSavingDagNodeStates,
+        
Mockito.mockingDetails(this.dagManagementStateStore).getInvocations().stream()
+            .filter(a -> 
a.getMethod().getName().equals("addDagNodeState")).count());
+  }
+
+  // This creates a dag like this
+  //  D1  D2 D3
+  //    \ | /
+  //     DN4
+  //    / | \
+  //  D5 D6  D7

Review Comment:
   helpful depiction



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java:
##########
@@ -46,8 +47,11 @@ public final void process(DagManagementStateStore 
dagManagementStateStore) throw
     T result = act(dagManagementStateStore, state);   // todo - retry
     commit(dagManagementStateStore, result);   // todo - retry
     sendNotification(result, eventSubmitter);   // todo - retry
+    log.info("{} concluded actions for dagId : {}", getClass(), getDagId());

Review Comment:
   `successfully concluded actions` or `concluded actions and committed ` to 
not get confused with `act` 



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java:
##########
@@ -18,58 +18,165 @@
 package org.apache.gobblin.service.modules.orchestration.proc;
 
 import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Collections;
+import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicLong;
 
+import com.google.common.collect.Maps;
+
+import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 
-import org.apache.gobblin.annotation.Alpha;
-import org.apache.gobblin.metrics.ContextAwareGauge;
+import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.metrics.ServiceMetricNames;
 import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.runtime.api.SpecProducer;
+import org.apache.gobblin.service.ExecutionStatus;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
 import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.DagManager;
+import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
+import org.apache.gobblin.service.modules.orchestration.TimingEventUtils;
 import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import 
org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
 
 
 /**
- * An implementation for {@link LaunchDagTask}
+ * An implementation for {@link DagProc} that launches a new job.
  */
 @Slf4j
-@Alpha
+@RequiredArgsConstructor
 public class LaunchDagProc extends DagProc<Optional<Dag<JobExecutionPlan>>, 
Optional<Dag<JobExecutionPlan>>> {
   private final LaunchDagTask launchDagTask;
-  private final AtomicLong orchestrationDelayCounter;
-
-  public LaunchDagProc(LaunchDagTask launchDagTask) {
-    this.launchDagTask = launchDagTask;
-    this.orchestrationDelayCounter = new AtomicLong(0);
-    ContextAwareGauge<Long> orchestrationDelayMetric = 
metricContext.newContextAwareGauge
-        (ServiceMetricNames.FLOW_ORCHESTRATION_DELAY, 
orchestrationDelayCounter::get);
-    metricContext.register(orchestrationDelayMetric);
+  private final FlowCompilationValidationHelper 
flowCompilationValidationHelper;
+  private static final AtomicLong orchestrationDelayCounter = new 
AtomicLong(0);
+  static {
+    metricContext.register(
+        
metricContext.newContextAwareGauge(ServiceMetricNames.FLOW_ORCHESTRATION_DELAY, 
orchestrationDelayCounter::get));
+  }
+
+  @Override
+  protected DagManager.DagId getDagId() {
+    return this.launchDagTask.getDagId();
   }
 
   @Override
   protected Optional<Dag<JobExecutionPlan>> initialize(DagManagementStateStore 
dagManagementStateStore)
       throws IOException {
-    throw new UnsupportedOperationException("Not yet implemented");
+    try {
+      DagActionStore.DagAction dagAction = this.launchDagTask.getDagAction();
+      URI flowUri = FlowSpec.Utils.createFlowSpecUri(dagAction.getFlowId());
+      FlowSpec flowSpec = dagManagementStateStore.getFlowSpec(flowUri);
+      flowSpec.addProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, 
dagAction.getFlowExecutionId());
+      return 
this.flowCompilationValidationHelper.createExecutionPlanIfValid(flowSpec).toJavaUtil();
+    } catch (URISyntaxException | SpecNotFoundException | InterruptedException 
e) {
+      throw new RuntimeException(e);
+    }
   }
 
   @Override
   protected Optional<Dag<JobExecutionPlan>> act(DagManagementStateStore 
dagManagementStateStore, Optional<Dag<JobExecutionPlan>> dag)
       throws IOException {
-    throw new UnsupportedOperationException("Not yet implemented");
+    if (!dag.isPresent()) {
+      log.warn("Dag with id " + getDagId() + " could not be compiled.");
+      // todo - add metrics
+      return Optional.empty();
+    }
+    submitNextNodes(dagManagementStateStore, dag.get());
+    return dag;
   }
 
-  @Override
-  protected void sendNotification(Optional<Dag<JobExecutionPlan>> result, 
EventSubmitter eventSubmitter)
-      throws IOException {
-    throw new UnsupportedOperationException("Not yet implemented");
+  /**
+   * Submit next set of Dag nodes in the provided Dag.
+   */
+   private void submitNextNodes(DagManagementStateStore 
dagManagementStateStore,
+       Dag<JobExecutionPlan> dag) throws IOException {
+     Set<Dag.DagNode<JobExecutionPlan>> nextNodes = 
DagManagerUtils.getNext(dag);
+
+     //Submit jobs from the dag ready for execution.
+     for (Dag.DagNode<JobExecutionPlan> dagNode : nextNodes) {
+       submitJobToExecutor(dagManagementStateStore, dagNode);
+       dagManagementStateStore.addDagNodeState(dagNode, getDagId());
+       log.info("Submitted job {} for dagId {}", 
DagManagerUtils.getJobName(dagNode), getDagId());
+     }
+
+     //Checkpoint the dag state, it should have an updated value of dag nodes
+     dagManagementStateStore.checkpointDag(dag);
+   }
+
+  /**
+   * Submits a {@link JobSpec} to a {@link SpecExecutor}.
+   */
+  private void submitJobToExecutor(DagManagementStateStore 
dagManagementStateStore, Dag.DagNode<JobExecutionPlan> dagNode) {

Review Comment:
   if this is to be called from multiple `DagProcs` in the future we should put 
it in a utility class that can be re-used by `LaunchFlow, LaunchJob & 
Reevaluate` procs





Issue Time Tracking
-------------------

    Worklog Id:     (was: 909746)
    Time Spent: 3.5h  (was: 3h 20m)

> implement LaunchDagProc
> -----------------------
>
>                 Key: GOBBLIN-2015
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-2015
>             Project: Apache Gobblin
>          Issue Type: Task
>            Reporter: Arjun Singh Bora
>            Priority: Major
>          Time Spent: 3.5h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to