This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new e3a8f1cb5 [GOBBLIN-1748] Add logs to debug multi-hop flows creation, progression, and cleanup (#3608)
e3a8f1cb5 is described below

commit e3a8f1cb5bdd99ee1662d918b9e401b6a14be92e
Author: umustafi <[email protected]>
AuthorDate: Fri Dec 2 16:54:07 2022 -0800

    [GOBBLIN-1748] Add logs to debug multi-hop flows creation, progression, and cleanup (#3608)
    
    * Add logs to debug multi-hop flows creation, progression, and cleanup
    
    * respond to comments
    
    * fix unit tests
    
    Co-authored-by: Urmi Mustafi <[email protected]>
---
 .../apache/gobblin/service/modules/orchestration/DagManager.java  | 4 ++++
 .../gobblin/service/modules/orchestration/DagManagerUtils.java    | 2 +-
 .../gobblin/service/modules/spec/JobExecutionPlanDagFactory.java  | 8 ++++++++
 .../service/modules/spec/JobExecutionPlanDagFactoryTest.java      | 7 ++++++-
 4 files changed, 19 insertions(+), 2 deletions(-)

diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index 7a20ea945..5df4fb35f 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -950,11 +950,14 @@ public class DagManager extends AbstractIdleService {
     synchronized Map<String, Set<DagNode<JobExecutionPlan>>> submitNext(String 
dagId) throws IOException {
       Dag<JobExecutionPlan> dag = this.dags.get(dagId);
       Set<DagNode<JobExecutionPlan>> nextNodes = DagManagerUtils.getNext(dag);
+      List<String> nextJobNames = new ArrayList<>();
 
       //Submit jobs from the dag ready for execution.
       for (DagNode<JobExecutionPlan> dagNode : nextNodes) {
         submitJob(dagNode);
+        nextJobNames.add(DagManagerUtils.getJobName(dagNode));
       }
+      log.info("Submitting next nodes for dagId {}, where next jobs to be 
submitted are {}", dagId, nextJobNames);
       //Checkpoint the dag state
       this.dagStateStore.writeCheckpoint(dag);
 
@@ -1167,6 +1170,7 @@ public class DagManager extends AbstractIdleService {
      * @param dagId
      */
     private synchronized void cleanUpDag(String dagId) {
+      log.info("Cleaning up dagId {}", dagId);
       // clears flow event after cancelled job to allow resume event status to 
be set
       this.dags.get(dagId).setFlowEvent(null);
        try {
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
index d6fb0c99f..3dd0fb592 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
@@ -88,7 +88,7 @@ public class DagManagerUtils {
    * @param dag instance of a {@link Dag}.
    * @return a DagId object associated corresponding to the {@link Dag} 
instance.
    */
-  static DagManager.DagId generateDagId(Dag<JobExecutionPlan> dag) {
+  public static DagManager.DagId generateDagId(Dag<JobExecutionPlan> dag) {
     return 
generateDagId(dag.getStartNodes().get(0).getValue().getJobSpec().getConfig());
   }
 
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactory.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactory.java
index fede6313d..8af2ee7a8 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactory.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactory.java
@@ -32,6 +32,7 @@ import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.runtime.api.JobSpec;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
 
 
 /**
@@ -67,11 +68,13 @@ public class JobExecutionPlanDagFactory {
      *
      * TODO: we likely do not need 2 for loops and we can do this in 1 pass.
      */
+    List<String> jobNames = new ArrayList<>();
     for (JobExecutionPlan jobExecutionPlan : jobExecutionPlans) {
       String jobName = getJobName(jobExecutionPlan);
       if (jobName == null) {
         continue;
       }
+      jobNames.add(jobName);
       Dag.DagNode<JobExecutionPlan> node = jobExecutionPlanMap.get(jobName);
       Collection<String> dependencies = 
getDependencies(jobExecutionPlan.getJobSpec().getConfig());
       for (String dependency : dependencies) {
@@ -80,6 +83,11 @@ public class JobExecutionPlanDagFactory {
       }
     }
     Dag<JobExecutionPlan> dag = new Dag<>(dagNodeList);
+    if (!dagNodeList.isEmpty()) {
+      log.info("Dag plan created with id {} and jobs: {}", 
DagManagerUtils.generateDagId(dag), jobNames);
+    } else {
+      log.info("Empty dag plan created for execution plans {}", 
jobExecutionPlans);
+    }
     return dag;
   }
 
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactoryTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactoryTest.java
index 0958995a2..7463a2565 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactoryTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactoryTest.java
@@ -86,8 +86,13 @@ public class JobExecutionPlanDagFactoryTest {
     //Create a list of JobExecutionPlans
     List<JobExecutionPlan> jobExecutionPlans = new ArrayList<>();
     for (JobTemplate jobTemplate: this.jobTemplates) {
+      Config config = jobTemplate.getRawTemplateConfig()
+          .withValue(ConfigurationKeys.FLOW_NAME_KEY, 
ConfigValueFactory.fromAnyRef("testFlowName"))
+          .withValue(ConfigurationKeys.FLOW_GROUP_KEY, 
ConfigValueFactory.fromAnyRef("testFlowGroup"))
+          .withValue(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, 
ConfigValueFactory.fromAnyRef(System.currentTimeMillis()));
+
       String jobSpecUri = Files.getNameWithoutExtension(new 
Path(jobTemplate.getUri()).getName());
-      jobExecutionPlans.add(new 
JobExecutionPlan(JobSpec.builder(jobSpecUri).withConfig(jobTemplate.getRawTemplateConfig()).
+      jobExecutionPlans.add(new 
JobExecutionPlan(JobSpec.builder(jobSpecUri).withConfig(config).
           withVersion("1").withTemplate(jobTemplate.getUri()).build(), 
specExecutor));
     }
 

Reply via email to