This is an automated email from the ASF dual-hosted git repository.

aplex pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new f54d16c  [GOBBLIN-1509] Announce flow failure on DagManager::addDag 
error (#3357)
f54d16c is described below

commit f54d16ca9397f41c588d7e18cd8e8524ab253718
Author: Kip Kohn <[email protected]>
AuthorDate: Thu Aug 12 11:14:10 2021 -0700

    [GOBBLIN-1509] Announce flow failure on DagManager::addDag error (#3357)
    
    Announce flow failure on DagManager::addDag error
    
    Additionally, migrate Orchestrator overall away from deprecated 
EventSubmitter::getTimingEvent factory method.
    
    Presently, addDag failure leaves the flow marooned in the COMPILED state, 
because the warranted FLOW_FAILED event is never sent. Particularly insidious is 
that scheduled flows whose execution is stuck in COMPILED miss their next 
execution, unless flow.allowConcurrentExecution is set. Thus the scheduled 
flow is stuck in its entirety, not merely a single execution.
    
    One observed cause of addDag failure is a DagStateStore backed by 
a replicated DB (e.g. MySqlDagStateStore) that has just switched leaders. Cached 
connections in the pool may suddenly point to a read-only follower, which cannot 
serve the writes issued by DagStateStore::writeCheckpoint.
---
 .../modules/orchestration/Orchestrator.java        | 41 +++++++++++++---------
 1 file changed, 25 insertions(+), 16 deletions(-)

diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index 9058fb0..b4ea001 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -245,14 +245,13 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
         flowMetadata.put(TimingEvent.METADATA_MESSAGE, "Flow failed because 
another instance is running and concurrent "
             + "executions are disabled. Set flow.allowConcurrentExecution to 
true in the flow spec to change this behaviour.");
         if (this.eventSubmitter.isPresent()) {
-          
this.eventSubmitter.get().getTimingEvent(TimingEvent.FlowTimings.FLOW_FAILED).stop(flowMetadata);
+          new TimingEvent(this.eventSubmitter.get(), 
TimingEvent.FlowTimings.FLOW_FAILED).stop(flowMetadata);
         }
         return;
       }
 
-      TimingEvent flowCompilationTimer = this.eventSubmitter.isPresent()
-          ? 
this.eventSubmitter.get().getTimingEvent(TimingEvent.FlowTimings.FLOW_COMPILED)
-          : null;
+      Optional<TimingEvent> flowCompilationTimer = 
this.eventSubmitter.transform(submitter ->
+          new TimingEvent(submitter, TimingEvent.FlowTimings.FLOW_COMPILED));
 
       Dag<JobExecutionPlan> jobExecutionPlanDag = 
specCompiler.compileFlow(spec);
 
@@ -270,13 +269,13 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
         }
         flowMetadata.put(TimingEvent.METADATA_MESSAGE, message);
 
-        TimingEvent flowCompileFailedTimer = this.eventSubmitter.isPresent() ? 
this.eventSubmitter.get()
-            .getTimingEvent(TimingEvent.FlowTimings.FLOW_COMPILE_FAILED) : 
null;
+        Optional<TimingEvent> flowCompileFailedTimer = 
this.eventSubmitter.transform(submitter ->
+            new TimingEvent(submitter, 
TimingEvent.FlowTimings.FLOW_COMPILE_FAILED));
         Instrumented.markMeter(this.flowOrchestrationFailedMeter);
         
flowGauges.get(spec.getUri().toString()).setState(CompiledState.FAILED);
         _log.warn("Cannot determine an executor to run on for Spec: " + spec);
-        if (flowCompileFailedTimer != null) {
-          flowCompileFailedTimer.stop(flowMetadata);
+        if (flowCompileFailedTimer.isPresent()) {
+          flowCompileFailedTimer.get().stop(flowMetadata);
         }
         return;
       } else {
@@ -288,13 +287,23 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
       
flowMetadata.putIfAbsent(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
           
jobExecutionPlanDag.getNodes().get(0).getValue().getJobSpec().getConfigAsProperties().getProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY));
 
-      if (flowCompilationTimer != null) {
-        flowCompilationTimer.stop(flowMetadata);
+      if (flowCompilationTimer.isPresent()) {
+        flowCompilationTimer.get().stop(flowMetadata);
       }
 
       if (this.dagManager.isPresent()) {
-        //Send the dag to the DagManager.
-        this.dagManager.get().addDag(jobExecutionPlanDag, true, true);
+        try {
+          //Send the dag to the DagManager.
+          this.dagManager.get().addDag(jobExecutionPlanDag, true, true);
+        } catch (Exception ex) {
+          if (this.eventSubmitter.isPresent()) {
+            // pronounce failed before stack unwinds, to ensure flow not 
marooned in `COMPILED` state; (failure likely attributable to DB 
connection/failover)
+            String failureMessage = "Failed to add Job Execution Plan due to: 
" + ex.getMessage();
+            flowMetadata.put(TimingEvent.METADATA_MESSAGE, failureMessage);
+            new TimingEvent(this.eventSubmitter.get(), 
TimingEvent.FlowTimings.FLOW_FAILED).stop(flowMetadata);
+          }
+          throw ex;
+        }
       } else {
         // Schedule all compiled JobSpecs on their respective Executor
         for (Dag.DagNode<JobExecutionPlan> dagNode : 
jobExecutionPlanDag.getNodes()) {
@@ -314,13 +323,13 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
             Map<String, String> jobMetadata = 
TimingEventUtils.getJobMetadata(flowMetadata, jobExecutionPlan);
             _log.info(String.format("Going to orchestrate JobSpec: %s on 
Executor: %s", jobSpec, producer));
 
-            TimingEvent jobOrchestrationTimer = 
this.eventSubmitter.isPresent() ? this.eventSubmitter.get().
-                getTimingEvent(TimingEvent.LauncherTimings.JOB_ORCHESTRATED) : 
null;
+            Optional<TimingEvent> jobOrchestrationTimer = 
this.eventSubmitter.transform(submitter ->
+                new TimingEvent(submitter, 
TimingEvent.LauncherTimings.JOB_ORCHESTRATED));
 
             producer.addSpec(jobSpec);
 
-            if (jobOrchestrationTimer != null) {
-              jobOrchestrationTimer.stop(jobMetadata);
+            if (jobOrchestrationTimer.isPresent()) {
+              jobOrchestrationTimer.get().stop(jobMetadata);
             }
           } catch (Exception e) {
             _log.error("Cannot successfully setup spec: " + 
jobExecutionPlan.getJobSpec() + " on executor: " + producer

Reply via email to