This is an automated email from the ASF dual-hosted git repository.
arjun4084346 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 843391304 [GOBBLIN-2062] Ensure the `Orchestrator` always has `DagManager` remove `FlowSpec` - even when orchestration has failed (#3944)
843391304 is described below
commit 8433913043e9106e59771e24f7c0876d891bbfed
Author: Kip Kohn <[email protected]>
AuthorDate: Tue Jun 11 18:41:07 2024 -0700
[GOBBLIN-2062] Ensure the `Orchestrator` always has `DagManager` remove `FlowSpec` - even when orchestration has failed (#3944)
* Ensure the `Orchestrator` always has `DagManager` remove `FlowSpec` -
even when orchestration has failed
* Address several compilation warnings related to lombok
* Ensure `OrchestratorTest` cleans up its `ITestMetastoreDatabase`
* Avoid NPE in `GobblinServiceManagerTest::cleanUp`
* Remove out-dated and confusing comment in `GobblinServiceJobScheduler`
---
gobblin-restli/server.gradle | 1 +
.../service/modules/orchestration/DagManager.java | 14 +-
.../modules/orchestration/LeaseAttemptStatus.java | 5 +
.../modules/orchestration/Orchestrator.java | 56 +++---
.../scheduler/GobblinServiceJobScheduler.java | 1 -
.../utils/FlowCompilationValidationHelper.java | 4 +-
.../gobblin/service/GobblinServiceManagerTest.java | 11 +-
.../modules/orchestration/OrchestratorTest.java | 212 +++++++++++++++------
.../java/org/apache/gobblin/util/PathUtils.java | 4 +-
.../org/apache/gobblin/util/PropertiesUtils.java | 4 +-
10 files changed, 209 insertions(+), 103 deletions(-)
diff --git a/gobblin-restli/server.gradle b/gobblin-restli/server.gradle
index 1f3bb08d6..ebba20cd7 100644
--- a/gobblin-restli/server.gradle
+++ b/gobblin-restli/server.gradle
@@ -45,6 +45,7 @@ dependencies {
}
compile externalDependency.gson
+ compile externalDependency.lombok
compile externalDependency.pegasus.restliServer
compile externalDependency.pegasus.restliCommon
compile externalDependency.pegasus.restliNettyStandalone
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index 51a846cb8..5c03b35f6 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -278,18 +278,10 @@ public class DagManager extends AbstractIdleService {
}
/**
- * Method to submit a {@link Dag} to the {@link DagManager} and delete adhoc
flowSpecs from the FlowCatalog after
- * persisting it in the other addDag method called. The DagManager's failure
recovery method ensures the flow will be
- * executed in the event of downtime.
- * @param flowSpec
- * @param dag
- * @param persist
- * @param setStatus
- * @throws IOException
+ * Delete adhoc flowSpecs from the {@link FlowCatalog} after (separately)
persisting via {@link DagManager#addDag(Dag, boolean, boolean)}.
+ * This DagManager's failure recovery mechanisms ensure the flow will be
executed, even in the event of downtime.
*/
- public synchronized void addDagAndRemoveAdhocFlowSpec(FlowSpec flowSpec,
Dag<JobExecutionPlan> dag, boolean persist, boolean setStatus)
- throws IOException {
- addDag(dag, persist, setStatus);
+ public synchronized void removeFlowSpecIfAdhoc(FlowSpec flowSpec) throws
IOException {
// Only the active dagManager should delete the flowSpec
if (isActive) {
deleteSpecFromCatalogIfAdhoc(flowSpec);
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/LeaseAttemptStatus.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/LeaseAttemptStatus.java
index 1087c6be4..efa6ee881 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/LeaseAttemptStatus.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/LeaseAttemptStatus.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import lombok.AccessLevel;
import lombok.Data;
+import lombok.EqualsAndHashCode;
import lombok.Getter;
@@ -60,6 +61,8 @@ public abstract class LeaseAttemptStatus {
completeLease method from a caller without access to the {@link
MultiActiveLeaseArbiter}.
*/
@Data
+ // avoid - warning: Generating equals/hashCode implementation but without a
call to superclass, even though this class does not extend java.lang.Object
+ @EqualsAndHashCode(callSuper=false)
public static class LeaseObtainedStatus extends LeaseAttemptStatus {
private final DagActionStore.DagAction consensusDagAction;
private final long eventTimeMillis;
@@ -86,6 +89,8 @@ public abstract class LeaseAttemptStatus {
the lease has completed or expired
*/
@Data
+ // avoid - warning: Generating equals/hashCode implementation but without a
call to superclass, even though this class does not extend java.lang.Object
+ @EqualsAndHashCode(callSuper=false)
public static class LeasedToAnotherStatus extends LeaseAttemptStatus {
private final DagActionStore.DagAction consensusDagAction;
private final long eventTimeMillis;
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index 86f1d12a9..d82d4f9a7 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -229,33 +229,39 @@ public class Orchestrator implements SpecCatalogListener,
Instrumentable {
_log.info("Multi-active scheduler finished handling trigger event:
[{}, is: {}, triggerEventTimestamp: {}]",
launchDagAction, isReminderEvent ? "reminder" : "original",
triggerTimestampMillis);
} else {
- TimingEvent flowCompilationTimer = new
TimingEvent(this.eventSubmitter, TimingEvent.FlowTimings.FLOW_COMPILED);
- Map<String, String> flowMetadata =
TimingEventUtils.getFlowMetadata(flowSpec);
- Optional<Dag<JobExecutionPlan>> compiledDagOptional =
-
this.flowCompilationValidationHelper.validateAndHandleConcurrentExecution(flowConfig,
flowSpec, flowGroup,
- flowName, flowMetadata);
-
- if (!compiledDagOptional.isPresent()) {
- Instrumented.markMeter(this.flowOrchestrationFailedMeter);
- return;
- }
- Dag<JobExecutionPlan> compiledDag = compiledDagOptional.get();
- if (compiledDag.isEmpty()) {
-
FlowCompilationValidationHelper.populateFlowCompilationFailedEventMessage(eventSubmitter,
flowSpec, flowMetadata);
- Instrumented.markMeter(this.flowOrchestrationFailedMeter);
+ try {
+ TimingEvent flowCompilationTimer = new
TimingEvent(this.eventSubmitter, TimingEvent.FlowTimings.FLOW_COMPILED);
+ Map<String, String> flowMetadata =
TimingEventUtils.getFlowMetadata(flowSpec);
+ Optional<Dag<JobExecutionPlan>> compiledDagOptional =
+
this.flowCompilationValidationHelper.validateAndHandleConcurrentExecution(flowConfig,
flowSpec, flowGroup,
+ flowName, flowMetadata);
+
+ if (!compiledDagOptional.isPresent()) {
+ Instrumented.markMeter(this.flowOrchestrationFailedMeter);
+ return;
+ }
+ Dag<JobExecutionPlan> compiledDag = compiledDagOptional.get();
+ if (compiledDag.isEmpty()) {
+
FlowCompilationValidationHelper.populateFlowCompilationFailedEventMessage(eventSubmitter,
flowSpec,
+ flowMetadata);
+ Instrumented.markMeter(this.flowOrchestrationFailedMeter);
+
sharedFlowMetricsSingleton.conditionallyUpdateFlowGaugeSpecState(spec,
+ SharedFlowMetricsSingleton.CompiledState.FAILED);
+ _log.warn("Cannot determine an executor to run on for Spec: " +
spec);
+ return;
+ }
sharedFlowMetricsSingleton.conditionallyUpdateFlowGaugeSpecState(spec,
- SharedFlowMetricsSingleton.CompiledState.FAILED);
- _log.warn("Cannot determine an executor to run on for Spec: " +
spec);
- return;
- }
- sharedFlowMetricsSingleton.conditionallyUpdateFlowGaugeSpecState(spec,
- SharedFlowMetricsSingleton.CompiledState.SUCCESSFUL);
+ SharedFlowMetricsSingleton.CompiledState.SUCCESSFUL);
-
FlowCompilationValidationHelper.addFlowExecutionIdIfAbsent(flowMetadata,
compiledDag);
- flowCompilationTimer.stop(flowMetadata);
+
FlowCompilationValidationHelper.addFlowExecutionIdIfAbsent(flowMetadata,
compiledDag);
+ flowCompilationTimer.stop(flowMetadata);
- // Depending on if DagManager is present, handle execution
- submitFlowToDagManager(flowSpec, compiledDag);
+ // Depending on if DagManager is present, handle execution
+ submitFlowToDagManager(flowSpec, compiledDag);
+ } finally {
+ // remove from the flow catalog, regardless of whether the flow was
successfully validated and permitted to exec (concurrently)
+ this.dagManager.removeFlowSpecIfAdhoc(flowSpec);
+ }
}
} else {
Instrumented.markMeter(this.flowOrchestrationFailedMeter);
@@ -283,7 +289,7 @@ public class Orchestrator implements SpecCatalogListener,
Instrumentable {
Note that the responsibility of the multi-active scheduler mode ends
after this method is completed AND the
consumption of a launch type event is committed to the consumer.
*/
- this.dagManager.addDagAndRemoveAdhocFlowSpec(flowSpec,
jobExecutionPlanDag, true, true);
+ this.dagManager.addDag(jobExecutionPlanDag, true, true);
} catch (Exception ex) {
String failureMessage = "Failed to add Job Execution Plan due to: " +
ex.getMessage();
_log.warn("Orchestrator call - " + failureMessage, ex);
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
index 4b47264f4..e8cc6ccb0 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
@@ -825,7 +825,6 @@ public class GobblinServiceJobScheduler extends
JobScheduler implements SpecCata
}
}
}
- // Note that we only remove the spec from the flow catalog after it
is orchestrated
GobblinServiceJobScheduler.this.scheduledFlowSpecs.remove(specUri.toString());
GobblinServiceJobScheduler.this.lastUpdatedTimeForFlowSpec.remove(specUri.toString());
}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java
index af90d5b7b..7ddfdb3ba 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java
@@ -145,7 +145,7 @@ public class FlowCompilationValidationHelper {
}
addFlowExecutionIdIfAbsent(flowMetadata, jobExecutionPlanDag);
- if (isExecutionPermitted(flowStatusGenerator, flowName, flowGroup,
allowConcurrentExecution,
+ if (isExecutionPermitted(flowStatusGenerator, flowGroup, flowName,
allowConcurrentExecution,
Long.parseLong(flowMetadata.get(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD))))
{
return Optional.fromNullable(jobExecutionPlanDag);
} else {
@@ -176,7 +176,7 @@ public class FlowCompilationValidationHelper {
* @param allowConcurrentExecution
* @return true if the {@link FlowSpec} allows concurrent executions or if
no other instance of the flow is currently RUNNING.
*/
- private boolean isExecutionPermitted(FlowStatusGenerator
flowStatusGenerator, String flowName, String flowGroup,
+ private boolean isExecutionPermitted(FlowStatusGenerator
flowStatusGenerator, String flowGroup, String flowName,
boolean allowConcurrentExecution, long flowExecutionId) {
return allowConcurrentExecution ||
!flowStatusGenerator.isFlowRunning(flowName, flowGroup, flowExecutionId);
}
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
index 63327b291..3d21c9c00 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
@@ -231,6 +231,8 @@ public class GobblinServiceManagerTest {
DagManager spiedDagManager = spy(gobblinServiceManager.getDagManager());
doNothing().when(spiedDagManager).setActive(anyBoolean());
+ // WARNING: this `spiedDagManager` WILL NOT BE the one used by the
`Orchestrator`: its DM has apparently already been
+ // provided to the `Orchestrator` ctor, prior to this replacement here of
`GobblinServiceManager.dagManager`
gobblinServiceManager.dagManager = spiedDagManager;
return gobblinServiceManager;
}
@@ -276,7 +278,10 @@ public class GobblinServiceManagerTest {
}
mysql.stop();
- testMetastoreDatabase.close();
+
+ if (testMetastoreDatabase != null) {
+ testMetastoreDatabase.close();
+ }
}
/**
@@ -367,7 +372,7 @@ public class GobblinServiceManagerTest {
this.flowConfigClient.createFlowConfig(flowConfig);
- // FlowSpec still remains it is only deleted by inactiveDagManager
+ // FlowSpec still remains: it is only deleted by activeDagManager
AssertWithBackoff.create().maxSleepMs(200L).timeoutMs(4000L).backoffFactor(1)
.assertTrue(input ->
this.gobblinServiceManager.getFlowCatalog().getSpecs().size() == 1,
"Waiting for job to get orchestrated...");
@@ -822,7 +827,7 @@ public class GobblinServiceManagerTest {
/*
Creates a unique flowId for a given group using the current timestamp as the
flowName.
*/
- public FlowId createFlowIdWithUniqueName(String groupName) {
+ public static FlowId createFlowIdWithUniqueName(String groupName) {
return new
FlowId().setFlowGroup(groupName).setFlowName(String.valueOf(System.currentTimeMillis()));
}
}
\ No newline at end of file
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
index 86d6ff104..dc2a1b40f 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
@@ -26,11 +26,14 @@ import java.util.Properties;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.fs.Path;
+import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import com.codahale.metrics.MetricRegistry;
@@ -53,6 +56,8 @@ import org.apache.gobblin.runtime.app.ServiceBasedAppLauncher;
import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
import org.apache.gobblin.runtime.spec_catalog.TopologyCatalog;
import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor;
+import org.apache.gobblin.service.FlowId;
+import org.apache.gobblin.service.GobblinServiceManagerTest;
import org.apache.gobblin.service.modules.flow.IdentityFlowToJobSpecCompiler;
import
org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
import org.apache.gobblin.service.modules.utils.SharedFlowMetricsSingleton;
@@ -60,8 +65,7 @@ import
org.apache.gobblin.service.monitoring.FlowStatusGenerator;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.PathUtils;
-import static org.mockito.ArgumentMatchers.anyMap;
-import static org.mockito.Mockito.doNothing;
+import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.mock;
@@ -72,6 +76,7 @@ public class OrchestratorTest {
private static final String SPEC_STORE_PARENT_DIR = "/tmp/orchestrator/";
private static final String SPEC_DESCRIPTION = "Test Orchestrator";
private static final String SPEC_VERSION = FlowSpec.Builder.DEFAULT_VERSION;
+ private static final String TEST_FLOW_GROUP_NAME = "myTestFlowGroup";
private static final String TOPOLOGY_SPEC_STORE_DIR =
"/tmp/orchestrator/topologyTestSpecStore";
private static final String FLOW_SPEC_STORE_DIR =
"/tmp/orchestrator/flowTestSpecStore";
private static final String FLOW_SPEC_GROUP_DIR =
"/tmp/orchestrator/flowTestSpecStore/flowTestGroupDir";
@@ -82,14 +87,23 @@ public class OrchestratorTest {
private FlowCatalog flowCatalog;
private FlowSpec flowSpec;
- private Orchestrator orchestrator;
+
+ private ITestMetastoreDatabase testMetastoreDatabase;
+ private FlowStatusGenerator mockFlowStatusGenerator;
+ private DagManager mockDagManager;
+ private Orchestrator dagMgrNotFlowLaunchHandlerBasedOrchestrator;
private static final String TEST_USER = "testUser";
private static final String TEST_PASSWORD = "testPassword";
private static final String TEST_TABLE = "quotas";
- private static ITestMetastoreDatabase testDb;
+
@BeforeClass
- public void setup() throws Exception {
- testDb = TestMetastoreDatabaseFactory.get();
+ public void setUpClass() throws Exception {
+ // PERF: when within `@{Before,After}Class` the 7 current tests take only
33s; when `@{Before,After}Method` `.get()`s a per-test DB, the same take 1m49s!
+ this.testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
+ }
+
+ @BeforeMethod
+ public void setUp() throws Exception {
cleanUpDir(TOPOLOGY_SPEC_STORE_DIR);
cleanUpDir(FLOW_SPEC_STORE_DIR);
@@ -112,14 +126,14 @@ public class OrchestratorTest {
Optional.of(logger), Optional.<MetricContext>absent(), true, true);
this.serviceLauncher.addService(flowCatalog);
- FlowStatusGenerator mockStatusGenerator = mock(FlowStatusGenerator.class);
- FlowLaunchHandler mockFlowTriggerHandler = mock(FlowLaunchHandler.class);
- DagManager mockDagManager = mock(DagManager.class);
- doNothing().when(mockDagManager).setTopologySpecMap(anyMap());
+ this.mockFlowStatusGenerator = mock(FlowStatusGenerator.class);
+ this.mockDagManager = mock(DagManager.class);
+ Mockito.doNothing().when(mockDagManager).setTopologySpecMap(anyMap());
+
Config config = ConfigBuilder.create()
.addPrimitive(MostlyMySqlDagManagementStateStore.DAG_STATESTORE_CLASS_KEY,
MostlyMySqlDagManagementStateStoreTest.TestMysqlDagStateStore.class.getName())
-
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_URL_KEY),
testDb.getJdbcUrl())
+
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_URL_KEY),
this.testMetastoreDatabase.getJdbcUrl())
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_USER_KEY),
TEST_USER)
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY),
TEST_PASSWORD)
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY),
TEST_TABLE).build();
@@ -129,12 +143,13 @@ public class OrchestratorTest {
SharedFlowMetricsSingleton sharedFlowMetricsSingleton = new
SharedFlowMetricsSingleton(ConfigUtils.propertiesToConfig(orchestratorProperties));
- this.orchestrator = new
Orchestrator(ConfigUtils.propertiesToConfig(orchestratorProperties),
- this.topologyCatalog, mockDagManager, Optional.of(logger),
mockStatusGenerator,
- Optional.of(mockFlowTriggerHandler), sharedFlowMetricsSingleton,
Optional.of(mock(FlowCatalog.class)), Optional.of(dagManagementStateStore),
- new FlowCompilationValidationHelper(config,
sharedFlowMetricsSingleton, mock(UserQuotaManager.class), mockStatusGenerator));
- this.topologyCatalog.addListener(orchestrator);
- this.flowCatalog.addListener(orchestrator);
+ FlowCompilationValidationHelper flowCompilationValidationHelper = new
FlowCompilationValidationHelper(config, sharedFlowMetricsSingleton,
mock(UserQuotaManager.class), mockFlowStatusGenerator);
+ this.dagMgrNotFlowLaunchHandlerBasedOrchestrator = new
Orchestrator(ConfigUtils.propertiesToConfig(orchestratorProperties),
+ this.topologyCatalog, mockDagManager, Optional.of(logger),
mockFlowStatusGenerator,
+ Optional.absent(), sharedFlowMetricsSingleton,
Optional.of(mock(FlowCatalog.class)), Optional.of(dagManagementStateStore),
+ flowCompilationValidationHelper);
+
this.topologyCatalog.addListener(dagMgrNotFlowLaunchHandlerBasedOrchestrator);
+ this.flowCatalog.addListener(dagMgrNotFlowLaunchHandlerBasedOrchestrator);
// Start application
this.serviceLauncher.start();
// Create Spec to play with
@@ -142,6 +157,25 @@ public class OrchestratorTest {
this.flowSpec = initFlowSpec();
}
+ @AfterMethod
+ public void tearDown() throws Exception {
+ // Shutdown Catalog
+ this.serviceLauncher.stop();
+
+ File specStoreDir = new File(SPEC_STORE_PARENT_DIR);
+ if (specStoreDir.exists()) {
+ FileUtils.deleteDirectory(specStoreDir);
+ }
+ }
+
+ @AfterClass(alwaysRun = true)
+ public void tearDownClass() throws Exception {
+ if (this.testMetastoreDatabase != null) {
+ // `.close()` to avoid (in the aggregate, across multiple suites) -
java.sql.SQLNonTransientConnectionException: Too many connections
+ this.testMetastoreDatabase.close();
+ }
+ }
+
private void cleanUpDir(String dir) throws Exception {
File specStoreDir = new File(dir);
if (specStoreDir.exists()) {
@@ -221,24 +255,10 @@ public class OrchestratorTest {
return uri;
}
- @AfterClass
- public void cleanUp() throws Exception {
- // Shutdown Catalog
- this.serviceLauncher.stop();
-
- File specStoreDir = new File(SPEC_STORE_PARENT_DIR);
- if (specStoreDir.exists()) {
- FileUtils.deleteDirectory(specStoreDir);
- }
-
- if (testDb != null) {
- testDb.close();
- }
- }
-
+ // TODO: this test doesn't exercise `Orchestrator` and so belongs elsewhere
- move it, then rework into `@BeforeMethod` init (since others depend on this)
@Test
public void createTopologySpec() {
- IdentityFlowToJobSpecCompiler specCompiler =
(IdentityFlowToJobSpecCompiler) this.orchestrator.getSpecCompiler();
+ IdentityFlowToJobSpecCompiler specCompiler =
(IdentityFlowToJobSpecCompiler)
this.dagMgrNotFlowLaunchHandlerBasedOrchestrator.getSpecCompiler();
// List Current Specs
Collection<Spec> specs = topologyCatalog.getSpecs();
@@ -271,11 +291,12 @@ public class OrchestratorTest {
Assert.assertTrue(specCompiler.getTopologySpecMap().size() == 1,
"SpecCompiler should contain 1 Spec after addition");
}
- @Test (dependsOnMethods = "createTopologySpec")
+ @Test
public void createFlowSpec() throws Throwable {
- // Since only 1 Topology with 1 SpecProducer has been added in previous
test
- // .. it should be available and responsible for our new FlowSpec
- IdentityFlowToJobSpecCompiler specCompiler =
(IdentityFlowToJobSpecCompiler) this.orchestrator.getSpecCompiler();
+ // TODO: fix this lingering inter-test dep from when `@BeforeClass` init,
which we've since replaced by `Mockito.verify`-friendly `@BeforeMethod`
+ createTopologySpec(); // make 1 Topology with 1 SpecProducer available and
responsible for our new FlowSpec
+
+ IdentityFlowToJobSpecCompiler specCompiler =
(IdentityFlowToJobSpecCompiler)
this.dagMgrNotFlowLaunchHandlerBasedOrchestrator.getSpecCompiler();
SpecExecutor sei =
specCompiler.getTopologySpecMap().values().iterator().next().getSpecExecutor();
// List Current Specs
@@ -314,10 +335,12 @@ public class OrchestratorTest {
+ "Spec after addition");
}
- @Test (dependsOnMethods = "createFlowSpec")
- public void deleteFlowSpec() throws Exception {
- // Since only 1 Flow has been added in previous test it should be available
- IdentityFlowToJobSpecCompiler specCompiler =
(IdentityFlowToJobSpecCompiler) this.orchestrator.getSpecCompiler();
+ @Test
+ public void deleteFlowSpec() throws Throwable {
+ // TODO: fix this lingering inter-test dep from when `@BeforeClass` init,
which we've since replaced by `Mockito.verify`-friendly `@BeforeMethod`
+ createFlowSpec(); // make 1 Flow available (for deletion herein)
+
+ IdentityFlowToJobSpecCompiler specCompiler =
(IdentityFlowToJobSpecCompiler)
this.dagMgrNotFlowLaunchHandlerBasedOrchestrator.getSpecCompiler();
SpecExecutor sei =
specCompiler.getTopologySpecMap().values().iterator().next().getSpecExecutor();
// List Current Specs
@@ -355,26 +378,101 @@ public class OrchestratorTest {
+ "Spec after deletion");
}
- @Test (dependsOnMethods = "deleteFlowSpec")
- public void doNotRegisterMetricsAdhocFlows() throws Exception {
- MetricContext metricContext =
this.orchestrator.getSharedFlowMetricsSingleton().getMetricContext();
+ @Test
+ public void doNotRegisterMetricsAdhocFlows() throws Throwable {
+ // TODO: fix this lingering inter-test dep from when `@BeforeClass` init,
which we've since replaced by `Mockito.verify`-friendly `@BeforeMethod`
+ createTopologySpec(); // for flow compilation to pass
+
+ FlowId flowId =
GobblinServiceManagerTest.createFlowIdWithUniqueName(TEST_FLOW_GROUP_NAME);
+ MetricContext metricContext =
this.dagMgrNotFlowLaunchHandlerBasedOrchestrator.getSharedFlowMetricsSingleton().getMetricContext();
+ String metricName =
MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
flowId.getFlowGroup(), flowId.getFlowName(), ServiceMetricNames.COMPILED);
+
this.topologyCatalog.getInitComplete().countDown(); // unblock
orchestration
+
+ FlowSpec adhocSpec = createBasicFlowSpecForFlowId(flowId);
+ this.dagMgrNotFlowLaunchHandlerBasedOrchestrator.orchestrate(adhocSpec,
new Properties(), 0, false);
+
Assert.assertNull(metricContext.getParent().get().getGauges().get(metricName));
+
+ Properties scheduledProps = new Properties();
+ scheduledProps.setProperty("job.schedule", "0/2 * * * * ?");
+ FlowSpec scheduledSpec = createBasicFlowSpecForFlowId(flowId,
scheduledProps);
+
this.dagMgrNotFlowLaunchHandlerBasedOrchestrator.orchestrate(scheduledSpec, new
Properties(), 0, false);
+
Assert.assertNotNull(metricContext.getParent().get().getGauges().get(metricName));
+ }
+
+ @Test
+ public void removeFlowSpecWhenDagAdded() throws Throwable {
+ // TODO: fix this lingering inter-test dep from when `@BeforeClass` init,
which we've since replaced by `Mockito.verify`-friendly `@BeforeMethod`
+ createTopologySpec(); // for flow compilation to pass
+
+ FlowId flowId =
GobblinServiceManagerTest.createFlowIdWithUniqueName(TEST_FLOW_GROUP_NAME);
+ FlowSpec flowSpec = createBasicFlowSpecForFlowId(flowId);
+ this.topologyCatalog.getInitComplete().countDown(); // unblock
orchestration
+
+ this.dagMgrNotFlowLaunchHandlerBasedOrchestrator.orchestrate(flowSpec, new
Properties(), 0, false);
+
+ Mockito.verify(this.mockDagManager, Mockito.times(1)).addDag(any(),
eq(true), eq(true));
+ Mockito.verify(this.mockDagManager,
Mockito.times(1)).removeFlowSpecIfAdhoc(any());
+ }
+
+ @Test
+ public void removeFlowSpecEvenWhenDagNotAddedDueToCompilationFailure()
throws Throwable {
+ // to cause flow compilation to fail, DO NOT EXECUTE: createTopologySpec();
+
+ FlowId flowId =
GobblinServiceManagerTest.createFlowIdWithUniqueName(TEST_FLOW_GROUP_NAME);
+ FlowSpec flowSpec = createBasicFlowSpecForFlowId(flowId);
+ this.topologyCatalog.getInitComplete().countDown(); // unblock
orchestration
+
+ this.dagMgrNotFlowLaunchHandlerBasedOrchestrator.orchestrate(flowSpec, new
Properties(), 0, false);
+
+ // (verifies that compilation failure precedes enforcement of concurrent
flow executions)
+ Mockito.verify(this.mockFlowStatusGenerator,
Mockito.never()).isFlowRunning(any(), any(), anyLong());
+
+ Mockito.verify(this.mockDagManager, Mockito.never()).addDag(any(),
anyBoolean(), anyBoolean());
+ Mockito.verify(this.mockDagManager,
Mockito.times(1)).removeFlowSpecIfAdhoc(any());
+ }
+
+ @Test
+ public void removeFlowSpecEvenWhenDagNotAddedDueToConcurrentExecution()
throws Throwable {
+ // TODO: fix this lingering inter-test dep from when `@BeforeClass` init,
which we've since replaced by `Mockito.verify`-friendly `@BeforeMethod`
+ createTopologySpec(); // for flow compilation to pass
+
+ FlowId flowId =
GobblinServiceManagerTest.createFlowIdWithUniqueName(TEST_FLOW_GROUP_NAME);
+ Properties noConcurrentExecsProps = new Properties();
+ noConcurrentExecsProps.setProperty("flow.allowConcurrentExecution",
"false");
+ FlowSpec flowSpec = createBasicFlowSpecForFlowId(flowId,
noConcurrentExecsProps);
+ this.topologyCatalog.getInitComplete().countDown(); // unblock
orchestration
+
+
Mockito.when(this.mockFlowStatusGenerator.isFlowRunning(eq(flowId.getFlowName()),
eq(flowId.getFlowGroup()), anyLong())).thenReturn(true);
+
+ this.dagMgrNotFlowLaunchHandlerBasedOrchestrator.orchestrate(flowSpec, new
Properties(), 0, false);
+
+ // (verifies enforcement of concurrent flow executions)
+ Mockito.verify(this.mockFlowStatusGenerator,
Mockito.times(1)).isFlowRunning(eq(flowId.getFlowName()),
eq(flowId.getFlowGroup()), anyLong());
+
+ Mockito.verify(this.mockDagManager, Mockito.never()).addDag(any(),
anyBoolean(), anyBoolean());
+ Mockito.verify(this.mockDagManager,
Mockito.times(1)).removeFlowSpecIfAdhoc(any());
+ }
+
+ public static FlowSpec createBasicFlowSpecForFlowId(FlowId flowId) throws
URISyntaxException {
+ return createBasicFlowSpecForFlowId(flowId, new Properties());
+ }
+
+ public static FlowSpec createBasicFlowSpecForFlowId(FlowId flowId,
Properties moreProps) throws URISyntaxException {
+ URI flowUri = FlowSpec.Utils.createFlowSpecUri(flowId);
Properties flowProps = new Properties();
- flowProps.setProperty(ConfigurationKeys.FLOW_NAME_KEY, "flow0");
- flowProps.setProperty(ConfigurationKeys.FLOW_GROUP_KEY, "group0");
- flowProps.put("specStore.fs.dir", FLOW_SPEC_STORE_DIR);
- flowProps.put("specExecInstance.capabilities", "source:destination");
+ // needed by - Orchestrator::orchestrate
+ flowProps.setProperty(ConfigurationKeys.FLOW_GROUP_KEY,
flowId.getFlowGroup());
+ flowProps.setProperty(ConfigurationKeys.FLOW_NAME_KEY,
flowId.getFlowName());
+ // needed by - IdentityFlowToJobSpecCompiler::compileFlow
flowProps.put("gobblin.flow.sourceIdentifier", "source");
flowProps.put("gobblin.flow.destinationIdentifier", "destination");
- flowProps.put("flow.allowConcurrentExecution", false);
- FlowSpec adhocSpec = new FlowSpec(URI.create("flow0/group0"), "1", "",
ConfigUtils.propertiesToConfig(flowProps) , flowProps, Optional.absent(),
Optional.absent());
- this.orchestrator.orchestrate(adhocSpec, flowProps, 0, false);
- String metricName =
MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, "group0",
"flow0", ServiceMetricNames.COMPILED);
-
Assert.assertNull(metricContext.getParent().get().getGauges().get(metricName));
+ // needed by - IdentityFlowToJobSpecCompiler::getJobExecutionPlans
+ flowProps.put("specExecInstance.capabilities", "source:destination");
- flowProps.setProperty("job.schedule", "0/2 * * * * ?");
- FlowSpec scheduledSpec = new FlowSpec(URI.create("flow0/group0"), "1", "",
ConfigUtils.propertiesToConfig(flowProps) , flowProps, Optional.absent(),
Optional.absent());
- this.orchestrator.orchestrate(scheduledSpec, flowProps, 0, false);
-
Assert.assertNotNull(metricContext.getParent().get().getGauges().get(metricName));
+ moreProps.entrySet().forEach(entry ->
+ flowProps.put(entry.getKey(), entry.getValue()));
+
+ return new FlowSpec(flowUri, "1", "",
ConfigUtils.propertiesToConfig(flowProps), flowProps, Optional.absent(),
Optional.absent());
}
}
\ No newline at end of file
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/PathUtils.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/PathUtils.java
index 0893915f9..d9d2ac11f 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/PathUtils.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/PathUtils.java
@@ -50,8 +50,8 @@ public class PathUtils {
/**
* Checks whether possibleAncestor is an ancestor of fullPath.
- * @param possibleAncestor Possible ancestor of fullPath.
- * @param fullPath path to check.
+ * @param possibleAncestor Possible ancestor of fullPath, where scheme and
authority are IGNORED
+ * @param fullPath path to check, where scheme and authority are IGNORED
* @return true if possibleAncestor is an ancestor of fullPath.
*/
public static boolean isAncestor(Path possibleAncestor, Path fullPath) {
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java
index ab4ad5c9d..eb365a1e3 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java
@@ -83,8 +83,8 @@ public class PropertiesUtils {
/** @throws {@link NullPointerException} when `key` not in `properties` */
public static String getRequiredPropRaw(Properties properties, String key,
Optional<String> desc) {
String value = properties.getProperty(key);
- Preconditions.checkNotNull(value, "'" + key + "' must be set" +
desc.transform
- (s -> " (to " + desc + ")").or(""));
+ Preconditions.checkNotNull(value, "'" + key + "' must be set" +
desc.transform((String s) ->
+ " (to " + desc + ")").or(""));
return value;
}