This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new e3a8f1cb5 [GOBBLIN-1748] Add logs to debug multi-hop flows creation,
progression, and cleanup (#3608)
e3a8f1cb5 is described below
commit e3a8f1cb5bdd99ee1662d918b9e401b6a14be92e
Author: umustafi <[email protected]>
AuthorDate: Fri Dec 2 16:54:07 2022 -0800
[GOBBLIN-1748] Add logs to debug multi-hop flows creation, progression, and
cleanup (#3608)
* Add logs to debug multi-hop flows creation, progression, and cleanup
* respond to comments
* fix unit tests
Co-authored-by: Urmi Mustafi <[email protected]>
---
.../apache/gobblin/service/modules/orchestration/DagManager.java | 4 ++++
.../gobblin/service/modules/orchestration/DagManagerUtils.java | 2 +-
.../gobblin/service/modules/spec/JobExecutionPlanDagFactory.java | 8 ++++++++
.../service/modules/spec/JobExecutionPlanDagFactoryTest.java | 7 ++++++-
4 files changed, 19 insertions(+), 2 deletions(-)
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index 7a20ea945..5df4fb35f 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -950,11 +950,14 @@ public class DagManager extends AbstractIdleService {
synchronized Map<String, Set<DagNode<JobExecutionPlan>>> submitNext(String dagId) throws IOException {
Dag<JobExecutionPlan> dag = this.dags.get(dagId);
Set<DagNode<JobExecutionPlan>> nextNodes = DagManagerUtils.getNext(dag);
+ List<String> nextJobNames = new ArrayList<>();
//Submit jobs from the dag ready for execution.
for (DagNode<JobExecutionPlan> dagNode : nextNodes) {
submitJob(dagNode);
+ nextJobNames.add(DagManagerUtils.getJobName(dagNode));
}
+ log.info("Submitting next nodes for dagId {}, where next jobs to be submitted are {}", dagId, nextJobNames);
//Checkpoint the dag state
this.dagStateStore.writeCheckpoint(dag);
@@ -1167,6 +1170,7 @@ public class DagManager extends AbstractIdleService {
* @param dagId
*/
private synchronized void cleanUpDag(String dagId) {
+ log.info("Cleaning up dagId {}", dagId);
// clears flow event after cancelled job to allow resume event status to be set
this.dags.get(dagId).setFlowEvent(null);
try {
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
index d6fb0c99f..3dd0fb592 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
@@ -88,7 +88,7 @@ public class DagManagerUtils {
* @param dag instance of a {@link Dag}.
* @return a DagId object associated corresponding to the {@link Dag} instance.
*/
- static DagManager.DagId generateDagId(Dag<JobExecutionPlan> dag) {
+ public static DagManager.DagId generateDagId(Dag<JobExecutionPlan> dag) {
return generateDagId(dag.getStartNodes().get(0).getValue().getJobSpec().getConfig());
}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactory.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactory.java
index fede6313d..8af2ee7a8 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactory.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactory.java
@@ -32,6 +32,7 @@ import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
/**
@@ -67,11 +68,13 @@ public class JobExecutionPlanDagFactory {
*
* TODO: we likely do not need 2 for loops and we can do this in 1 pass.
*/
+ List<String> jobNames = new ArrayList<>();
for (JobExecutionPlan jobExecutionPlan : jobExecutionPlans) {
String jobName = getJobName(jobExecutionPlan);
if (jobName == null) {
continue;
}
+ jobNames.add(jobName);
Dag.DagNode<JobExecutionPlan> node = jobExecutionPlanMap.get(jobName);
Collection<String> dependencies = getDependencies(jobExecutionPlan.getJobSpec().getConfig());
for (String dependency : dependencies) {
@@ -80,6 +83,11 @@ public class JobExecutionPlanDagFactory {
}
}
Dag<JobExecutionPlan> dag = new Dag<>(dagNodeList);
+ if (!dagNodeList.isEmpty()) {
+ log.info("Dag plan created with id {} and jobs: {}", DagManagerUtils.generateDagId(dag), jobNames);
+ } else {
+ log.info("Empty dag plan created for execution plans {}", jobExecutionPlans);
+ }
return dag;
}
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactoryTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactoryTest.java
index 0958995a2..7463a2565 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactoryTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactoryTest.java
@@ -86,8 +86,13 @@ public class JobExecutionPlanDagFactoryTest {
//Create a list of JobExecutionPlans
List<JobExecutionPlan> jobExecutionPlans = new ArrayList<>();
for (JobTemplate jobTemplate: this.jobTemplates) {
+ Config config = jobTemplate.getRawTemplateConfig()
+ .withValue(ConfigurationKeys.FLOW_NAME_KEY, ConfigValueFactory.fromAnyRef("testFlowName"))
+ .withValue(ConfigurationKeys.FLOW_GROUP_KEY, ConfigValueFactory.fromAnyRef("testFlowGroup"))
+ .withValue(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, ConfigValueFactory.fromAnyRef(System.currentTimeMillis()));
+
String jobSpecUri = Files.getNameWithoutExtension(new Path(jobTemplate.getUri()).getName());
- jobExecutionPlans.add(new JobExecutionPlan(JobSpec.builder(jobSpecUri).withConfig(jobTemplate.getRawTemplateConfig()).
+ jobExecutionPlans.add(new JobExecutionPlan(JobSpec.builder(jobSpecUri).withConfig(config).
withVersion("1").withTemplate(jobTemplate.getUri()).build(), specExecutor));
}