This is an automated email from the ASF dual-hosted git repository.
arjun4084346 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 1abd4cc42 [GOBBLIN-2083] set max connections to 2000 in mysql started through the docker image to avoid "too many connections" error (#3969)
1abd4cc42 is described below
commit 1abd4cc42ffd47130b0556c978f6df858156f636
Author: Arjun Singh Bora <[email protected]>
AuthorDate: Tue Jun 11 15:00:14 2024 -0700
[GOBBLIN-2083] set max connections to 2000 in mysql started through the docker image to avoid "too many connections" error (#3969)
* set max connections to 2000 in mysql docker image to avoid "too many connections" error
* fix tests using AssertWithBackoff
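The test fixes below share one shape: JDBC handles move into try-with-resources so they are released even when a query or assertion throws. A minimal sketch of that pattern, assuming a generic javax.sql.DataSource (the table and column names echo the retention POC below; the class and method names are illustrative, not taken from this patch):

    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import javax.sql.DataSource;

    class ResourceScopeSketch {
      // Resources opened in the try header are closed in reverse order on exit,
      // exception or not, which keeps server-side connection counts from piling up.
      static void printTwoNewestSnapshots(DataSource dataSource) throws SQLException {
        try (Connection conn = dataSource.getConnection();
            PreparedStatement stmt = conn.prepareStatement("SELECT name FROM Snapshots ORDER BY ts DESC")) {
          stmt.setMaxRows(2); // substitute for LIMIT on engines, such as Derby, that lack it
          try (ResultSet rs = stmt.executeQuery()) {
            while (rs.next()) {
              System.out.println(rs.getString(1));
            }
          }
        }
      }
    }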
---
.github/workflows/build_and_test.yaml | 1 +
.../hive/materializer/HiveMaterializerTest.java | 29 +++----
.../retention/sql/SqlBasedRetentionPoc.java | 89 ++++++++++++----------
.../embedded/EmbeddedGobblinDistcpTest.java | 4 +-
.../testing/TestMetastoreDatabaseFactory.java | 6 +-
.../conversion/hive/validation/ValidationJob.java | 48 +++++-------
.../gobblin/compliance/HiveProxyQueryExecutor.java | 12 +--
.../service/monitoring/FlowStatusGenerator.java | 2 +-
.../gobblin/service/GobblinServiceManagerTest.java | 4 +-
.../orchestration/MysqlUserQuotaManagerTest.java | 14 ++--
.../scheduler/GobblinServiceJobSchedulerTest.java | 4 +-
.../service/monitoring/FsFlowGraphMonitorTest.java | 39 +++++-----
.../tunnel/TestTunnelWithArbitraryTCPTraffic.java | 21 ++---
13 files changed, 133 insertions(+), 140 deletions(-)
diff --git a/.github/workflows/build_and_test.yaml b/.github/workflows/build_and_test.yaml
index b70e18ab3..b179a306a 100644
--- a/.github/workflows/build_and_test.yaml
+++ b/.github/workflows/build_and_test.yaml
@@ -134,6 +134,7 @@ jobs:
sudo dpkg -l | grep -i mysql
sudo apt-get clean
sudo apt-get install -y mysql-client
+        mysql --host 127.0.0.1 --port 3306 -uroot -ppassword -e "SET GLOBAL max_connections=2000"
         mysql --host 127.0.0.1 --port 3306 -uroot -ppassword -e "SHOW DATABASES"
- name: Cache Gradle Dependencies
uses: actions/cache@v2
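As a sanity check (not part of this patch), the raised limit can be confirmed from the same step with the client that was just installed:

    mysql --host 127.0.0.1 --port 3306 -uroot -ppassword -e "SHOW VARIABLES LIKE 'max_connections'"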
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/materializer/HiveMaterializerTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/materializer/HiveMaterializerTest.java
index 7512b35c7..c3d043726 100644
--- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/materializer/HiveMaterializerTest.java
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/materializer/HiveMaterializerTest.java
@@ -27,6 +27,19 @@ import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
+import com.google.common.io.Files;
+
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.data.management.conversion.hive.LocalHiveMetastoreTestUtils;
@@ -40,18 +53,6 @@ import org.apache.gobblin.runtime.TaskContext;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.util.AutoReturnableObject;
import org.apache.gobblin.util.HiveJdbcConnector;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.hive.metastore.IMetaStoreClient;
-import org.apache.hadoop.hive.ql.metadata.Table;
-import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-import com.google.common.base.Optional;
-import com.google.common.collect.Lists;
-import com.google.common.io.Files;
@Test (groups = {"disabledOnCI"})
@@ -234,10 +235,10 @@ public class HiveMaterializerTest {
}
  private List<List<String>> executeStatementAndGetResults(HiveJdbcConnector connector, String query, int columns) throws SQLException {
- Connection conn = connector.getConnection();
List<List<String>> result = new ArrayList<>();
- try (Statement stmt = conn.createStatement()) {
+ try (Connection conn = connector.getConnection();
+ Statement stmt = conn.createStatement()) {
stmt.execute(query);
ResultSet rs = stmt.getResultSet();
while (rs.next()) {
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/retention/sql/SqlBasedRetentionPoc.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/retention/sql/SqlBasedRetentionPoc.java
index e7899f800..51004a08d 100644
--- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/retention/sql/SqlBasedRetentionPoc.java
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/retention/sql/SqlBasedRetentionPoc.java
@@ -22,8 +22,6 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
-import com.zaxxer.hikari.HikariDataSource;
-
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.Path;
import org.joda.time.DateTime;
@@ -33,6 +31,8 @@ import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
+import com.zaxxer.hikari.HikariDataSource;
+
/**
 * A Proof of concept to represent Retention policies as SQL queries. The POC uses Apache Derby in-memory database to
@@ -71,8 +71,11 @@ public class SqlBasedRetentionPoc {
execute("CREATE FUNCTION TIMESTAMP_DIFF(timestamp1 TIMESTAMP, timestamp2
TIMESTAMP, unitString VARCHAR(50)) RETURNS BIGINT PARAMETER STYLE JAVA NO SQL
LANGUAGE JAVA EXTERNAL NAME
'org.apache.gobblin.data.management.retention.sql.SqlUdfs.timestamp_diff'");
}
- @AfterClass
+ @AfterClass (alwaysRun = true)
public void cleanUp() throws Exception {
+ if (connection != null) {
+ connection.close();
+ }
dataSource.close();
}
@@ -89,17 +92,17 @@ public class SqlBasedRetentionPoc {
    insertSnapshot(new Path("/data/databases/Forum/Comments/1453889323804-PT-440936752"));
    // Derby does not support LIMIT keyword. The suggested workaround is to setMaxRows in the PreparedStatement
-    PreparedStatement statement = connection.prepareStatement("SELECT name FROM Snapshots ORDER BY ts desc");
- statement.setMaxRows(2);
-
- ResultSet rs = statement.executeQuery();
-
- // Snapshots to be retained
- rs.next();
- Assert.assertEquals(rs.getString(1), "1453889323804-PT-440936752");
- rs.next();
- Assert.assertEquals(rs.getString(1), "1453860526464-PT-440847244");
-
+    try (PreparedStatement statement = connection.prepareStatement("SELECT name FROM Snapshots ORDER BY ts desc")) {
+ statement.setMaxRows(2);
+
+ try (ResultSet rs = statement.executeQuery()) {
+ // Snapshots to be retained
+ rs.next();
+ Assert.assertEquals(rs.getString(1), "1453889323804-PT-440936752");
+ rs.next();
+ Assert.assertEquals(rs.getString(1), "1453860526464-PT-440847244");
+ }
+ }
}
/**
@@ -119,18 +122,19 @@ public class SqlBasedRetentionPoc {
    Timestamp currentTimestamp =
        new Timestamp(DateTimeFormat.forPattern(DAILY_PARTITION_PATTERN).parseDateTime("2016/01/25").getMillis());
-    PreparedStatement statement =
-        connection.prepareStatement("SELECT path FROM Daily_Partitions WHERE TIMESTAMP_DIFF(?, ts, 'Days') > ?");
- statement.setTimestamp(1, currentTimestamp);
- statement.setLong(2, TWO_YEARS_IN_DAYS);
- ResultSet rs = statement.executeQuery();
-
- // Daily partitions to be cleaned
- rs.next();
-    Assert.assertEquals(rs.getString(1), "/data/tracking/MetricEvent/daily/2014/01/22");
-    rs.next();
-    Assert.assertEquals(rs.getString(1), "/data/tracking/MetricEvent/daily/2013/01/25");
-
+    try (PreparedStatement statement =
+        connection.prepareStatement("SELECT path FROM Daily_Partitions WHERE TIMESTAMP_DIFF(?, ts, 'Days') > ?")) {
+ statement.setTimestamp(1, currentTimestamp);
+ statement.setLong(2, TWO_YEARS_IN_DAYS);
+ try (ResultSet rs = statement.executeQuery()) {
+
+ // Daily partitions to be cleaned
+ rs.next();
+        Assert.assertEquals(rs.getString(1), "/data/tracking/MetricEvent/daily/2014/01/22");
+        rs.next();
+        Assert.assertEquals(rs.getString(1), "/data/tracking/MetricEvent/daily/2013/01/25");
+ }
+ }
}
private void insertSnapshot(Path snapshotPath) throws Exception {
@@ -140,15 +144,15 @@ public class SqlBasedRetentionPoc {
    long ts = Long.parseLong(StringUtils.substringBefore(snapshotName, "-PT-"));
    long recordCount = Long.parseLong(StringUtils.substringAfter(snapshotName, "-PT-"));
-    PreparedStatement insert = connection.prepareStatement("INSERT INTO Snapshots VALUES (?, ?, ?, ?, ?)");
- insert.setString(1, datasetPath);
- insert.setString(2, snapshotName);
- insert.setString(3, snapshotPath.toString());
- insert.setTimestamp(4, new Timestamp(ts));
- insert.setLong(5, recordCount);
-
- insert.executeUpdate();
+    try (PreparedStatement insert = connection.prepareStatement("INSERT INTO Snapshots VALUES (?, ?, ?, ?, ?)")) {
+ insert.setString(1, datasetPath);
+ insert.setString(2, snapshotName);
+ insert.setString(3, snapshotPath.toString());
+ insert.setTimestamp(4, new Timestamp(ts));
+ insert.setLong(5, recordCount);
+ insert.executeUpdate();
+ }
}
private void insertDailyPartition(Path dailyPartitionPath) throws Exception {
@@ -159,17 +163,18 @@ public class SqlBasedRetentionPoc {
DateTimeFormat.forPattern(DAILY_PARTITION_PATTERN).parseDateTime(
            StringUtils.substringAfter(dailyPartitionPath.toString(), "daily" + Path.SEPARATOR));
-    PreparedStatement insert = connection.prepareStatement("INSERT INTO Daily_Partitions VALUES (?, ?, ?)");
- insert.setString(1, datasetPath);
- insert.setString(2, dailyPartitionPath.toString());
- insert.setTimestamp(3, new Timestamp(partition.getMillis()));
-
- insert.executeUpdate();
+    try (PreparedStatement insert = connection.prepareStatement("INSERT INTO Daily_Partitions VALUES (?, ?, ?)")) {
+ insert.setString(1, datasetPath);
+ insert.setString(2, dailyPartitionPath.toString());
+ insert.setTimestamp(3, new Timestamp(partition.getMillis()));
+ insert.executeUpdate();
+ }
}
private void execute(String query) throws SQLException {
- PreparedStatement insertStatement = connection.prepareStatement(query);
- insertStatement.executeUpdate();
+    try (PreparedStatement insertStatement = connection.prepareStatement(query)) {
+ insertStatement.executeUpdate();
+ }
}
}
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblinDistcpTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblinDistcpTest.java
index 2fefded1f..83491ca5c 100644
--- a/gobblin-data-management/src/test/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblinDistcpTest.java
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblinDistcpTest.java
@@ -151,10 +151,12 @@ public class EmbeddedGobblinDistcpTest {
metaStoreClient.tableExists(TARGET_DB, TEST_TABLE);
FileSystem fs = FileSystem.getLocal(new Configuration());
fs.exists(new Path(TARGET_PATH));
+
+ statement.close();
}
  // Tearing down the Hive components from derby driver if there's anything generated through the test.
- @AfterClass
+ @AfterClass (alwaysRun = true)
public void hiveTearDown() throws Exception {
FileSystem fs = FileSystem.getLocal(new Configuration());
Path targetPath = new Path(TARGET_PATH);
diff --git a/gobblin-metastore/src/test/java/org/apache/gobblin/metastore/testing/TestMetastoreDatabaseFactory.java b/gobblin-metastore/src/test/java/org/apache/gobblin/metastore/testing/TestMetastoreDatabaseFactory.java
index bf5295db2..1d63c86f6 100644
--- a/gobblin-metastore/src/test/java/org/apache/gobblin/metastore/testing/TestMetastoreDatabaseFactory.java
+++ b/gobblin-metastore/src/test/java/org/apache/gobblin/metastore/testing/TestMetastoreDatabaseFactory.java
@@ -56,7 +56,7 @@ public class TestMetastoreDatabaseFactory {
  public static ITestMetastoreDatabase get(String version, Config dbConfig) throws Exception {
try {
synchronized (syncObject) {
- ensureDatabaseExists(dbConfig);
+ ensureDatabaseServerExists(dbConfig);
        TestMetadataDatabase instance = new TestMetadataDatabase(testMetastoreDatabaseServer, version);
instances.add(instance);
return instance;
@@ -72,14 +72,14 @@ public class TestMetastoreDatabaseFactory {
static void release(ITestMetastoreDatabase instance) throws IOException {
synchronized (syncObject) {
- if (instances.remove(instance) && instances.size() == 0) {
+ if (instances.remove(instance) && instances.isEmpty()) {
testMetastoreDatabaseServer.close();
testMetastoreDatabaseServer = null;
}
}
}
-  private static void ensureDatabaseExists(Config dbConfig) throws Exception {
+  private static void ensureDatabaseServerExists(Config dbConfig) throws Exception {
if (testMetastoreDatabaseServer == null) {
try (Mutex ignored = new Mutex()) {
if (testMetastoreDatabaseServer == null) {
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/data/management/conversion/hive/validation/ValidationJob.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/data/management/conversion/hive/validation/ValidationJob.java
index 57117cb0c..710a9a8eb 100644
--- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/data/management/conversion/hive/validation/ValidationJob.java
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/data/management/conversion/hive/validation/ValidationJob.java
@@ -16,14 +16,10 @@
*/
package org.apache.gobblin.data.management.conversion.hive.validation;
-import org.apache.gobblin.config.client.ConfigClient;
-import org.apache.gobblin.config.client.api.VersionStabilityPolicy;
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.data.management.conversion.hive.task.HiveConverterUtils;
-import org.apache.gobblin.util.PathUtils;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
+import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
@@ -58,8 +54,6 @@ import org.apache.log4j.Logger;
import org.joda.time.DateTime;
import org.slf4j.LoggerFactory;
-import azkaban.jobExecutor.AbstractJob;
-
import com.google.common.base.Charsets;
import com.google.common.base.Enums;
import com.google.common.base.Optional;
@@ -74,6 +68,11 @@ import com.google.common.util.concurrent.UncheckedExecutionException;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
+import azkaban.jobExecutor.AbstractJob;
+
+import org.apache.gobblin.config.client.ConfigClient;
+import org.apache.gobblin.config.client.api.VersionStabilityPolicy;
+import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.data.management.conversion.hive.dataset.ConvertibleHiveDataset;
import org.apache.gobblin.data.management.conversion.hive.dataset.ConvertibleHiveDatasetFinder;
import org.apache.gobblin.data.management.conversion.hive.events.EventConstants;
@@ -82,12 +81,12 @@ import org.apache.gobblin.data.management.conversion.hive.provider.UpdateNotFoun
import org.apache.gobblin.data.management.conversion.hive.provider.UpdateProviderFactory;
import org.apache.gobblin.data.management.conversion.hive.query.HiveValidationQueryGenerator;
import org.apache.gobblin.data.management.conversion.hive.source.HiveSource;
+import org.apache.gobblin.data.management.conversion.hive.task.HiveConverterUtils;
import org.apache.gobblin.data.management.copy.hive.HiveDataset;
import org.apache.gobblin.data.management.copy.hive.HiveDatasetFinder;
import org.apache.gobblin.data.management.copy.hive.HiveUtils;
import org.apache.gobblin.hive.HiveMetastoreClientPool;
import org.apache.gobblin.hive.HiveSerDeWrapper;
-import org.apache.gobblin.util.HiveJdbcConnector;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.event.EventSubmitter;
@@ -95,6 +94,8 @@ import org.apache.gobblin.util.AutoReturnableObject;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.ExecutorsUtils;
import org.apache.gobblin.util.HadoopUtils;
+import org.apache.gobblin.util.HiveJdbcConnector;
+import org.apache.gobblin.util.PathUtils;
/**
@@ -490,25 +491,23 @@ public class ValidationJob extends AbstractJob {
*/
@SuppressWarnings("unused")
  private List<Long> getValidationOutputFromHiveJdbc(List<String> queries) throws IOException {
- if (null == queries || queries.size() == 0) {
+ if (null == queries || queries.isEmpty()) {
log.warn("No queries specified to be executed");
return Collections.emptyList();
}
- Statement statement = null;
List<Long> rowCounts = Lists.newArrayList();
- Closer closer = Closer.create();
-
- try {
-      HiveJdbcConnector hiveJdbcConnector = HiveJdbcConnector.newConnectorWithProps(props);
-      statement = hiveJdbcConnector.getConnection().createStatement();
+    try (HiveJdbcConnector hiveJdbcConnector = HiveJdbcConnector.newConnectorWithProps(props);
+ Connection connection = hiveJdbcConnector.getConnection();
+ Statement statement = connection.createStatement()){
for (String query : queries) {
log.info("Executing query: " + query);
boolean result = statement.execute(query);
if (result) {
- ResultSet resultSet = statement.getResultSet();
- if (resultSet.next()) {
- rowCounts.add(resultSet.getLong(1));
+ try (ResultSet resultSet = statement.getResultSet()) {
+ if (resultSet.next()) {
+ rowCounts.add(resultSet.getLong(1));
+ }
}
} else {
log.warn("Query output for: " + query + " : " + result);
@@ -517,19 +516,6 @@ public class ValidationJob extends AbstractJob {
} catch (SQLException e) {
throw new RuntimeException(e);
- } finally {
- try {
- closer.close();
- } catch (Exception e) {
- log.warn("Could not close HiveJdbcConnector", e);
- }
- if (null != statement) {
- try {
- statement.close();
- } catch (SQLException e) {
- log.warn("Could not close Hive statement", e);
- }
- }
}
return rowCounts;
diff --git a/gobblin-modules/gobblin-compliance/src/main/java/org/apache/gobblin/compliance/HiveProxyQueryExecutor.java b/gobblin-modules/gobblin-compliance/src/main/java/org/apache/gobblin/compliance/HiveProxyQueryExecutor.java
index 02e90adc9..1a7f5fbda 100644
--- a/gobblin-modules/gobblin-compliance/src/main/java/org/apache/gobblin/compliance/HiveProxyQueryExecutor.java
+++ b/gobblin-modules/gobblin-compliance/src/main/java/org/apache/gobblin/compliance/HiveProxyQueryExecutor.java
@@ -28,7 +28,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hive.jdbc.HiveConnection;
import org.apache.thrift.TException;
@@ -118,12 +117,9 @@ public class HiveProxyQueryExecutor implements QueryExecutor, Closeable {
String realm = this.state.getProp(ConfigurationKeys.KERBEROS_REALM);
    UserGroupInformation loginUser = UserGroupInformation
        .loginUserFromKeytabAndReturnUGI(HostUtils.getPrincipalUsingHostname(superUser, realm), keytabLocation);
- loginUser.doAs(new PrivilegedExceptionAction<Void>() {
- @Override
- public Void run()
- throws MetaException, SQLException, ClassNotFoundException {
- for (String proxy : proxies) {
-          HiveConnection hiveConnection = getHiveConnection(Optional.fromNullable(proxy));
+    loginUser.doAs((PrivilegedExceptionAction<Void>) () -> {
+      for (String proxy : proxies) {
+        try (HiveConnection hiveConnection = getHiveConnection(Optional.fromNullable(proxy))) {
Statement statement = hiveConnection.createStatement();
statementMap.put(proxy, statement);
connectionMap.put(proxy, hiveConnection);
@@ -131,8 +127,8 @@ public class HiveProxyQueryExecutor implements QueryExecutor, Closeable {
statement.execute(setting);
}
}
- return null;
}
+ return null;
});
}
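A note on the rewrite above: the anonymous PrivilegedExceptionAction becomes a lambda, and the explicit cast is what selects the doAs overload taking a PrivilegedExceptionAction rather than a PrivilegedAction (an implicitly typed lambda would otherwise be ambiguous between the two). A minimal sketch of the shape; the UGI instance and the body are placeholders:

    import java.security.PrivilegedExceptionAction;
    import org.apache.hadoop.security.UserGroupInformation;

    class DoAsSketch {
      static void runProxied(UserGroupInformation loginUser) throws Exception {
        // The cast disambiguates doAs(PrivilegedExceptionAction) from doAs(PrivilegedAction).
        loginUser.doAs((PrivilegedExceptionAction<Void>) () -> {
          // proxied work goes here; connections opened here should also be try-with-resources
          return null;
        });
      }
    }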
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
index 4307659b4..c6183cc52 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
@@ -30,8 +30,8 @@ import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import javax.inject.Inject;
-
import lombok.extern.slf4j.Slf4j;
+
import org.apache.gobblin.service.ExecutionStatus;
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
index b9861ede3..63327b291 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
@@ -242,7 +242,7 @@ public class GobblinServiceManagerTest {
}
}
- @AfterClass
+ @AfterClass (alwaysRun = true)
public void cleanUp() throws Exception {
// Shutdown Service
try {
@@ -405,7 +405,7 @@ public class GobblinServiceManagerTest {
}
}
- @Test (dependsOnMethods = "testUncompilableJob")
+ @Test (dependsOnMethods = "testRunQuotaExceeds")
public void testExplainJob() throws Exception {
    int sizeBeforeTest = this.gobblinServiceManager.getFlowCatalog().getSpecs().size();
FlowId flowId = createFlowIdWithUniqueName(TEST_GROUP_NAME);
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManagerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManagerTest.java
index f3874bbdb..c040e5178 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManagerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManagerTest.java
@@ -41,6 +41,7 @@ public class MysqlUserQuotaManagerTest {
private ITestMetastoreDatabase testDb;
private MysqlUserQuotaManager quotaManager;
public static int INCREMENTS = 1000;
+ Connection connection;
@BeforeClass
public void setUp() throws Exception {
@@ -54,18 +55,21 @@ public class MysqlUserQuotaManagerTest {
.build();
this.quotaManager = new MysqlUserQuotaManager(config);
+ this.connection = this.quotaManager.quotaStore.dataSource.getConnection();
}
@AfterClass(alwaysRun = true)
- public void tearDown() throws IOException {
+ public void tearDown() throws IOException, SQLException {
    // `.close()` to avoid (in the aggregate, across multiple suites) - java.sql.SQLNonTransientConnectionException: Too many connections
+ if (this.connection != null) {
+ this.connection.close();
+ }
this.testDb.close();
}
@Test
public void testRunningDagStore() throws Exception {
    String dagId = DagManagerUtils.generateDagId(DagManagerTest.buildDag("dagId", 1234L, "", 1).getNodes().get(0)).toString();
-    Connection connection = this.quotaManager.quotaStore.dataSource.getConnection();
Assert.assertFalse(this.quotaManager.containsDagId(dagId));
this.quotaManager.addDagId(connection, dagId);
connection.commit();
@@ -75,12 +79,10 @@ public class MysqlUserQuotaManagerTest {
Assert.assertFalse(this.quotaManager.containsDagId(dagId));
Assert.assertFalse(this.quotaManager.removeDagId(connection, dagId));
connection.commit();
- connection.close();
}
@Test
public void testIncreaseCount() throws Exception {
-    Connection connection = this.quotaManager.quotaStore.dataSource.getConnection();
    int prevCount = this.quotaManager.incrementJobCount(connection, PROXY_USER, AbstractUserQuotaManager.CountType.USER_COUNT);
connection.commit();
Assert.assertEquals(prevCount, 0);
@@ -97,12 +99,10 @@ public class MysqlUserQuotaManagerTest {
    prevCount = this.quotaManager.incrementJobCount(connection, PROXY_USER, AbstractUserQuotaManager.CountType.FLOWGROUP_COUNT);
connection.commit();
Assert.assertEquals(prevCount, 1);
- connection.close();
}
@Test(dependsOnMethods = "testIncreaseCount")
public void testDecreaseCount() throws Exception {
-    Connection connection = this.quotaManager.quotaStore.dataSource.getConnection();
    this.quotaManager.decrementJobCount(connection, PROXY_USER, AbstractUserQuotaManager.CountType.USER_COUNT);
connection.commit();
    Assert.assertEquals(this.quotaManager.getCount(PROXY_USER, AbstractUserQuotaManager.CountType.USER_COUNT), 1);
@@ -135,7 +135,7 @@ public class MysqlUserQuotaManagerTest {
public void run() {
int i = 0;
while (i++ < INCREMENTS) {
-          try (Connection connection = MysqlUserQuotaManagerTest.this.quotaManager.quotaStore.dataSource.getConnection();) {
+          try (Connection connection = MysqlUserQuotaManagerTest.this.quotaManager.quotaStore.dataSource.getConnection()) {
            if (increaseOrDecrease) {
              MysqlUserQuotaManagerTest.this.quotaManager.incrementJobCount(connection, PROXY_USER, AbstractUserQuotaManager.CountType.USER_COUNT);
} else {
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
index c264bbfec..dfc7d3a83 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
@@ -333,7 +333,6 @@ public class GobblinServiceJobSchedulerTest {
    FlowCatalog flowCatalog = new FlowCatalog(ConfigUtils.propertiesToConfig(properties));
    ServiceBasedAppLauncher serviceLauncher = new ServiceBasedAppLauncher(properties, "GaaSJobSchedulerTest");
-
serviceLauncher.addService(flowCatalog);
serviceLauncher.start();
@@ -384,7 +383,8 @@ public class GobblinServiceJobSchedulerTest {
    Assert.assertEquals(schedulerWithWarmStandbyEnabled.scheduledFlowSpecs.size(), 1);
schedulerWithWarmStandbyEnabled.onAddSpec(flowSpec1);
    // Second flow should be added to scheduled flows since no quota check in this case
-    Assert.assertEquals(schedulerWithWarmStandbyEnabled.scheduledFlowSpecs.size(), 2);
+    AssertWithBackoff.create().maxSleepMs(200L).timeoutMs(5000L).backoffFactor(1)
+        .assertTrue(input -> schedulerWithWarmStandbyEnabled.scheduledFlowSpecs.size() == 2, "Waiting for add spec to complete");
// set scheduler to be inactive and unschedule flows
schedulerWithWarmStandbyEnabled.setActive(false);
    Assert.assertEquals(schedulerWithWarmStandbyEnabled.scheduledFlowSpecs.size(), 0);
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsFlowGraphMonitorTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsFlowGraphMonitorTest.java
index 1b3397756..ea4eaf17c 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsFlowGraphMonitorTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsFlowGraphMonitorTest.java
@@ -17,14 +17,6 @@
package org.apache.gobblin.service.monitoring;
-import com.google.common.base.Charsets;
-import com.google.common.base.Joiner;
-import com.google.common.base.Optional;
-import com.google.common.io.Files;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import com.typesafe.config.ConfigValueFactory;
-
import java.io.File;
import java.io.IOException;
import java.net.URI;
@@ -37,6 +29,23 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.fs.Path;
+import org.eclipse.jgit.transport.RefSpec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
+import com.google.common.io.Files;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
import org.apache.gobblin.config.ConfigBuilder;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.runtime.api.TopologySpec;
@@ -49,14 +58,7 @@ import org.apache.gobblin.service.modules.flowgraph.FlowEdge;
import org.apache.gobblin.service.modules.flowgraph.FlowGraph;
import org.apache.gobblin.service.modules.flowgraph.FlowGraphConfigurationKeys;
import org.apache.gobblin.service.modules.template_catalog.UpdatableFSFlowTemplateCatalog;
-import org.apache.hadoop.fs.Path;
-import org.eclipse.jgit.transport.RefSpec;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
+import org.apache.gobblin.testing.AssertWithBackoff;
public class FsFlowGraphMonitorTest {
@@ -239,9 +241,8 @@ public class FsFlowGraphMonitorTest {
    //If deleting all the templates, the cache of flow templates will be cleared and the flowgraph will be unable to add edges on reload.
cleanUpDir(this.flowTemplateCatalogFolder.getAbsolutePath());
- Thread.sleep(3000);
- Assert.assertEquals(this.flowGraph.get().getEdges("node1").size(), 0);
-
+    AssertWithBackoff.create().maxSleepMs(200L).timeoutMs(10000L).backoffFactor(1)
+        .assertTrue(input -> this.flowGraph.get().getEdges("node1").isEmpty(), "Waiting for flowgraph to become empty");
    URI flowTemplateCatalogUri = this.getClass().getClassLoader().getResource("template_catalog").toURI();
    // Adding the flowtemplates back will make the edges eligible to be added again on reload.
    FileUtils.copyDirectory(new File(flowTemplateCatalogUri.getPath()), this.flowTemplateCatalogFolder);
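Both scheduler-side test fixes above replace a fixed sleep with polling: the assertion is retried until it holds or the timeout lapses, so a slow CI host no longer flakes the test. The call shape, as used in the two hunks (it runs inside a test method that declares throws Exception):

    import org.apache.gobblin.testing.AssertWithBackoff;

    // Poll for up to 10s, sleeping at most 200ms between checks; backoffFactor(1)
    // keeps the sleep interval constant instead of growing it.
    AssertWithBackoff.create().maxSleepMs(200L).timeoutMs(10000L).backoffFactor(1)
        .assertTrue(input -> flowGraph.get().getEdges("node1").isEmpty(), "Waiting for flowgraph to become empty");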
diff --git a/gobblin-tunnel/src/test/java/org/apache/gobblin/tunnel/TestTunnelWithArbitraryTCPTraffic.java b/gobblin-tunnel/src/test/java/org/apache/gobblin/tunnel/TestTunnelWithArbitraryTCPTraffic.java
index cd4fb7a3f..667a39857 100644
--- a/gobblin-tunnel/src/test/java/org/apache/gobblin/tunnel/TestTunnelWithArbitraryTCPTraffic.java
+++ b/gobblin-tunnel/src/test/java/org/apache/gobblin/tunnel/TestTunnelWithArbitraryTCPTraffic.java
@@ -17,11 +17,6 @@
package org.apache.gobblin.tunnel;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertNotEquals;
-import static org.testng.Assert.assertTrue;
-
import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.IOException;
@@ -36,6 +31,7 @@ import java.security.NoSuchAlgorithmException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
+import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -49,6 +45,11 @@ import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotEquals;
+import static org.testng.Assert.assertTrue;
+
/**
* Tests for Tunnel with arbitrary TCP traffic.
*
@@ -594,15 +595,15 @@ public class TestTunnelWithArbitraryTCPTraffic {
MockServer proxyServer = startConnectProxyServer();
    Tunnel tunnel = Tunnel.build("useastdb.ensembl.org", 5306, "localhost", proxyServer.getServerSocketPort());
+ int port = tunnel.getPort();
- try {
- int port = tunnel.getPort();
+    try (Connection connection =
+        DriverManager.getConnection("jdbc:mysql://localhost:" + port + "/homo_sapiens_core_82_38?user=anonymous");
+        Statement statement = connection.createStatement()) {
-      Connection connection =
-          DriverManager.getConnection("jdbc:mysql://localhost:" + port + "/homo_sapiens_core_82_38?user=anonymous");
      String query2 = "SELECT DISTINCT gene_id, biotype, source, description from gene LIMIT 1000";
- ResultSet resultSet = connection.createStatement().executeQuery(query2);
+ ResultSet resultSet = statement.executeQuery(query2);
int row = 0;