This is an automated email from the ASF dual-hosted git repository.

arjun4084346 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 843391304 [GOBBLIN-2062] Ensure the `Orchestrator` always has `DagManager` remove `FlowSpec` - even when orchestration has failed (#3944)
843391304 is described below

commit 8433913043e9106e59771e24f7c0876d891bbfed
Author: Kip Kohn <[email protected]>
AuthorDate: Tue Jun 11 18:41:07 2024 -0700

    [GOBBLIN-2062] Ensure the `Orchestrator` always has `DagManager` remove `FlowSpec` - even when orchestration has failed (#3944)
    
    * Ensure the `Orchestrator` always has `DagManager` remove `FlowSpec` - even when orchestration has failed
    * Address several compilation warnings related to lombok
    * Ensure `OrchestratorTest` cleans up its `ITestMetastoreDatabase`
    * Avoid NPE in `GobblinServiceManagerTest::cleanUp`
    * Remove outdated and confusing comment in `GobblinServiceJobScheduler`
---
 gobblin-restli/server.gradle                       |   1 +
 .../service/modules/orchestration/DagManager.java  |  14 +-
 .../modules/orchestration/LeaseAttemptStatus.java  |   5 +
 .../modules/orchestration/Orchestrator.java        |  56 +++---
 .../scheduler/GobblinServiceJobScheduler.java      |   1 -
 .../utils/FlowCompilationValidationHelper.java     |   4 +-
 .../gobblin/service/GobblinServiceManagerTest.java |  11 +-
 .../modules/orchestration/OrchestratorTest.java    | 212 +++++++++++++++------
 .../java/org/apache/gobblin/util/PathUtils.java    |   4 +-
 .../org/apache/gobblin/util/PropertiesUtils.java   |   4 +-
 10 files changed, 209 insertions(+), 103 deletions(-)

diff --git a/gobblin-restli/server.gradle b/gobblin-restli/server.gradle
index 1f3bb08d6..ebba20cd7 100644
--- a/gobblin-restli/server.gradle
+++ b/gobblin-restli/server.gradle
@@ -45,6 +45,7 @@ dependencies {
   }
 
   compile externalDependency.gson
+  compile externalDependency.lombok
   compile externalDependency.pegasus.restliServer
   compile externalDependency.pegasus.restliCommon
   compile externalDependency.pegasus.restliNettyStandalone
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index 51a846cb8..5c03b35f6 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -278,18 +278,10 @@ public class DagManager extends AbstractIdleService {
   }
 
   /**
-   * Method to submit a {@link Dag} to the {@link DagManager} and delete adhoc 
flowSpecs from the FlowCatalog after
-   * persisting it in the other addDag method called. The DagManager's failure 
recovery method ensures the flow will be
-   * executed in the event of downtime.
-   * @param flowSpec
-   * @param dag
-   * @param persist
-   * @param setStatus
-   * @throws IOException
+   * Delete adhoc flowSpecs from the {@link FlowCatalog} after (separately) 
persisting via {@link DagManager#addDag(Dag, boolean, boolean)}.
+   * This DagManager's failure recovery mechanisms ensure the flow will be 
executed, even in the event of downtime.
    */
-  public synchronized void addDagAndRemoveAdhocFlowSpec(FlowSpec flowSpec, 
Dag<JobExecutionPlan> dag, boolean persist, boolean setStatus)
-      throws IOException {
-    addDag(dag, persist, setStatus);
+  public synchronized void removeFlowSpecIfAdhoc(FlowSpec flowSpec) throws 
IOException {
     // Only the active dagManager should delete the flowSpec
     if (isActive) {
       deleteSpecFromCatalogIfAdhoc(flowSpec);
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/LeaseAttemptStatus.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/LeaseAttemptStatus.java
index 1087c6be4..efa6ee881 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/LeaseAttemptStatus.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/LeaseAttemptStatus.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 
 import lombok.AccessLevel;
 import lombok.Data;
+import lombok.EqualsAndHashCode;
 import lombok.Getter;
 
 
@@ -60,6 +61,8 @@ public abstract class LeaseAttemptStatus {
    completeLease method from a caller without access to the {@link 
MultiActiveLeaseArbiter}.
   */
   @Data
+  // avoid - warning: Generating equals/hashCode implementation but without a 
call to superclass, even though this class does not extend java.lang.Object
+  @EqualsAndHashCode(callSuper=false)
   public static class LeaseObtainedStatus extends LeaseAttemptStatus {
     private final DagActionStore.DagAction consensusDagAction;
     private final long eventTimeMillis;
@@ -86,6 +89,8 @@ public abstract class LeaseAttemptStatus {
   the lease has completed or expired
    */
   @Data
+  // avoid - warning: Generating equals/hashCode implementation but without a 
call to superclass, even though this class does not extend java.lang.Object
+  @EqualsAndHashCode(callSuper=false)
   public static class LeasedToAnotherStatus extends LeaseAttemptStatus {
     private final DagActionStore.DagAction consensusDagAction;
     private final long eventTimeMillis;
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index 86f1d12a9..d82d4f9a7 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -229,33 +229,39 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
         _log.info("Multi-active scheduler finished handling trigger event: 
[{}, is: {}, triggerEventTimestamp: {}]",
             launchDagAction, isReminderEvent ? "reminder" : "original", 
triggerTimestampMillis);
       } else {
-        TimingEvent flowCompilationTimer = new 
TimingEvent(this.eventSubmitter, TimingEvent.FlowTimings.FLOW_COMPILED);
-        Map<String, String> flowMetadata = 
TimingEventUtils.getFlowMetadata(flowSpec);
-        Optional<Dag<JobExecutionPlan>> compiledDagOptional =
-            
this.flowCompilationValidationHelper.validateAndHandleConcurrentExecution(flowConfig,
 flowSpec, flowGroup,
-                flowName, flowMetadata);
-
-        if (!compiledDagOptional.isPresent()) {
-          Instrumented.markMeter(this.flowOrchestrationFailedMeter);
-          return;
-        }
-        Dag<JobExecutionPlan> compiledDag = compiledDagOptional.get();
-        if (compiledDag.isEmpty()) {
-          
FlowCompilationValidationHelper.populateFlowCompilationFailedEventMessage(eventSubmitter,
 flowSpec, flowMetadata);
-          Instrumented.markMeter(this.flowOrchestrationFailedMeter);
+        try {
+          TimingEvent flowCompilationTimer = new 
TimingEvent(this.eventSubmitter, TimingEvent.FlowTimings.FLOW_COMPILED);
+          Map<String, String> flowMetadata = 
TimingEventUtils.getFlowMetadata(flowSpec);
+          Optional<Dag<JobExecutionPlan>> compiledDagOptional =
+              
this.flowCompilationValidationHelper.validateAndHandleConcurrentExecution(flowConfig,
 flowSpec, flowGroup,
+                  flowName, flowMetadata);
+
+          if (!compiledDagOptional.isPresent()) {
+            Instrumented.markMeter(this.flowOrchestrationFailedMeter);
+            return;
+          }
+          Dag<JobExecutionPlan> compiledDag = compiledDagOptional.get();
+          if (compiledDag.isEmpty()) {
+            
FlowCompilationValidationHelper.populateFlowCompilationFailedEventMessage(eventSubmitter,
 flowSpec,
+                flowMetadata);
+            Instrumented.markMeter(this.flowOrchestrationFailedMeter);
+            
sharedFlowMetricsSingleton.conditionallyUpdateFlowGaugeSpecState(spec,
+                SharedFlowMetricsSingleton.CompiledState.FAILED);
+            _log.warn("Cannot determine an executor to run on for Spec: " + 
spec);
+            return;
+          }
           
sharedFlowMetricsSingleton.conditionallyUpdateFlowGaugeSpecState(spec,
-              SharedFlowMetricsSingleton.CompiledState.FAILED);
-          _log.warn("Cannot determine an executor to run on for Spec: " + 
spec);
-          return;
-        }
-        sharedFlowMetricsSingleton.conditionallyUpdateFlowGaugeSpecState(spec,
-            SharedFlowMetricsSingleton.CompiledState.SUCCESSFUL);
+              SharedFlowMetricsSingleton.CompiledState.SUCCESSFUL);
 
-        
FlowCompilationValidationHelper.addFlowExecutionIdIfAbsent(flowMetadata, 
compiledDag);
-        flowCompilationTimer.stop(flowMetadata);
+          
FlowCompilationValidationHelper.addFlowExecutionIdIfAbsent(flowMetadata, 
compiledDag);
+          flowCompilationTimer.stop(flowMetadata);
 
-        // Depending on if DagManager is present, handle execution
-        submitFlowToDagManager(flowSpec, compiledDag);
+          // Depending on if DagManager is present, handle execution
+          submitFlowToDagManager(flowSpec, compiledDag);
+        } finally {
+          // remove from the flow catalog, regardless of whether the flow was 
successfully validated and permitted to exec (concurrently)
+          this.dagManager.removeFlowSpecIfAdhoc(flowSpec);
+        }
       }
     } else {
       Instrumented.markMeter(this.flowOrchestrationFailedMeter);
@@ -283,7 +289,7 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
       Note that the responsibility of the multi-active scheduler mode ends 
after this method is completed AND the
       consumption of a launch type event is committed to the consumer.
        */
-      this.dagManager.addDagAndRemoveAdhocFlowSpec(flowSpec, 
jobExecutionPlanDag, true, true);
+      this.dagManager.addDag(jobExecutionPlanDag, true, true);
     } catch (Exception ex) {
       String failureMessage = "Failed to add Job Execution Plan due to: " + 
ex.getMessage();
       _log.warn("Orchestrator call - " + failureMessage, ex);
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
index 4b47264f4..e8cc6ccb0 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
@@ -825,7 +825,6 @@ public class GobblinServiceJobScheduler extends 
JobScheduler implements SpecCata
               }
             }
           }
-          // Note that we only remove the spec from the flow catalog after it 
is orchestrated
           
GobblinServiceJobScheduler.this.scheduledFlowSpecs.remove(specUri.toString());
           
GobblinServiceJobScheduler.this.lastUpdatedTimeForFlowSpec.remove(specUri.toString());
         }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java
index af90d5b7b..7ddfdb3ba 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java
@@ -145,7 +145,7 @@ public class FlowCompilationValidationHelper {
     }
     addFlowExecutionIdIfAbsent(flowMetadata, jobExecutionPlanDag);
 
-    if (isExecutionPermitted(flowStatusGenerator, flowName, flowGroup, 
allowConcurrentExecution,
+    if (isExecutionPermitted(flowStatusGenerator, flowGroup, flowName, 
allowConcurrentExecution,
         
Long.parseLong(flowMetadata.get(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD))))
 {
       return Optional.fromNullable(jobExecutionPlanDag);
     } else {
@@ -176,7 +176,7 @@ public class FlowCompilationValidationHelper {
    * @param allowConcurrentExecution
    * @return true if the {@link FlowSpec} allows concurrent executions or if 
no other instance of the flow is currently RUNNING.
    */
-  private boolean isExecutionPermitted(FlowStatusGenerator 
flowStatusGenerator, String flowName, String flowGroup,
+  private boolean isExecutionPermitted(FlowStatusGenerator 
flowStatusGenerator, String flowGroup, String flowName,
       boolean allowConcurrentExecution, long flowExecutionId) {
     return allowConcurrentExecution || 
!flowStatusGenerator.isFlowRunning(flowName, flowGroup, flowExecutionId);
   }
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
index 63327b291..3d21c9c00 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
@@ -231,6 +231,8 @@ public class GobblinServiceManagerTest {
 
     DagManager spiedDagManager = spy(gobblinServiceManager.getDagManager());
     doNothing().when(spiedDagManager).setActive(anyBoolean());
+    // WARNING: this `spiedDagManager` WILL NOT BE the one used by the 
`Orchestrator`: its DM has apparently already been
+    // provided to the `Orchestrator` ctor, prior to this replacement here of 
`GobblinServiceManager.dagManager`
     gobblinServiceManager.dagManager = spiedDagManager;
     return gobblinServiceManager;
   }
@@ -276,7 +278,10 @@ public class GobblinServiceManagerTest {
     }
 
     mysql.stop();
-    testMetastoreDatabase.close();
+
+    if (testMetastoreDatabase != null) {
+      testMetastoreDatabase.close();
+    }
   }
 
   /**
@@ -367,7 +372,7 @@ public class GobblinServiceManagerTest {
 
     this.flowConfigClient.createFlowConfig(flowConfig);
 
-    // FlowSpec still remains it is only deleted by inactiveDagManager
+    // FlowSpec still remains: it is only deleted by activeDagManager
     
AssertWithBackoff.create().maxSleepMs(200L).timeoutMs(4000L).backoffFactor(1)
         .assertTrue(input -> 
this.gobblinServiceManager.getFlowCatalog().getSpecs().size() == 1,
             "Waiting for job to get orchestrated...");
@@ -822,7 +827,7 @@ public class GobblinServiceManagerTest {
   /*
   Creates a unique flowId for a given group using the current timestamp as the 
flowName.
    */
-  public FlowId createFlowIdWithUniqueName(String groupName) {
+  public static FlowId createFlowIdWithUniqueName(String groupName) {
     return new 
FlowId().setFlowGroup(groupName).setFlowName(String.valueOf(System.currentTimeMillis()));
   }
 }
\ No newline at end of file
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
index 86d6ff104..dc2a1b40f 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
@@ -26,11 +26,14 @@ import java.util.Properties;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.fs.Path;
+import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 import com.codahale.metrics.MetricRegistry;
@@ -53,6 +56,8 @@ import org.apache.gobblin.runtime.app.ServiceBasedAppLauncher;
 import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
 import org.apache.gobblin.runtime.spec_catalog.TopologyCatalog;
 import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor;
+import org.apache.gobblin.service.FlowId;
+import org.apache.gobblin.service.GobblinServiceManagerTest;
 import org.apache.gobblin.service.modules.flow.IdentityFlowToJobSpecCompiler;
 import 
org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
 import org.apache.gobblin.service.modules.utils.SharedFlowMetricsSingleton;
@@ -60,8 +65,7 @@ import 
org.apache.gobblin.service.monitoring.FlowStatusGenerator;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.PathUtils;
 
-import static org.mockito.ArgumentMatchers.anyMap;
-import static org.mockito.Mockito.doNothing;
+import static org.mockito.ArgumentMatchers.*;
 import static org.mockito.Mockito.mock;
 
 
@@ -72,6 +76,7 @@ public class OrchestratorTest {
   private static final String SPEC_STORE_PARENT_DIR = "/tmp/orchestrator/";
   private static final String SPEC_DESCRIPTION = "Test Orchestrator";
   private static final String SPEC_VERSION = FlowSpec.Builder.DEFAULT_VERSION;
+  private static final String TEST_FLOW_GROUP_NAME = "myTestFlowGroup";
   private static final String TOPOLOGY_SPEC_STORE_DIR = 
"/tmp/orchestrator/topologyTestSpecStore";
   private static final String FLOW_SPEC_STORE_DIR = 
"/tmp/orchestrator/flowTestSpecStore";
   private static final String FLOW_SPEC_GROUP_DIR = 
"/tmp/orchestrator/flowTestSpecStore/flowTestGroupDir";
@@ -82,14 +87,23 @@ public class OrchestratorTest {
 
   private FlowCatalog flowCatalog;
   private FlowSpec flowSpec;
-  private Orchestrator orchestrator;
+
+  private ITestMetastoreDatabase testMetastoreDatabase;
+  private FlowStatusGenerator mockFlowStatusGenerator;
+  private DagManager mockDagManager;
+  private Orchestrator dagMgrNotFlowLaunchHandlerBasedOrchestrator;
   private static final String TEST_USER = "testUser";
   private static final String TEST_PASSWORD = "testPassword";
   private static final String TEST_TABLE = "quotas";
-  private static ITestMetastoreDatabase testDb;
+
   @BeforeClass
-  public void setup() throws Exception {
-    testDb = TestMetastoreDatabaseFactory.get();
+  public void setUpClass() throws Exception {
+    // PERF: when within `@{Before,After}Class` the 7 current tests take only 
33s; when `@{Before,After}Method` `.get()`s a per-test DB, the same take 1m49s!
+    this.testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
+  }
+
+  @BeforeMethod
+  public void setUp() throws Exception {
     cleanUpDir(TOPOLOGY_SPEC_STORE_DIR);
     cleanUpDir(FLOW_SPEC_STORE_DIR);
 
@@ -112,14 +126,14 @@ public class OrchestratorTest {
         Optional.of(logger), Optional.<MetricContext>absent(), true, true);
 
     this.serviceLauncher.addService(flowCatalog);
-    FlowStatusGenerator mockStatusGenerator = mock(FlowStatusGenerator.class);
-    FlowLaunchHandler mockFlowTriggerHandler = mock(FlowLaunchHandler.class);
-    DagManager mockDagManager = mock(DagManager.class);
-    doNothing().when(mockDagManager).setTopologySpecMap(anyMap());
+    this.mockFlowStatusGenerator = mock(FlowStatusGenerator.class);
+    this.mockDagManager = mock(DagManager.class);
+    Mockito.doNothing().when(mockDagManager).setTopologySpecMap(anyMap());
+
     Config config = ConfigBuilder.create()
         
.addPrimitive(MostlyMySqlDagManagementStateStore.DAG_STATESTORE_CLASS_KEY,
             
MostlyMySqlDagManagementStateStoreTest.TestMysqlDagStateStore.class.getName())
-        
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_URL_KEY),
 testDb.getJdbcUrl())
+        
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_URL_KEY),
 this.testMetastoreDatabase.getJdbcUrl())
         
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_USER_KEY),
 TEST_USER)
         
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY),
 TEST_PASSWORD)
         
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY),
 TEST_TABLE).build();
@@ -129,12 +143,13 @@ public class OrchestratorTest {
 
     SharedFlowMetricsSingleton sharedFlowMetricsSingleton = new 
SharedFlowMetricsSingleton(ConfigUtils.propertiesToConfig(orchestratorProperties));
 
-    this.orchestrator = new 
Orchestrator(ConfigUtils.propertiesToConfig(orchestratorProperties),
-        this.topologyCatalog, mockDagManager, Optional.of(logger), 
mockStatusGenerator,
-        Optional.of(mockFlowTriggerHandler), sharedFlowMetricsSingleton, 
Optional.of(mock(FlowCatalog.class)), Optional.of(dagManagementStateStore),
-        new FlowCompilationValidationHelper(config, 
sharedFlowMetricsSingleton, mock(UserQuotaManager.class), mockStatusGenerator));
-    this.topologyCatalog.addListener(orchestrator);
-    this.flowCatalog.addListener(orchestrator);
+    FlowCompilationValidationHelper flowCompilationValidationHelper = new 
FlowCompilationValidationHelper(config, sharedFlowMetricsSingleton, 
mock(UserQuotaManager.class), mockFlowStatusGenerator);
+    this.dagMgrNotFlowLaunchHandlerBasedOrchestrator = new 
Orchestrator(ConfigUtils.propertiesToConfig(orchestratorProperties),
+        this.topologyCatalog, mockDagManager, Optional.of(logger), 
mockFlowStatusGenerator,
+        Optional.absent(), sharedFlowMetricsSingleton, 
Optional.of(mock(FlowCatalog.class)), Optional.of(dagManagementStateStore),
+        flowCompilationValidationHelper);
+    
this.topologyCatalog.addListener(dagMgrNotFlowLaunchHandlerBasedOrchestrator);
+    this.flowCatalog.addListener(dagMgrNotFlowLaunchHandlerBasedOrchestrator);
     // Start application
     this.serviceLauncher.start();
     // Create Spec to play with
@@ -142,6 +157,25 @@ public class OrchestratorTest {
     this.flowSpec = initFlowSpec();
   }
 
+  @AfterMethod
+  public void tearDown() throws Exception {
+    // Shutdown Catalog
+    this.serviceLauncher.stop();
+
+    File specStoreDir = new File(SPEC_STORE_PARENT_DIR);
+    if (specStoreDir.exists()) {
+      FileUtils.deleteDirectory(specStoreDir);
+    }
+  }
+
+  @AfterClass(alwaysRun = true)
+  public void tearDownClass() throws Exception {
+    if (this.testMetastoreDatabase != null) {
+      // `.close()` to avoid (in the aggregate, across multiple suites) - 
java.sql.SQLNonTransientConnectionException: Too many connections
+      this.testMetastoreDatabase.close();
+    }
+  }
+
   private void cleanUpDir(String dir) throws Exception {
     File specStoreDir = new File(dir);
     if (specStoreDir.exists()) {
@@ -221,24 +255,10 @@ public class OrchestratorTest {
     return uri;
   }
 
-  @AfterClass
-  public void cleanUp() throws Exception {
-    // Shutdown Catalog
-    this.serviceLauncher.stop();
-
-    File specStoreDir = new File(SPEC_STORE_PARENT_DIR);
-    if (specStoreDir.exists()) {
-      FileUtils.deleteDirectory(specStoreDir);
-    }
-
-    if (testDb != null) {
-      testDb.close();
-    }
-  }
-
+  // TODO: this test doesn't exercise `Orchestrator` and so belongs elsewhere 
- move it, then rework into `@BeforeMethod` init (since others depend on this)
   @Test
   public void createTopologySpec() {
-    IdentityFlowToJobSpecCompiler specCompiler = 
(IdentityFlowToJobSpecCompiler) this.orchestrator.getSpecCompiler();
+    IdentityFlowToJobSpecCompiler specCompiler = 
(IdentityFlowToJobSpecCompiler) 
this.dagMgrNotFlowLaunchHandlerBasedOrchestrator.getSpecCompiler();
 
     // List Current Specs
     Collection<Spec> specs = topologyCatalog.getSpecs();
@@ -271,11 +291,12 @@ public class OrchestratorTest {
     Assert.assertTrue(specCompiler.getTopologySpecMap().size() == 1, 
"SpecCompiler should contain 1 Spec after addition");
   }
 
-  @Test (dependsOnMethods = "createTopologySpec")
+  @Test
   public void createFlowSpec() throws Throwable {
-    // Since only 1 Topology with 1 SpecProducer has been added in previous 
test
-    // .. it should be available and responsible for our new FlowSpec
-    IdentityFlowToJobSpecCompiler specCompiler = 
(IdentityFlowToJobSpecCompiler) this.orchestrator.getSpecCompiler();
+    // TODO: fix this lingering inter-test dep from when `@BeforeClass` init, 
which we've since replaced by `Mockito.verify`-friendly `@BeforeMethod`
+    createTopologySpec(); // make 1 Topology with 1 SpecProducer available and 
responsible for our new FlowSpec
+
+    IdentityFlowToJobSpecCompiler specCompiler = 
(IdentityFlowToJobSpecCompiler) 
this.dagMgrNotFlowLaunchHandlerBasedOrchestrator.getSpecCompiler();
     SpecExecutor sei = 
specCompiler.getTopologySpecMap().values().iterator().next().getSpecExecutor();
 
     // List Current Specs
@@ -314,10 +335,12 @@ public class OrchestratorTest {
         + "Spec after addition");
   }
 
-  @Test (dependsOnMethods = "createFlowSpec")
-  public void deleteFlowSpec() throws Exception {
-    // Since only 1 Flow has been added in previous test it should be available
-    IdentityFlowToJobSpecCompiler specCompiler = 
(IdentityFlowToJobSpecCompiler) this.orchestrator.getSpecCompiler();
+  @Test
+  public void deleteFlowSpec() throws Throwable {
+    // TODO: fix this lingering inter-test dep from when `@BeforeClass` init, 
which we've since replaced by `Mockito.verify`-friendly `@BeforeMethod`
+    createFlowSpec(); // make 1 Flow available (for deletion herein)
+
+    IdentityFlowToJobSpecCompiler specCompiler = 
(IdentityFlowToJobSpecCompiler) 
this.dagMgrNotFlowLaunchHandlerBasedOrchestrator.getSpecCompiler();
     SpecExecutor sei = 
specCompiler.getTopologySpecMap().values().iterator().next().getSpecExecutor();
 
     // List Current Specs
@@ -355,26 +378,101 @@ public class OrchestratorTest {
         + "Spec after deletion");
   }
 
-  @Test (dependsOnMethods = "deleteFlowSpec")
-  public void doNotRegisterMetricsAdhocFlows() throws Exception {
-    MetricContext metricContext = 
this.orchestrator.getSharedFlowMetricsSingleton().getMetricContext();
+  @Test
+  public void doNotRegisterMetricsAdhocFlows() throws Throwable {
+    // TODO: fix this lingering inter-test dep from when `@BeforeClass` init, 
which we've since replaced by `Mockito.verify`-friendly `@BeforeMethod`
+    createTopologySpec(); // for flow compilation to pass
+
+    FlowId flowId = 
GobblinServiceManagerTest.createFlowIdWithUniqueName(TEST_FLOW_GROUP_NAME);
+    MetricContext metricContext = 
this.dagMgrNotFlowLaunchHandlerBasedOrchestrator.getSharedFlowMetricsSingleton().getMetricContext();
+    String metricName = 
MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, 
flowId.getFlowGroup(), flowId.getFlowName(), ServiceMetricNames.COMPILED);
+
     this.topologyCatalog.getInitComplete().countDown(); // unblock 
orchestration
+
+    FlowSpec adhocSpec = createBasicFlowSpecForFlowId(flowId);
+    this.dagMgrNotFlowLaunchHandlerBasedOrchestrator.orchestrate(adhocSpec, 
new Properties(), 0, false);
+    
Assert.assertNull(metricContext.getParent().get().getGauges().get(metricName));
+
+    Properties scheduledProps = new Properties();
+    scheduledProps.setProperty("job.schedule", "0/2 * * * * ?");
+    FlowSpec scheduledSpec = createBasicFlowSpecForFlowId(flowId, 
scheduledProps);
+    
this.dagMgrNotFlowLaunchHandlerBasedOrchestrator.orchestrate(scheduledSpec, new 
Properties(), 0, false);
+    
Assert.assertNotNull(metricContext.getParent().get().getGauges().get(metricName));
+  }
+
+  @Test
+  public void removeFlowSpecWhenDagAdded() throws Throwable {
+    // TODO: fix this lingering inter-test dep from when `@BeforeClass` init, 
which we've since replaced by `Mockito.verify`-friendly `@BeforeMethod`
+    createTopologySpec(); // for flow compilation to pass
+
+    FlowId flowId = 
GobblinServiceManagerTest.createFlowIdWithUniqueName(TEST_FLOW_GROUP_NAME);
+    FlowSpec flowSpec = createBasicFlowSpecForFlowId(flowId);
+    this.topologyCatalog.getInitComplete().countDown(); // unblock 
orchestration
+
+    this.dagMgrNotFlowLaunchHandlerBasedOrchestrator.orchestrate(flowSpec, new 
Properties(), 0, false);
+
+    Mockito.verify(this.mockDagManager, Mockito.times(1)).addDag(any(), 
eq(true), eq(true));
+    Mockito.verify(this.mockDagManager, 
Mockito.times(1)).removeFlowSpecIfAdhoc(any());
+  }
+
+  @Test
+  public void removeFlowSpecEvenWhenDagNotAddedDueToCompilationFailure() 
throws Throwable {
+    // to cause flow compilation to fail, DO NOT EXECUTE: createTopologySpec();
+
+    FlowId flowId = 
GobblinServiceManagerTest.createFlowIdWithUniqueName(TEST_FLOW_GROUP_NAME);
+    FlowSpec flowSpec = createBasicFlowSpecForFlowId(flowId);
+    this.topologyCatalog.getInitComplete().countDown(); // unblock 
orchestration
+
+    this.dagMgrNotFlowLaunchHandlerBasedOrchestrator.orchestrate(flowSpec, new 
Properties(), 0, false);
+
+    // (verifies that compilation failure precedes enforcement of concurrent 
flow executions)
+    Mockito.verify(this.mockFlowStatusGenerator, 
Mockito.never()).isFlowRunning(any(), any(), anyLong());
+
+    Mockito.verify(this.mockDagManager, Mockito.never()).addDag(any(), 
anyBoolean(), anyBoolean());
+    Mockito.verify(this.mockDagManager, 
Mockito.times(1)).removeFlowSpecIfAdhoc(any());
+  }
+
+  @Test
+  public void removeFlowSpecEvenWhenDagNotAddedDueToConcurrentExecution() 
throws Throwable {
+    // TODO: fix this lingering inter-test dep from when `@BeforeClass` init, 
which we've since replaced by `Mockito.verify`-friendly `@BeforeMethod`
+    createTopologySpec(); // for flow compilation to pass
+
+    FlowId flowId = 
GobblinServiceManagerTest.createFlowIdWithUniqueName(TEST_FLOW_GROUP_NAME);
+    Properties noConcurrentExecsProps = new Properties();
+    noConcurrentExecsProps.setProperty("flow.allowConcurrentExecution", 
"false");
+    FlowSpec flowSpec = createBasicFlowSpecForFlowId(flowId, 
noConcurrentExecsProps);
+    this.topologyCatalog.getInitComplete().countDown(); // unblock 
orchestration
+
+    
Mockito.when(this.mockFlowStatusGenerator.isFlowRunning(eq(flowId.getFlowName()),
 eq(flowId.getFlowGroup()), anyLong())).thenReturn(true);
+
+    this.dagMgrNotFlowLaunchHandlerBasedOrchestrator.orchestrate(flowSpec, new 
Properties(), 0, false);
+
+    // (verifies enforcement of concurrent flow executions)
+    Mockito.verify(this.mockFlowStatusGenerator, 
Mockito.times(1)).isFlowRunning(eq(flowId.getFlowName()), 
eq(flowId.getFlowGroup()), anyLong());
+
+    Mockito.verify(this.mockDagManager, Mockito.never()).addDag(any(), 
anyBoolean(), anyBoolean());
+    Mockito.verify(this.mockDagManager, 
Mockito.times(1)).removeFlowSpecIfAdhoc(any());
+  }
+
+  public static FlowSpec createBasicFlowSpecForFlowId(FlowId flowId) throws 
URISyntaxException {
+    return createBasicFlowSpecForFlowId(flowId, new Properties());
+  }
+
+  public static FlowSpec createBasicFlowSpecForFlowId(FlowId flowId, 
Properties moreProps) throws URISyntaxException {
+    URI flowUri = FlowSpec.Utils.createFlowSpecUri(flowId);
     Properties flowProps = new Properties();
-    flowProps.setProperty(ConfigurationKeys.FLOW_NAME_KEY, "flow0");
-    flowProps.setProperty(ConfigurationKeys.FLOW_GROUP_KEY, "group0");
-    flowProps.put("specStore.fs.dir", FLOW_SPEC_STORE_DIR);
-    flowProps.put("specExecInstance.capabilities", "source:destination");
+    // needed by - Orchestrator::orchestrate
+    flowProps.setProperty(ConfigurationKeys.FLOW_GROUP_KEY, 
flowId.getFlowGroup());
+    flowProps.setProperty(ConfigurationKeys.FLOW_NAME_KEY, 
flowId.getFlowName());
+    // needed by - IdentityFlowToJobSpecCompiler::compileFlow
     flowProps.put("gobblin.flow.sourceIdentifier", "source");
     flowProps.put("gobblin.flow.destinationIdentifier", "destination");
-    flowProps.put("flow.allowConcurrentExecution", false);
-    FlowSpec adhocSpec = new FlowSpec(URI.create("flow0/group0"), "1", "", 
ConfigUtils.propertiesToConfig(flowProps) , flowProps, Optional.absent(), 
Optional.absent());
-    this.orchestrator.orchestrate(adhocSpec, flowProps, 0, false);
-    String metricName = 
MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, "group0", 
"flow0", ServiceMetricNames.COMPILED);
-    
Assert.assertNull(metricContext.getParent().get().getGauges().get(metricName));
+    // needed by - IdentityFlowToJobSpecCompiler::getJobExecutionPlans
+    flowProps.put("specExecInstance.capabilities", "source:destination");
 
-    flowProps.setProperty("job.schedule", "0/2 * * * * ?");
-    FlowSpec scheduledSpec = new FlowSpec(URI.create("flow0/group0"), "1", "", 
ConfigUtils.propertiesToConfig(flowProps) , flowProps, Optional.absent(), 
Optional.absent());
-    this.orchestrator.orchestrate(scheduledSpec, flowProps, 0, false);
-    
Assert.assertNotNull(metricContext.getParent().get().getGauges().get(metricName));
+    moreProps.entrySet().forEach(entry ->
+        flowProps.put(entry.getKey(), entry.getValue()));
+
+    return new FlowSpec(flowUri, "1", "", 
ConfigUtils.propertiesToConfig(flowProps), flowProps, Optional.absent(), 
Optional.absent());
   }
 }
\ No newline at end of file
diff --git 
a/gobblin-utility/src/main/java/org/apache/gobblin/util/PathUtils.java 
b/gobblin-utility/src/main/java/org/apache/gobblin/util/PathUtils.java
index 0893915f9..d9d2ac11f 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/PathUtils.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/PathUtils.java
@@ -50,8 +50,8 @@ public class PathUtils {
 
   /**
    * Checks whether possibleAncestor is an ancestor of fullPath.
-   * @param possibleAncestor Possible ancestor of fullPath.
-   * @param fullPath path to check.
+   * @param possibleAncestor Possible ancestor of fullPath, where scheme and 
authority are IGNORED
+   * @param fullPath path to check, where scheme and authority are IGNORED
    * @return true if possibleAncestor is an ancestor of fullPath.
    */
   public static boolean isAncestor(Path possibleAncestor, Path fullPath) {
diff --git 
a/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java 
b/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java
index ab4ad5c9d..eb365a1e3 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java
@@ -83,8 +83,8 @@ public class PropertiesUtils {
   /** @throws {@link NullPointerException} when `key` not in `properties` */
   public static String getRequiredPropRaw(Properties properties, String key, 
Optional<String> desc) {
     String value = properties.getProperty(key);
-    Preconditions.checkNotNull(value, "'" + key + "' must be set" + 
desc.transform
-        (s -> " (to " + desc + ")").or(""));
+    Preconditions.checkNotNull(value, "'" + key + "' must be set" + 
desc.transform((String s) ->
+        " (to " + desc + ")").or(""));
     return value;
   }
 

Reply via email to