This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 587972312 [GOBBLIN-2107] Delete adhoc flowSpecs from flowCatalog bug fix (#3996)
587972312 is described below

commit 5879723122ed2be34131dcadbfaeb5acf36ee563
Author: umustafi <[email protected]>
AuthorDate: Wed Jul 10 12:07:16 2024 -0700

    [GOBBLIN-2107] Delete adhoc flowSpecs from flowCatalog bug fix (#3996)
    
    * Delete adhoc flowSpecs from flowCatalog to avoid build up
    
            * if job compilation fails then ensure quota is released and flow compilation event sent in all cases
            * fix bug where adhoc flows are not deleted in multi-active scheduler case by re-adding deletion to
              DagManager.addDag()
    
    ---------
    
    Co-authored-by: Urmi Mustafi <[email protected]>
---
 .../modules/orchestration/Orchestrator.java        | 39 ++++++++++++-------
 .../utils/FlowCompilationValidationHelper.java     |  5 +++
 .../monitoring/DagActionStoreChangeMonitor.java    |  8 ++--
 .../modules/orchestration/OrchestratorTest.java    | 44 +++++++++++++++++++++-
 4 files changed, 78 insertions(+), 18 deletions(-)

diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index 8ff5e2daf..f53b156b7 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -212,9 +212,10 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
       String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
 
       sharedFlowMetricsSingleton.addFlowGauge(spec, flowConfig, flowGroup, 
flowName);
-
-      // only compile and pass directly to `DagManager` when multi-active NOT 
enabled; otherwise recompilation to occur later,
-      // once `DagActionStoreChangeMonitor` subsequently delegates this 
`DagActionType.LAUNCH`
+      /* Only compile and pass directly to `DagManager` when multi-active 
scheduler NOT enabled; otherwise
+      recompilation to occur later, once `DagActionStoreChangeMonitor` 
subsequently delegates this
+      `DagActionType.LAUNCH`
+       */
       if (flowLaunchHandler.isPresent()) {
         DagActionStore.DagAction launchDagAction = 
DagActionStore.DagAction.forFlow(
             flowGroup,
@@ -230,7 +231,8 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
             launchDagAction, isReminderEvent ? "reminder" : "original", 
triggerTimestampMillis);
       } else {
         try {
-          TimingEvent flowCompilationTimer = new 
TimingEvent(this.eventSubmitter, TimingEvent.FlowTimings.FLOW_COMPILED);
+          TimingEvent flowCompilationTimer =
+              new TimingEvent(this.eventSubmitter, 
TimingEvent.FlowTimings.FLOW_COMPILED);
           Map<String, String> flowMetadata = 
TimingEventUtils.getFlowMetadata(flowSpec);
           Optional<Dag<JobExecutionPlan>> compiledDagOptional =
               
this.flowCompilationValidationHelper.validateAndHandleConcurrentExecution(flowConfig,
 flowSpec, flowGroup,
@@ -259,7 +261,9 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
           // Depending on if DagManager is present, handle execution
           submitFlowToDagManager(flowSpec, compiledDag);
         } finally {
-          // remove from the flow catalog, regardless of whether the flow was 
successfully validated and permitted to exec (concurrently)
+          /* Remove adhoc flow spec from the flow catalog, regardless of 
whether the flow was successfully validated
+          and permitted to exec (concurrently)
+           */
           this.dagManager.removeFlowSpecIfAdhoc(flowSpec);
         }
       }
@@ -271,14 +275,23 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
     Instrumented.updateTimer(this.flowOrchestrationTimer, System.nanoTime() - 
startTime, TimeUnit.NANOSECONDS);
   }
 
-  public void submitFlowToDagManager(FlowSpec flowSpec) throws IOException, 
InterruptedException {
-    Optional<Dag<JobExecutionPlan>> optionalJobExecutionPlanDag =
-        
this.flowCompilationValidationHelper.createExecutionPlanIfValid(flowSpec);
-    if (optionalJobExecutionPlanDag.isPresent()) {
-      submitFlowToDagManager(flowSpec, optionalJobExecutionPlanDag.get());
-    } else {
-      _log.warn("Flow: {} submitted to dagManager failed to compile and 
produce a job execution plan dag", flowSpec);
-      Instrumented.markMeter(this.flowOrchestrationFailedMeter);
+  /**
+   * Compiles the provided {@link FlowSpec} into a {@link 
Dag<JobExecutionPlan>} and forwards that to the
+   * {@link DagManager} for execution. It's meant to be called by
+   * {@link org.apache.gobblin.service.monitoring.DagActionStoreChangeMonitor}
+   */
+  public void compileAndSubmitFlowToDagManager(FlowSpec flowSpec) throws 
IOException, InterruptedException {
+    try {
+      Optional<Dag<JobExecutionPlan>> optionalJobExecutionPlanDag =
+          
this.flowCompilationValidationHelper.createExecutionPlanIfValid(flowSpec);
+      if (optionalJobExecutionPlanDag.isPresent()) {
+        submitFlowToDagManager(flowSpec, optionalJobExecutionPlanDag.get());
+      } else {
+        _log.warn("Flow: {} submitted to dagManager failed to compile and 
produce a job execution plan dag", flowSpec);
+        Instrumented.markMeter(this.flowOrchestrationFailedMeter);
+      }
+    } finally {
+      this.dagManager.removeFlowSpecIfAdhoc(flowSpec);
     }
   }
 
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java
index 7ddfdb3ba..553fee271 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java
@@ -140,7 +140,12 @@ public class FlowCompilationValidationHelper {
         ConfigurationKeys.FLOW_ALLOW_CONCURRENT_EXECUTION, 
String.valueOf(this.isFlowConcurrencyEnabled)));
 
     Dag<JobExecutionPlan> jobExecutionPlanDag = 
specCompiler.compileFlow(flowSpec);
+
     if (jobExecutionPlanDag == null || jobExecutionPlanDag.isEmpty()) {
+      // Send FLOW_FAILED event
+      flowMetadata.put(TimingEvent.METADATA_MESSAGE, "Unable to compile 
flowSpec to produce non-empty "
+          + "jobExecutionPlanDag.");
+      new TimingEvent(eventSubmitter, 
TimingEvent.FlowTimings.FLOW_FAILED).stop(flowMetadata);
       return Optional.absent();
     }
     addFlowExecutionIdIfAbsent(flowMetadata, jobExecutionPlanDag);
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
index a179811a4..3f2c86771 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
@@ -315,14 +315,16 @@ public class DagActionStoreChangeMonitor extends 
HighLevelConsumer<String, DagAc
       change. It's crucial to adopt the consensus flowExecutionId here to 
prevent creating a new one during compilation.
       */
       spec.addProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, 
dagAction.getFlowExecutionId());
-      this.orchestrator.submitFlowToDagManager(spec);
+      this.orchestrator.compileAndSubmitFlowToDagManager(spec);
     } catch (URISyntaxException e) {
       log.warn("Could not create URI object for flowId {}. Exception {}", 
flowId, e.getMessage());
       launchSubmissionMetricProxy.markFailure();
       return;
     } catch (SpecNotFoundException e) {
-      log.warn("Spec not found for flowId {} due to exception {}", flowId, 
e.getMessage());
-      launchSubmissionMetricProxy.markFailure();
+      log.info("Spec not found for flowId {} due to deletion by active 
dagManager host due to exception {}",
+          flowId, e.getMessage());
+      // TODO: mark this failure if there are other valid cases of this 
exception
+      // launchSubmissionMetricProxy.markFailure();
       return;
     } catch (IOException e) {
       log.warn("Failed to add Job Execution Plan for flowId {} due to 
exception {}", flowId, e.getMessage());
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
index a32a211fc..961eddaa2 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
@@ -18,6 +18,7 @@
 package org.apache.gobblin.service.modules.orchestration;
 
 import java.io.File;
+import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.Collection;
@@ -25,6 +26,9 @@ import java.util.List;
 import java.util.Properties;
 
 import org.apache.commons.io.FileUtils;
+import org.apache.gobblin.service.modules.flow.SpecCompiler;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
 import org.apache.hadoop.fs.Path;
 import org.mockito.Mockito;
 import org.slf4j.Logger;
@@ -66,8 +70,7 @@ import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.PathUtils;
 
 import static org.mockito.ArgumentMatchers.*;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.*;
 
 
 public class OrchestratorTest {
@@ -92,7 +95,9 @@ public class OrchestratorTest {
   private ITestMetastoreDatabase testMetastoreDatabase;
   private FlowStatusGenerator mockFlowStatusGenerator;
   private DagManager mockDagManager;
+  private FlowCompilationValidationHelper 
mockedFlowCompilationValidationHelper;
   private Orchestrator dagMgrNotFlowLaunchHandlerBasedOrchestrator;
+  private Orchestrator mockedFlowCompValHelperBasedOrchestrator;
   private static final String TEST_USER = "testUser";
   private static final String TEST_PASSWORD = "testPassword";
   private static final String TEST_TABLE = "quotas";
@@ -141,6 +146,19 @@ public class OrchestratorTest {
         this.topologyCatalog, mockDagManager, Optional.of(logger), 
mockFlowStatusGenerator,
         Optional.absent(), sharedFlowMetricsSingleton, 
Optional.of(mock(FlowCatalog.class)), Optional.of(dagManagementStateStore),
         flowCompilationValidationHelper);
+
+    /* Initialize a second orchestrator with a mocked 
flowCompilationValidationHelper to use Mockito to spoof the dag
+    returned by a call to compile a flowSpec
+    */
+    this.mockedFlowCompilationValidationHelper = 
mock(FlowCompilationValidationHelper.class);
+    
when(mockedFlowCompilationValidationHelper.getSpecCompiler()).thenReturn(mock(SpecCompiler.class));
+    Mockito.doNothing().when(mockDagManager).setTopologySpecMap(anyMap());
+
+    this.mockedFlowCompValHelperBasedOrchestrator = new 
Orchestrator(ConfigUtils.propertiesToConfig(orchestratorProperties),
+        this.topologyCatalog, mockDagManager, Optional.of(logger), 
mockFlowStatusGenerator,
+        Optional.absent(), sharedFlowMetricsSingleton, 
Optional.of(mock(FlowCatalog.class)), Optional.of(dagManagementStateStore),
+        this.mockedFlowCompilationValidationHelper);
+
     
this.topologyCatalog.addListener(dagMgrNotFlowLaunchHandlerBasedOrchestrator);
     this.flowCatalog.addListener(dagMgrNotFlowLaunchHandlerBasedOrchestrator);
     // Start application
@@ -447,6 +465,28 @@ public class OrchestratorTest {
     Mockito.verify(this.mockDagManager, 
Mockito.times(1)).removeFlowSpecIfAdhoc(any());
   }
 
+
+  /**
+   * Tests that when compiling and forwarding a dagAction from
+   * {@link 
org.apache.gobblin.service.monitoring.DagActionStoreChangeMonitor#submitFlowToDagManagerHelper}
 to the
+   * DagManager that {@link DagManager#removeFlowSpecIfAdhoc(FlowSpec)} is 
called to ensure adhoc flowSpecs are deleted
+   * after compilation.
+   */
+  @Test
+  public void testDeleteFlowSpecCalledForMultiActivePath()
+      throws IOException, URISyntaxException, InterruptedException {
+    FlowId flowId = 
GobblinServiceManagerTest.createFlowIdWithUniqueName(TEST_FLOW_GROUP_NAME);
+    FlowSpec adhocSpec = createBasicFlowSpecForFlowId(flowId);
+    FlowSpec flowSpec1 = initFlowSpec();
+
+    Optional<Dag<JobExecutionPlan>> dag = Optional.of(
+        DagManagerTest.buildDag("0", 123L, "FINISH_RUNNING", false));
+    
Mockito.when(this.mockedFlowCompilationValidationHelper.createExecutionPlanIfValid(flowSpec1)).thenReturn(dag);
+    Mockito.doNothing().when(mockDagManager).removeFlowSpecIfAdhoc(flowSpec1);
+    
this.mockedFlowCompValHelperBasedOrchestrator.compileAndSubmitFlowToDagManager(flowSpec1);
+    Mockito.verify(this.mockedFlowCompValHelperBasedOrchestrator.dagManager, 
times(1)).removeFlowSpecIfAdhoc(any(FlowSpec.class));
+  }
+
   public static FlowSpec createBasicFlowSpecForFlowId(FlowId flowId) throws 
URISyntaxException {
     return createBasicFlowSpecForFlowId(flowId, new Properties());
   }

Reply via email to