This is an automated email from the ASF dual-hosted git repository.
arjun4084346 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new c823e9883 [GOBBLIN-2065] Increase `TestMetastoreDatabaseServer`
concurrent cnxns; plus make closing `ITestMetastoreDatabase` `alwaysRun = true`
and doc reason (#3948)
c823e9883 is described below
commit c823e9883ef464c017a1b2b98362ba61b86446d8
Author: Kip Kohn <[email protected]>
AuthorDate: Thu May 9 10:26:46 2024 -0700
[GOBBLIN-2065] Increase `TestMetastoreDatabaseServer` concurrent cnxns;
plus make closing `ITestMetastoreDatabase` `alwaysRun = true` and doc reason
(#3948)
* Make closing `ITestMetastoreDatabase` more uniform, document reason, and
ensure `alwaysRun = true`
* Increase the concurrent-connection limit (`max_connections`) of `TestMetastoreDatabaseServer` to 501
* Make the remainder of the `GobblinServiceHATest` teardown (`cleanUp`) resilient
to a `mysql.stop()` failure
---
.../CleanableMysqlDatasetStoreDatasetTest.java | 1 +
.../testing/TestMetastoreDatabaseServer.java | 2 ++
.../runtime/DagActionStoreChangeMonitorTest.java | 6 ++--
.../runtime/MysqlDatasetStateStoreTest.java | 5 +--
.../gobblin/runtime/cli/JobStateStoreCliTest.java | 15 +++++----
.../runtime/job_catalog/TestMysqlJobCatalog.java | 28 +++++++++-------
.../runtime/spec_store/MysqlBaseSpecStoreTest.java | 11 ++++---
.../runtime/spec_store/MysqlSpecStoreTest.java | 11 ++++---
.../spec_store/MysqlSpecStoreWithUpdateTest.java | 9 ++---
.../service/modules/core/GobblinServiceHATest.java | 15 +++++----
.../modules/core/GobblinServiceRedirectTest.java | 9 ++---
.../DagManagementTaskStreamImplTest.java | 17 +++++-----
.../modules/orchestration/DagManagerFlowTest.java | 9 ++---
.../orchestration/DagProcessingEngineTest.java | 19 ++++++-----
.../MostlyMySqlDagManagementStateStoreTest.java | 11 ++++---
.../orchestration/MysqlDagActionStoreTest.java | 8 ++---
.../orchestration/MysqlDagStateStoreTest.java | 1 +
.../MysqlMultiActiveLeaseArbiterTest.java | 15 +++++----
.../orchestration/MysqlUserQuotaManagerTest.java | 18 +++++-----
.../orchestration/proc/KillDagProcTest.java | 11 ++++---
.../orchestration/proc/LaunchDagProcTest.java | 13 ++++----
.../orchestration/proc/ReevaluateDagProcTest.java | 38 ++++++++++++++--------
.../monitoring/MysqlJobStatusRetrieverTest.java | 7 +++-
23 files changed, 162 insertions(+), 117 deletions(-)
diff --git
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/retention/CleanableMysqlDatasetStoreDatasetTest.java
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/retention/CleanableMysqlDatasetStoreDatasetTest.java
index 4a693a16c..8976e7a88 100644
---
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/retention/CleanableMysqlDatasetStoreDatasetTest.java
+++
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/retention/CleanableMysqlDatasetStoreDatasetTest.java
@@ -112,6 +112,7 @@ public class CleanableMysqlDatasetStoreDatasetTest {
@AfterClass(alwaysRun = true)
public void tearDown() throws Exception {
if (testMetastoreDatabase != null) {
+ // `.close()` to avoid (in the aggregate, across multiple suites) -
java.sql.SQLNonTransientConnectionException: Too many connections
testMetastoreDatabase.close();
}
}
diff --git
a/gobblin-metastore/src/test/java/org/apache/gobblin/metastore/testing/TestMetastoreDatabaseServer.java
b/gobblin-metastore/src/test/java/org/apache/gobblin/metastore/testing/TestMetastoreDatabaseServer.java
index ebbd0ad98..5aa164619 100644
---
a/gobblin-metastore/src/test/java/org/apache/gobblin/metastore/testing/TestMetastoreDatabaseServer.java
+++
b/gobblin-metastore/src/test/java/org/apache/gobblin/metastore/testing/TestMetastoreDatabaseServer.java
@@ -96,6 +96,8 @@ class TestMetastoreDatabaseServer implements Closeable {
.withPort(this.dbPort)
.withUser(this.dbUserName, this.dbUserPassword)
.withServerVariable("explicit_defaults_for_timestamp", "off")
+ // default `max_connections` is apparently 151 - see:
https://dev.mysql.com/doc/refman/8.0/en/server-system-variables.html#sysvar_max_connections
+ .withServerVariable("max_connections", "501")
.build();
if (this.embeddedMysqlEnabled) {
testingMySqlServer = EmbeddedMysql.anEmbeddedMysql(config).start();
diff --git
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java
index f93836978..54bc6ace5 100644
---
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java
+++
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java
@@ -17,7 +17,6 @@
package org.apache.gobblin.runtime;
-import java.io.IOException;
import java.net.URI;
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -124,8 +123,9 @@ public class DagActionStoreChangeMonitorTest {
this.testDb = TestMetastoreDatabaseFactory.get();
}
- @AfterClass
- public void tearDown() throws IOException {
+ @AfterClass(alwaysRun = true)
+ public void tearDown() throws Exception {
+ // `.close()` to avoid (in the aggregate, across multiple suites) -
java.sql.SQLNonTransientConnectionException: Too many connections
this.testDb.close();
}
diff --git
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/MysqlDatasetStateStoreTest.java
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/MysqlDatasetStateStoreTest.java
index ce5532fe3..bc81183b0 100644
---
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/MysqlDatasetStateStoreTest.java
+++
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/MysqlDatasetStateStoreTest.java
@@ -328,11 +328,12 @@ public class MysqlDatasetStateStoreTest {
Assert.assertNull(datasetState);
}
- @AfterClass
- public void tearDown() throws IOException {
+ @AfterClass(alwaysRun = true)
+ public void tearDown() throws Exception {
dbJobStateStore.delete(TEST_JOB_NAME);
dbDatasetStateStore.delete(TEST_JOB_NAME);
if (testMetastoreDatabase != null) {
+ // `.close()` to avoid (in the aggregate, across multiple suites) -
java.sql.SQLNonTransientConnectionException: Too many connections
testMetastoreDatabase.close();
}
}
diff --git
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/cli/JobStateStoreCliTest.java
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/cli/JobStateStoreCliTest.java
index da7c3248c..96f659980 100644
---
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/cli/JobStateStoreCliTest.java
+++
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/cli/JobStateStoreCliTest.java
@@ -132,6 +132,14 @@ public class JobStateStoreCliTest {
jobState);
}
+ @AfterClass(alwaysRun = true)
+ public void tearDown() throws Exception {
+ if (this.testMetastoreDatabase != null) {
+ // `.close()` to avoid (in the aggregate, across multiple suites) -
java.sql.SQLNonTransientConnectionException: Too many connections
+ this.testMetastoreDatabase.close();
+ }
+ }
+
@Test
public void testClBulkDelete() throws Exception {
String deleteFileText = TEST_JOB_NAME +"\n" + TEST_JOB_NAME2;
@@ -175,11 +183,4 @@ public class JobStateStoreCliTest {
Assert.assertNull(jobState);
}
-
- @AfterClass(alwaysRun = true)
- public void tearDown() throws Exception {
- if (this.testMetastoreDatabase != null) {
- this.testMetastoreDatabase.close();
- }
- }
}
diff --git
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_catalog/TestMysqlJobCatalog.java
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_catalog/TestMysqlJobCatalog.java
index a5b9f10e7..ba4a8e11f 100644
---
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_catalog/TestMysqlJobCatalog.java
+++
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_catalog/TestMysqlJobCatalog.java
@@ -24,6 +24,7 @@ import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import com.google.common.base.Predicates;
@@ -46,17 +47,21 @@ public class TestMysqlJobCatalog {
private static final String PASSWORD = "testPassword";
private static final String TABLE = "job_catalog";
+ private ITestMetastoreDatabase testDb;
private MysqlJobCatalog cat;
- private static ITestMetastoreDatabase testDb;
+
+ @BeforeClass
+ public void setUpClass() throws Exception {
+ // PERF: when within `@{Before,After}Class` the 2 current tests take only
24s; when `@{Before,After}Method` `.get()`s a per-test DB, the same take 38s
+ this.testDb = TestMetastoreDatabaseFactory.get();
+ }
/** create a new DB/`JobCatalog` for each test, so they're completely
independent */
@BeforeMethod
public void setUp() throws Exception {
- testDb = TestMetastoreDatabaseFactory.get();
-
Config config = ConfigBuilder.create()
.addPrimitive(ConfigurationKeys.METRICS_ENABLED_KEY, "true")
- .addPrimitive(MysqlJobCatalog.DB_CONFIG_PREFIX + "." +
ConfigurationKeys.STATE_STORE_DB_URL_KEY, testDb.getJdbcUrl())
+ .addPrimitive(MysqlJobCatalog.DB_CONFIG_PREFIX + "." +
ConfigurationKeys.STATE_STORE_DB_URL_KEY, this.testDb.getJdbcUrl())
.addPrimitive(MysqlJobCatalog.DB_CONFIG_PREFIX + "." +
ConfigurationKeys.STATE_STORE_DB_USER_KEY, USER)
.addPrimitive(MysqlJobCatalog.DB_CONFIG_PREFIX + "." +
ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY, PASSWORD)
.addPrimitive(MysqlJobCatalog.DB_CONFIG_PREFIX + "." +
ConfigurationKeys.STATE_STORE_DB_TABLE_KEY, TABLE)
@@ -65,6 +70,14 @@ public class TestMysqlJobCatalog {
this.cat = new MysqlJobCatalog(config);
}
+ @AfterClass(alwaysRun = true)
+ public void tearDownClass() throws Exception {
+ if (this.testDb != null) {
+ // `.close()` to avoid (in the aggregate, across multiple suites) -
java.sql.SQLNonTransientConnectionException: Too many connections
+ this.testDb.close();
+ }
+ }
+
@Test
public void testCallbacks() throws Exception {
cat.startAsync();
@@ -214,11 +227,4 @@ public class TestMysqlJobCatalog {
cat.stopAsync();
cat.awaitTerminated(1, TimeUnit.SECONDS);
}
-
- @AfterClass(alwaysRun = true)
- public void tearDown() throws Exception {
- if (testDb != null) {
- testDb.close();
- }
- }
}
diff --git
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlBaseSpecStoreTest.java
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlBaseSpecStoreTest.java
index d2f97f8dc..692b2c2b8 100644
---
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlBaseSpecStoreTest.java
+++
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlBaseSpecStoreTest.java
@@ -48,11 +48,11 @@ public class MysqlBaseSpecStoreTest {
private static final String PASSWORD = "testPassword";
private static final String TABLE = "base_spec_store";
+ private ITestMetastoreDatabase testDb;
private MysqlBaseSpecStore specStore;
private final URI uri1 = new URI(new
TopologySpec.Builder().getDefaultTopologyCatalogURI().toString() + "1");
private final URI uri2 = new URI(new
TopologySpec.Builder().getDefaultTopologyCatalogURI().toString() + "2");
private TopologySpec topoSpec1, topoSpec2;
- private static ITestMetastoreDatabase testDb;
public MysqlBaseSpecStoreTest()
throws URISyntaxException { // (based on `uri1` and other
initializations just above)
@@ -60,11 +60,11 @@ public class MysqlBaseSpecStoreTest {
@BeforeClass
public void setUp() throws Exception {
- testDb = TestMetastoreDatabaseFactory.get();
+ this.testDb = TestMetastoreDatabaseFactory.get();
// prefix keys to demonstrate disambiguation mechanism used to side-step
intentially-sabatoged non-prefixed, 'fallback'
Config config = ConfigBuilder.create()
- .addPrimitive(ConfigurationKeys.STATE_STORE_DB_URL_KEY, " SABATOGE! !"
+ testDb.getJdbcUrl())
+ .addPrimitive(ConfigurationKeys.STATE_STORE_DB_URL_KEY, " SABATOGE! !"
+ this.testDb.getJdbcUrl())
.addPrimitive(MysqlBaseSpecStore.CONFIG_PREFIX + "." +
ConfigurationKeys.STATE_STORE_DB_URL_KEY, testDb.getJdbcUrl())
.addPrimitive(MysqlBaseSpecStore.CONFIG_PREFIX + "." +
ConfigurationKeys.STATE_STORE_DB_USER_KEY, USER)
.addPrimitive(MysqlBaseSpecStore.CONFIG_PREFIX + "." +
ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY, PASSWORD)
@@ -93,8 +93,9 @@ public class MysqlBaseSpecStoreTest {
@AfterClass(alwaysRun = true)
public void tearDown() throws Exception {
- if (testDb != null) {
- testDb.close();
+ if (this.testDb != null) {
+ // `.close()` to avoid (in the aggregate, across multiple suites) -
java.sql.SQLNonTransientConnectionException: Too many connections
+ this.testDb.close();
}
}
diff --git
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreTest.java
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreTest.java
index 8927f0ec5..425f8aad7 100644
---
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreTest.java
+++
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreTest.java
@@ -58,6 +58,7 @@ public class MysqlSpecStoreTest {
private static final String PASSWORD = "testPassword";
private static final String TABLE = "spec_store";
+ private ITestMetastoreDatabase testDb;
private MysqlSpecStore specStore;
private MysqlSpecStore oldSpecStore;
private final URI uri1 = FlowSpec.Utils.createFlowSpecUri(new
FlowId().setFlowName("fg1").setFlowGroup("fn1"));
@@ -65,7 +66,6 @@ public class MysqlSpecStoreTest {
private final URI uri3 = FlowSpec.Utils.createFlowSpecUri(new
FlowId().setFlowName("fg3").setFlowGroup("fn3"));
private final URI uri4 = FlowSpec.Utils.createFlowSpecUri(new
FlowId().setFlowName("fg4").setFlowGroup("fn4"));
private FlowSpec flowSpec1, flowSpec2, flowSpec3, flowSpec4;
- private static ITestMetastoreDatabase testDb;
public MysqlSpecStoreTest()
throws URISyntaxException { // (based on `uri1` and other
initializations just above)
@@ -73,10 +73,10 @@ public class MysqlSpecStoreTest {
@BeforeClass
public void setUp() throws Exception {
- testDb = TestMetastoreDatabaseFactory.get();
+ this.testDb = TestMetastoreDatabaseFactory.get();
Config config = ConfigBuilder.create()
- .addPrimitive(ConfigurationKeys.STATE_STORE_DB_URL_KEY,
testDb.getJdbcUrl())
+ .addPrimitive(ConfigurationKeys.STATE_STORE_DB_URL_KEY,
this.testDb.getJdbcUrl())
.addPrimitive(ConfigurationKeys.STATE_STORE_DB_USER_KEY, USER)
.addPrimitive(ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY, PASSWORD)
.addPrimitive(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY, TABLE)
@@ -134,8 +134,9 @@ public class MysqlSpecStoreTest {
@AfterClass(alwaysRun = true)
public void tearDown() throws Exception {
- if (testDb != null) {
- testDb.close();
+ if (this.testDb != null) {
+ // `.close()` to avoid (in the aggregate, across multiple suites) -
java.sql.SQLNonTransientConnectionException: Too many connections
+ this.testDb.close();
}
}
diff --git
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreWithUpdateTest.java
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreWithUpdateTest.java
index bf6e4fe99..8b12c46f6 100644
---
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreWithUpdateTest.java
+++
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreWithUpdateTest.java
@@ -58,6 +58,7 @@ public class MysqlSpecStoreWithUpdateTest {
private static final String PASSWORD = "testPassword";
private static final String TABLE = "spec_store";
+ private ITestMetastoreDatabase testDb;
private MysqlSpecStoreWithUpdate specStore;
private MysqlSpecStore oldSpecStore;
private final URI uri1 = FlowSpec.Utils.createFlowSpecUri(new
FlowId().setFlowName("fg1").setFlowGroup("fn1"));
@@ -65,7 +66,6 @@ public class MysqlSpecStoreWithUpdateTest {
private final URI uri3 = FlowSpec.Utils.createFlowSpecUri(new
FlowId().setFlowName("fg3").setFlowGroup("fn3"));
private final URI uri4 = FlowSpec.Utils.createFlowSpecUri(new
FlowId().setFlowName("fg4").setFlowGroup("fn4"));
private FlowSpec flowSpec1, flowSpec2, flowSpec3, flowSpec4,
flowSpec4_update;
- private static ITestMetastoreDatabase testDb;
public MysqlSpecStoreWithUpdateTest()
throws URISyntaxException { // (based on `uri1` and other
initializations just above)
@@ -73,7 +73,7 @@ public class MysqlSpecStoreWithUpdateTest {
@BeforeClass
public void setUp() throws Exception {
- testDb = TestMetastoreDatabaseFactory.get();
+ this.testDb = TestMetastoreDatabaseFactory.get();
Config config = ConfigBuilder.create()
.addPrimitive(ConfigurationKeys.STATE_STORE_DB_URL_KEY,
testDb.getJdbcUrl())
@@ -144,8 +144,9 @@ public class MysqlSpecStoreWithUpdateTest {
@AfterClass(alwaysRun = true)
public void tearDown() throws Exception {
- if (testDb != null) {
- testDb.close();
+ if (this.testDb != null) {
+ // `.close()` to avoid (in the aggregate, across multiple suites) -
java.sql.SQLNonTransientConnectionException: Too many connections
+ this.testDb.close();
}
}
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java
index a231a9eec..5a3a24465 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java
@@ -101,8 +101,8 @@ public class GobblinServiceHATest {
private TestingServer testingZKServer;
+ private ITestMetastoreDatabase testMetastoreDatabase;
private MySQLContainer mysql;
- private static ITestMetastoreDatabase testMetastoreDatabase;
@BeforeClass
public void setup() throws Exception {
@@ -122,7 +122,6 @@ public class GobblinServiceHATest {
logger.info("Testing ZK Server listening on: " +
testingZKServer.getConnectString());
HelixUtils.createGobblinHelixCluster(testingZKServer.getConnectString(),
TEST_HELIX_CLUSTER_NAME);
-
testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
Properties commonServiceCoreProperties = new Properties();
@@ -202,9 +201,6 @@ public class GobblinServiceHATest {
@AfterClass(alwaysRun = true)
public void cleanUp() throws Exception {
- if (testMetastoreDatabase != null) {
- testMetastoreDatabase.close();
- }
// Shutdown Node 1
try {
logger.info("+++++++++++++++++++ start shutdown noad1");
@@ -254,7 +250,14 @@ public class GobblinServiceHATest {
cleanUpDir(COMMON_SPEC_STORE_PARENT_DIR);
- mysql.stop();
+ try {
+ mysql.stop();
+ } catch (Exception e) {
+ logger.warn("Could not completely stop Mysql");
+ }
+
+ // `.close()` to avoid (in the aggregate, across multiple suites) -
java.sql.SQLNonTransientConnectionException: Too many connections
+ testMetastoreDatabase.close();
}
@Test
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceRedirectTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceRedirectTest.java
index 280b05465..d9007e6da 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceRedirectTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceRedirectTest.java
@@ -108,8 +108,8 @@ public class GobblinServiceRedirectTest {
private Properties node1ServiceCoreProperties;
private Properties node2ServiceCoreProperties;
+ private ITestMetastoreDatabase testMetastoreDatabase;
private MySQLContainer mysql;
- private static ITestMetastoreDatabase testMetastoreDatabase;
@BeforeClass
public void setup() throws Exception {
@@ -124,7 +124,7 @@ public class GobblinServiceRedirectTest {
logger.info("Testing ZK Server listening on: " +
testingZKServer.getConnectString());
HelixUtils.createGobblinHelixCluster(testingZKServer.getConnectString(),
TEST_HELIX_CLUSTER_NAME);
- testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
+ this.testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
Properties commonServiceCoreProperties = new Properties();
@@ -224,8 +224,9 @@ public class GobblinServiceRedirectTest {
}
mysql.stop();
- if (testMetastoreDatabase != null) {
- testMetastoreDatabase.close();
+ if (this.testMetastoreDatabase != null) {
+ // `.close()` to avoid (in the aggregate, across multiple suites) -
java.sql.SQLNonTransientConnectionException: Too many connections
+ this.testMetastoreDatabase.close();
}
}
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
index ec9377cec..607d44389 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
@@ -50,7 +50,7 @@ public class DagManagementTaskStreamImplTest {
private static final String TEST_USER = "testUser";
private static final String TEST_PASSWORD = "testPassword";
private static final String TEST_TABLE = "quotas";
- static ITestMetastoreDatabase testMetastoreDatabase;
+ private ITestMetastoreDatabase testMetastoreDatabase;
DagProcessingEngine.DagProcEngineThread dagProcEngineThread;
DagManagementTaskStreamImpl dagManagementTaskStream;
DagProcFactory dagProcFactory;
@@ -58,11 +58,11 @@ public class DagManagementTaskStreamImplTest {
@BeforeClass
public void setUp() throws Exception {
// Setting up mock DB
- testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
+ this.testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
ConfigBuilder configBuilder = ConfigBuilder.create();
configBuilder.addPrimitive(MostlyMySqlDagManagementStateStore.DAG_STATESTORE_CLASS_KEY,
MostlyMySqlDagManagementStateStoreTest.TestMysqlDagStateStore.class.getName())
-
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_URL_KEY),
testMetastoreDatabase.getJdbcUrl())
+
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_URL_KEY),
this.testMetastoreDatabase.getJdbcUrl())
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_USER_KEY),
TEST_USER)
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY),
TEST_PASSWORD)
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY),
TEST_TABLE);
@@ -85,6 +85,12 @@ public class DagManagementTaskStreamImplTest {
this.dagManagementTaskStream, this.dagProcFactory,
dagManagementStateStore, 0);
}
+ @AfterClass(alwaysRun = true)
+ public void tearDown() throws IOException {
+ // `.close()` to avoid (in the aggregate, across multiple suites) -
java.sql.SQLNonTransientConnectionException: Too many connections
+ this.testMetastoreDatabase.close();
+ }
+
/* This tests adding and removal of dag actions from dag task stream with a
launch task. It verifies that the
{@link DagManagementTaskStreamImpl#next()} call blocks until a {@link
LeaseAttemptStatus.LeaseObtainedStatus} is
returned for a particular action.
@@ -111,9 +117,4 @@ public class DagManagementTaskStreamImplTest {
DagProc dagProc = dagTask.host(this.dagProcFactory);
Assert.assertNotNull(dagProc);
}
-
- @AfterClass
- public void tearDown() throws IOException {
- testMetastoreDatabase.close();
- }
}
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
index 4f4fa98e7..88a572d73 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
@@ -73,17 +73,17 @@ public class DagManagerFlowTest {
private static final String flowName = "testFlowName";
private static final String flowExecutionId = "12345677";
private static final String flowExecutionId_2 = "12345678";
+ private ITestMetastoreDatabase testDb;
private DagActionStore dagActionStore;
- private static ITestMetastoreDatabase testDb;
@BeforeClass
public void setUp() throws Exception {
Properties props = new Properties();
props.put(DagManager.JOB_STATUS_POLLING_INTERVAL_KEY, 1);
- testDb = TestMetastoreDatabaseFactory.get();
+ this.testDb = TestMetastoreDatabaseFactory.get();
Config config = ConfigBuilder.create()
- .addPrimitive("MysqlDagActionStore." +
ConfigurationKeys.STATE_STORE_DB_URL_KEY, testDb.getJdbcUrl())
+ .addPrimitive("MysqlDagActionStore." +
ConfigurationKeys.STATE_STORE_DB_URL_KEY, this.testDb.getJdbcUrl())
.addPrimitive("MysqlDagActionStore." +
ConfigurationKeys.STATE_STORE_DB_USER_KEY, USER)
.addPrimitive("MysqlDagActionStore." +
ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY, PASSWORD)
.addPrimitive("MysqlDagActionStore." +
ConfigurationKeys.STATE_STORE_DB_TABLE_KEY, TABLE)
@@ -99,11 +99,12 @@ public class DagManagerFlowTest {
Thread.sleep(30000);
}
- @AfterClass
+ @AfterClass(alwaysRun = true)
public void cleanUp() throws Exception {
dagManager.setActive(false);
Assert.assertEquals(dagManager.getHouseKeepingThreadPool().isShutdown(),
true);
if (testDb != null) {
+ // `.close()` to avoid (in the aggregate, across multiple suites) -
java.sql.SQLNonTransientConnectionException: Too many connections
testDb.close();
}
}
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
index 3f967dbd6..9718222a1 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
@@ -59,16 +59,16 @@ public class DagProcessingEngineTest {
private DagTaskStream dagTaskStream;
private DagProcFactory dagProcFactory;
private MostlyMySqlDagManagementStateStore dagManagementStateStore;
- static ITestMetastoreDatabase testMetastoreDatabase;
+ private ITestMetastoreDatabase testMetastoreDatabase;
static DagActionStore dagActionStore;
static LeaseAttemptStatus.LeaseObtainedStatus leaseObtainedStatus;
@BeforeClass
public void setUp() throws Exception {
// Setting up mock DB
- testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
- dagActionStore = mock(DagActionStore.class);
- doReturn(true).when(dagActionStore).deleteDagAction(any());
+ testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
+ dagActionStore = mock(DagActionStore.class);
+ doReturn(true).when(dagActionStore).deleteDagAction(any());
leaseObtainedStatus = mock(LeaseAttemptStatus.LeaseObtainedStatus.class);
doReturn(true).when(leaseObtainedStatus).completeLease();
@@ -104,6 +104,12 @@ public class DagProcessingEngineTest {
dagProcessingEngine.startAsync();
}
+ @AfterClass(alwaysRun = true)
+ public void tearDown() throws IOException {
+ // `.close()` to avoid (in the aggregate, across multiple suites) -
java.sql.SQLNonTransientConnectionException: Too many connections
+ testMetastoreDatabase.close();
+ }
+
static class MockedDagTaskStream implements DagTaskStream {
public static final int MAX_NUM_OF_TASKS = 1000;
public static final int FAILING_DAGS_FREQUENCY = 10;
@@ -186,9 +192,4 @@ public class DagProcessingEngineTest {
Assert.assertEquals(this.dagManagementStateStore.getDagManagerMetrics().dagProcessingExceptionMeter.getCount(),
expectedExceptions);
}
-
- @AfterClass
- public void tearDown() throws IOException {
- testMetastoreDatabase.close();
- }
}
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStoreTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStoreTest.java
index f828a576d..6d6f8b431 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStoreTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStoreTest.java
@@ -57,24 +57,25 @@ import static org.mockito.Mockito.mock;
*/
public class MostlyMySqlDagManagementStateStoreTest {
+ private ITestMetastoreDatabase testDb;
private MostlyMySqlDagManagementStateStore dagManagementStateStore;
private static final String TEST_USER = "testUser";
private static final String TEST_PASSWORD = "testPassword";
private static final String TEST_DAG_STATE_STORE = "TestDagStateStore";
private static final String TEST_TABLE = "quotas";
- private static ITestMetastoreDatabase testDb;
@BeforeClass
public void setUp() throws Exception {
// Setting up mock DB
- testDb = TestMetastoreDatabaseFactory.get();
- this.dagManagementStateStore = getDummyDMSS(testDb);
+ this.testDb = TestMetastoreDatabaseFactory.get();
+ this.dagManagementStateStore = getDummyDMSS(this.testDb);
}
@AfterClass(alwaysRun = true)
public void tearDown() throws Exception {
- if (testDb != null) {
- testDb.close();
+ if (this.testDb != null) {
+ // `.close()` to avoid (in the aggregate, across multiple suites) -
java.sql.SQLNonTransientConnectionException: Too many connections
+ this.testDb.close();
}
}
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStoreTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStoreTest.java
index d88e19b30..0c39abd16 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStoreTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStoreTest.java
@@ -45,15 +45,14 @@ public class MysqlDagActionStoreTest {
private static final String flowExecutionId = "12345677";
private static final String flowExecutionId_2 = "12345678";
private static final String flowExecutionId_3 = "12345679";
- private MysqlDagActionStore mysqlDagActionStore;
-
private ITestMetastoreDatabase testDb;
+ private MysqlDagActionStore mysqlDagActionStore;
@BeforeClass
public void setUp() throws Exception {
this.testDb = TestMetastoreDatabaseFactory.get();
Config config = ConfigBuilder.create()
- .addPrimitive("MysqlDagActionStore." +
ConfigurationKeys.STATE_STORE_DB_URL_KEY, testDb.getJdbcUrl())
+ .addPrimitive("MysqlDagActionStore." +
ConfigurationKeys.STATE_STORE_DB_URL_KEY, this.testDb.getJdbcUrl())
.addPrimitive("MysqlDagActionStore." +
ConfigurationKeys.STATE_STORE_DB_USER_KEY, USER)
.addPrimitive("MysqlDagActionStore." +
ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY, PASSWORD)
.addPrimitive("MysqlDagActionStore." +
ConfigurationKeys.STATE_STORE_DB_TABLE_KEY, TABLE)
@@ -62,8 +61,9 @@ public class MysqlDagActionStoreTest {
this.mysqlDagActionStore = new MysqlDagActionStore(config);
}
- @AfterClass
+ @AfterClass(alwaysRun = true)
public void tearDown() throws IOException {
+ // `.close()` to avoid (in the aggregate, across multiple suites) -
java.sql.SQLNonTransientConnectionException: Too many connections
this.testDb.close();
}
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreTest.java
index f4ea414d0..59816742a 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreTest.java
@@ -73,6 +73,7 @@ public class MysqlDagStateStoreTest {
@AfterClass(alwaysRun = true)
public void tearDown() throws Exception {
if (testDb != null) {
+ // `.close()` to avoid (in the aggregate, across multiple suites) - java.sql.SQLNonTransientConnectionException: Too many connections
testDb.close();
}
}
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java
index 48795176e..2699148b6 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java
@@ -61,12 +61,12 @@ public class MysqlMultiActiveLeaseArbiterTest {
new DagActionStore.DagAction(flowGroup2, flowName, flowExecutionId,
jobName, DagActionStore.DagActionType.LAUNCH);
private static final long eventTimeMillis = System.currentTimeMillis();
private static final Timestamp dummyTimestamp = new Timestamp(99999);
+ private ITestMetastoreDatabase testDb;
private MysqlMultiActiveLeaseArbiter mysqlMultiActiveLeaseArbiter;
private String formattedAcquireLeaseIfMatchingAllStatement =
String.format(MysqlMultiActiveLeaseArbiter.CONDITIONALLY_ACQUIRE_LEASE_IF_MATCHING_ALL_COLS_STATEMENT,
TABLE);
private String formattedAcquireLeaseIfFinishedStatement =
String.format(MysqlMultiActiveLeaseArbiter.CONDITIONALLY_ACQUIRE_LEASE_IF_FINISHED_LEASING_STATEMENT,
TABLE);
- ITestMetastoreDatabase testDb;
// The setup functionality verifies that the initialization of the tables is done correctly and verifies any SQL
// syntax errors.
@@ -77,7 +77,7 @@ public class MysqlMultiActiveLeaseArbiterTest {
Config config = ConfigBuilder.create()
.addPrimitive(ConfigurationKeys.SCHEDULER_EVENT_EPSILON_MILLIS_KEY,
EPSILON)
.addPrimitive(ConfigurationKeys.SCHEDULER_EVENT_LINGER_MILLIS_KEY,
LINGER)
- .addPrimitive(ConfigurationKeys.MYSQL_LEASE_ARBITER_PREFIX + "." +
ConfigurationKeys.STATE_STORE_DB_URL_KEY, testDb.getJdbcUrl())
+ .addPrimitive(ConfigurationKeys.MYSQL_LEASE_ARBITER_PREFIX + "." +
ConfigurationKeys.STATE_STORE_DB_URL_KEY, this.testDb.getJdbcUrl())
.addPrimitive(ConfigurationKeys.MYSQL_LEASE_ARBITER_PREFIX + "." +
ConfigurationKeys.STATE_STORE_DB_USER_KEY, USER)
.addPrimitive(ConfigurationKeys.MYSQL_LEASE_ARBITER_PREFIX + "." +
ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY, PASSWORD)
.addPrimitive(ConfigurationKeys.LEASE_DETERMINATION_STORE_DB_TABLE_KEY, TABLE)
@@ -87,6 +87,12 @@ public class MysqlMultiActiveLeaseArbiterTest {
this.mysqlMultiActiveLeaseArbiter = new
MysqlMultiActiveLeaseArbiter(config);
}
+ @AfterClass(alwaysRun = true)
+ public void tearDown() throws IOException {
+ // `.close()` to avoid (in the aggregate, across multiple suites) - java.sql.SQLNonTransientConnectionException: Too many connections
+ this.testDb.close();
+ }
+
/*
Tests all cases of trying to acquire a lease (CASES 1-6 detailed below)
for a flow action event with one
participant involved. All of the cases allow the flowExecutionId to be
updated by lease arbiter by setting
@@ -354,9 +360,4 @@ public class MysqlMultiActiveLeaseArbiterTest {
Assert.assertTrue(firstObtainedStatus.getConsensusDagAction().equals(
new DagActionStore.DagAction(flowGroup2, flowName, flowExecutionId,
jobName, DagActionStore.DagActionType.LAUNCH)));
}
-
- @AfterClass
- public void tearDown() throws IOException {
- this.testDb.close();
- }
}
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManagerTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManagerTest.java
index de4bcacee..f3874bbdb 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManagerTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManagerTest.java
@@ -38,16 +38,16 @@ public class MysqlUserQuotaManagerTest {
private static final String PASSWORD = "testPassword";
private static final String TABLE = "quotas";
private static final String PROXY_USER = "abora";
+ private ITestMetastoreDatabase testDb;
private MysqlUserQuotaManager quotaManager;
public static int INCREMENTS = 1000;
- ITestMetastoreDatabase testDb;
@BeforeClass
public void setUp() throws Exception {
- testDb = TestMetastoreDatabaseFactory.get();
+ this.testDb = TestMetastoreDatabaseFactory.get();
Config config = ConfigBuilder.create()
- .addPrimitive(MysqlUserQuotaManager.CONFIG_PREFIX + '.' +
ConfigurationKeys.STATE_STORE_DB_URL_KEY, testDb.getJdbcUrl())
+ .addPrimitive(MysqlUserQuotaManager.CONFIG_PREFIX + '.' +
ConfigurationKeys.STATE_STORE_DB_URL_KEY, this.testDb.getJdbcUrl())
.addPrimitive(MysqlUserQuotaManager.CONFIG_PREFIX + '.' +
ConfigurationKeys.STATE_STORE_DB_USER_KEY, USER)
.addPrimitive(MysqlUserQuotaManager.CONFIG_PREFIX + '.' +
ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY, PASSWORD)
.addPrimitive(MysqlUserQuotaManager.CONFIG_PREFIX + '.' +
ConfigurationKeys.STATE_STORE_DB_TABLE_KEY, TABLE)
@@ -55,6 +55,13 @@ public class MysqlUserQuotaManagerTest {
this.quotaManager = new MysqlUserQuotaManager(config);
}
+
+ @AfterClass(alwaysRun = true)
+ public void tearDown() throws IOException {
+ // `.close()` to avoid (in the aggregate, across multiple suites) - java.sql.SQLNonTransientConnectionException: Too many connections
+ this.testDb.close();
+ }
+
@Test
public void testRunningDagStore() throws Exception {
String dagId =
DagManagerUtils.generateDagId(DagManagerTest.buildDag("dagId", 1234L, "",
1).getNodes().get(0)).toString();
@@ -168,9 +175,4 @@ public class MysqlUserQuotaManagerTest {
thread6.join();
Assert.assertEquals(this.quotaManager.getCount(PROXY_USER,
AbstractUserQuotaManager.CountType.USER_COUNT), -1);
}
-
- @AfterClass(alwaysRun = true)
- public void tearDown() throws IOException {
- testDb.close();
- }
}
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProcTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProcTest.java
index cc39a58c5..c283b49d5 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProcTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProcTest.java
@@ -64,12 +64,12 @@ import static org.mockito.Mockito.spy;
public class KillDagProcTest {
private MostlyMySqlDagManagementStateStore dagManagementStateStore;
- private static ITestMetastoreDatabase testDb;
+ private ITestMetastoreDatabase testDb;
@BeforeClass
public void setUp() throws Exception {
- testDb = TestMetastoreDatabaseFactory.get();
- this.dagManagementStateStore =
spy(MostlyMySqlDagManagementStateStoreTest.getDummyDMSS(testDb));
+ this.testDb = TestMetastoreDatabaseFactory.get();
+ this.dagManagementStateStore =
spy(MostlyMySqlDagManagementStateStoreTest.getDummyDMSS(this.testDb));
doReturn(FlowSpec.builder().build()).when(this.dagManagementStateStore).getFlowSpec(any());
doNothing().when(this.dagManagementStateStore).tryAcquireQuota(any());
doNothing().when(this.dagManagementStateStore).addDagNodeState(any(),
any());
@@ -77,8 +77,9 @@ public class KillDagProcTest {
@AfterClass(alwaysRun = true)
public void tearDown() throws Exception {
- if (testDb != null) {
- testDb.close();
+ if (this.testDb != null) {
+ // `.close()` to avoid (in the aggregate, across multiple suites) - java.sql.SQLNonTransientConnectionException: Too many connections
+ this.testDb.close();
}
}
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
index dbe6d1f58..48c65870e 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
@@ -62,8 +62,8 @@ import static org.mockito.Mockito.spy;
public class LaunchDagProcTest {
- private MostlyMySqlDagManagementStateStore dagManagementStateStore;
private ITestMetastoreDatabase testMetastoreDatabase;
+ private MostlyMySqlDagManagementStateStore dagManagementStateStore;
@BeforeClass
public void setUp() throws Exception {
@@ -74,6 +74,12 @@ public class LaunchDagProcTest {
doNothing().when(this.dagManagementStateStore).addDagNodeState(any(),
any());
}
+ @AfterClass(alwaysRun = true)
+ public void tearDown() throws Exception {
+ // `.close()` to avoid (in the aggregate, across multiple suites) - java.sql.SQLNonTransientConnectionException: Too many connections
+ this.testMetastoreDatabase.close();
+ }
+
@Test
public void launchDag()
throws IOException, InterruptedException, URISyntaxException {
@@ -93,11 +99,6 @@ public class LaunchDagProcTest {
.filter(a ->
a.getMethod().getName().equals("addDagNodeState")).count());
}
- @AfterClass
- public void tearDown() throws Exception {
- this.testMetastoreDatabase.close();
- }
-
// This creates a dag like this
// D1 D2 D3
// \ | /
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java
index 94dfd9c99..c9642d437 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java
@@ -26,6 +26,9 @@ import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.mockito.Mockito;
import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import com.typesafe.config.ConfigFactory;
@@ -61,18 +64,36 @@ public class ReevaluateDagProcTest {
private final long flowExecutionId = System.currentTimeMillis();
private final String flowGroup = "fg";
- void mockDMSS(DagManagementStateStore dagManagementStateStore) throws
IOException, SpecNotFoundException {
+ private ITestMetastoreDatabase testMetastoreDatabase;
+ private DagManagementStateStore dagManagementStateStore;
+
+ @BeforeClass
+ public void setUpClass() throws Exception {
+ this.testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
+ }
+
+ @BeforeMethod
+ public void setUp() throws Exception {
+ this.dagManagementStateStore =
spy(MostlyMySqlDagManagementStateStoreTest.getDummyDMSS(this.testMetastoreDatabase));
+ mockDMSSCommonBehavior(dagManagementStateStore);
+ }
+
+ private void mockDMSSCommonBehavior(DagManagementStateStore
dagManagementStateStore) throws IOException, SpecNotFoundException {
doReturn(FlowSpec.builder().build()).when(dagManagementStateStore).getFlowSpec(any());
doNothing().when(dagManagementStateStore).tryAcquireQuota(any());
doNothing().when(dagManagementStateStore).addDagNodeState(any(), any());
+ doNothing().when(dagManagementStateStore).deleteDagNodeState(any(), any());
doReturn(true).when(dagManagementStateStore).releaseQuota(any());
}
+ @AfterClass(alwaysRun = true)
+ public void tearDownClass() throws Exception {
+ // `.close()` to avoid (in the aggregate, across multiple suites) - java.sql.SQLNonTransientConnectionException: Too many connections
+ this.testMetastoreDatabase.close();
+ }
+
@Test
public void testOneNextJobToRun() throws Exception {
- ITestMetastoreDatabase testMetastoreDatabase =
TestMetastoreDatabaseFactory.get();
- DagManagementStateStore dagManagementStateStore =
spy(MostlyMySqlDagManagementStateStoreTest.getDummyDMSS(testMetastoreDatabase));
- mockDMSS(dagManagementStateStore);
String flowName = "fn";
Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("1", flowExecutionId,
DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(),
2, "user5", ConfigFactory.empty()
@@ -93,7 +114,6 @@ public class ReevaluateDagProcTest {
doReturn(Optional.of(dag)).when(dagManagementStateStore).getDag(any());
doReturn(new ImmutablePair<>(Optional.of(dag.getStartNodes().get(0)),
Optional.of(jobStatus))).when(dagManagementStateStore).getDagNodeWithJobStatus(any());
doReturn(Optional.of(dag)).when(dagManagementStateStore).getParentDag(any());
- doNothing().when(dagManagementStateStore).deleteDagNodeState(any(), any());
ReevaluateDagProc
reEvaluateDagProc = new ReevaluateDagProc(new ReevaluateDagTask(new
DagActionStore.DagAction(flowGroup, flowName,
@@ -114,16 +134,11 @@ public class ReevaluateDagProcTest {
// current job's state is deleted
Assert.assertEquals(Mockito.mockingDetails(dagManagementStateStore).getInvocations().stream()
.filter(a ->
a.getMethod().getName().equals("deleteDagNodeState")).count(), 1);
-
- testMetastoreDatabase.close();
}
// test when there does not exist a next job in the dag when the current job's reevaluate dag action is processed
@Test
public void testNoNextJobToRun() throws Exception {
- ITestMetastoreDatabase testMetastoreDatabase =
TestMetastoreDatabaseFactory.get();
- DagManagementStateStore dagManagementStateStore =
spy(MostlyMySqlDagManagementStateStoreTest.getDummyDMSS(testMetastoreDatabase));
- mockDMSS(dagManagementStateStore);
String flowName = "fn2";
Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("2", flowExecutionId,
DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(),
1, "user5", ConfigFactory.empty()
@@ -138,7 +153,6 @@ public class ReevaluateDagProcTest {
doReturn(new ImmutablePair<>(Optional.of(dag.getStartNodes().get(0)),
Optional.of(jobStatus))).when(dagManagementStateStore).getDagNodeWithJobStatus(any());
doReturn(Optional.of(dag)).when(dagManagementStateStore).getParentDag(any());
doReturn(true).when(dagManagementStateStore).releaseQuota(any());
- doNothing().when(dagManagementStateStore).deleteDagNodeState(any(), any());
List<SpecProducer<Spec>> specProducers = dag.getNodes().stream().map(n -> {
try {
@@ -171,7 +185,5 @@ public class ReevaluateDagProcTest {
// dag is deleted because the only job in the dag is completed
Assert.assertEquals(Mockito.mockingDetails(dagManagementStateStore).getInvocations().stream()
.filter(a -> a.getMethod().getName().equals("deleteDag")).count(), 1);
-
- testMetastoreDatabase.close();
}
}
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetrieverTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetrieverTest.java
index 591e65f66..5a595d652 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetrieverTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetrieverTest.java
@@ -23,6 +23,7 @@ import java.util.Properties;
import org.apache.commons.lang3.tuple.Pair;
import org.testng.Assert;
+import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@@ -189,8 +190,12 @@ public class MysqlJobStatusRetrieverTest extends
JobStatusRetrieverTest {
this.dbJobStateStore.delete(KafkaJobStatusMonitor.jobStatusStoreName(FLOW_GROUP,
FLOW_NAME));
}
+ @AfterClass(alwaysRun = true)
@Override
public void tearDown() throws Exception {
- this.testMetastoreDatabase.close();
+ if (this.testMetastoreDatabase != null) {
+ // `.close()` to avoid (in the aggregate, across multiple suites) - java.sql.SQLNonTransientConnectionException: Too many connections
+ this.testMetastoreDatabase.close();
+ }
}
}