This is an automated email from the ASF dual-hosted git repository.

arjun4084346 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 1abd4cc42 [GOBBLIN-2083] set max connections to 2000 in mysql started 
through the docker image to avoid "too many connections" error (#3969)
1abd4cc42 is described below

commit 1abd4cc42ffd47130b0556c978f6df858156f636
Author: Arjun Singh Bora <[email protected]>
AuthorDate: Tue Jun 11 15:00:14 2024 -0700

    [GOBBLIN-2083] set max connections to 2000 in mysql started through the 
docker image to avoid "too many connections" error (#3969)
    
    * set max connections to 2000 in mysql docker image to avoid "too many 
connections" error
    * fix tests using AssertWithBackOff
---
 .github/workflows/build_and_test.yaml              |  1 +
 .../hive/materializer/HiveMaterializerTest.java    | 29 +++----
 .../retention/sql/SqlBasedRetentionPoc.java        | 89 ++++++++++++----------
 .../embedded/EmbeddedGobblinDistcpTest.java        |  4 +-
 .../testing/TestMetastoreDatabaseFactory.java      |  6 +-
 .../conversion/hive/validation/ValidationJob.java  | 48 +++++-------
 .../gobblin/compliance/HiveProxyQueryExecutor.java | 12 +--
 .../service/monitoring/FlowStatusGenerator.java    |  2 +-
 .../gobblin/service/GobblinServiceManagerTest.java |  4 +-
 .../orchestration/MysqlUserQuotaManagerTest.java   | 14 ++--
 .../scheduler/GobblinServiceJobSchedulerTest.java  |  4 +-
 .../service/monitoring/FsFlowGraphMonitorTest.java | 39 +++++-----
 .../tunnel/TestTunnelWithArbitraryTCPTraffic.java  | 21 ++---
 13 files changed, 133 insertions(+), 140 deletions(-)

diff --git a/.github/workflows/build_and_test.yaml 
b/.github/workflows/build_and_test.yaml
index b70e18ab3..b179a306a 100644
--- a/.github/workflows/build_and_test.yaml
+++ b/.github/workflows/build_and_test.yaml
@@ -134,6 +134,7 @@ jobs:
             sudo dpkg -l | grep -i mysql
             sudo apt-get clean
             sudo apt-get install -y mysql-client
+            mysql --host 127.0.0.1 --port 3306 -uroot -ppassword -e "SET 
GLOBAL max_connections=2000"
             mysql --host 127.0.0.1 --port 3306 -uroot -ppassword -e "SHOW 
DATABASES"
       - name: Cache Gradle Dependencies
         uses: actions/cache@v2
diff --git 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/materializer/HiveMaterializerTest.java
 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/materializer/HiveMaterializerTest.java
index 7512b35c7..c3d043726 100644
--- 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/materializer/HiveMaterializerTest.java
+++ 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/materializer/HiveMaterializerTest.java
@@ -27,6 +27,19 @@ import java.util.List;
 import java.util.Properties;
 import java.util.stream.Collectors;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
+import com.google.common.io.Files;
+
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.WorkUnitState;
 import 
org.apache.gobblin.data.management.conversion.hive.LocalHiveMetastoreTestUtils;
@@ -40,18 +53,6 @@ import org.apache.gobblin.runtime.TaskContext;
 import org.apache.gobblin.source.workunit.WorkUnit;
 import org.apache.gobblin.util.AutoReturnableObject;
 import org.apache.gobblin.util.HiveJdbcConnector;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.hive.metastore.IMetaStoreClient;
-import org.apache.hadoop.hive.ql.metadata.Table;
-import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-import com.google.common.base.Optional;
-import com.google.common.collect.Lists;
-import com.google.common.io.Files;
 
 
 @Test (groups = {"disabledOnCI"})
@@ -234,10 +235,10 @@ public class HiveMaterializerTest {
   }
 
   private List<List<String>> executeStatementAndGetResults(HiveJdbcConnector 
connector, String query, int columns) throws SQLException {
-    Connection conn = connector.getConnection();
     List<List<String>> result = new ArrayList<>();
 
-    try (Statement stmt = conn.createStatement()) {
+    try (Connection conn = connector.getConnection();
+        Statement stmt = conn.createStatement()) {
       stmt.execute(query);
       ResultSet rs = stmt.getResultSet();
       while (rs.next()) {
diff --git 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/retention/sql/SqlBasedRetentionPoc.java
 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/retention/sql/SqlBasedRetentionPoc.java
index e7899f800..51004a08d 100644
--- 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/retention/sql/SqlBasedRetentionPoc.java
+++ 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/retention/sql/SqlBasedRetentionPoc.java
@@ -22,8 +22,6 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Timestamp;
 
-import com.zaxxer.hikari.HikariDataSource;
-
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.fs.Path;
 import org.joda.time.DateTime;
@@ -33,6 +31,8 @@ import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
+import com.zaxxer.hikari.HikariDataSource;
+
 
 /**
  * A Proof of concept to represent Retention policies as SQL queries. The POC 
uses Apache Derby in-memory database to
@@ -71,8 +71,11 @@ public class SqlBasedRetentionPoc {
     execute("CREATE FUNCTION TIMESTAMP_DIFF(timestamp1 TIMESTAMP, timestamp2 
TIMESTAMP, unitString VARCHAR(50)) RETURNS BIGINT PARAMETER STYLE JAVA NO SQL 
LANGUAGE JAVA EXTERNAL NAME 
'org.apache.gobblin.data.management.retention.sql.SqlUdfs.timestamp_diff'");
   }
 
-  @AfterClass
+  @AfterClass (alwaysRun = true)
   public void cleanUp() throws Exception {
+    if (connection != null) {
+      connection.close();
+    }
     dataSource.close();
   }
 
@@ -89,17 +92,17 @@ public class SqlBasedRetentionPoc {
     insertSnapshot(new 
Path("/data/databases/Forum/Comments/1453889323804-PT-440936752"));
 
     // Derby does not support LIMIT keyword. The suggested workaround is to 
setMaxRows in the PreparedStatement
-    PreparedStatement statement = connection.prepareStatement("SELECT name 
FROM Snapshots ORDER BY ts desc");
-    statement.setMaxRows(2);
-
-    ResultSet rs = statement.executeQuery();
-
-    // Snapshots to be retained
-    rs.next();
-    Assert.assertEquals(rs.getString(1), "1453889323804-PT-440936752");
-    rs.next();
-    Assert.assertEquals(rs.getString(1), "1453860526464-PT-440847244");
-
+    try (PreparedStatement statement = connection.prepareStatement("SELECT 
name FROM Snapshots ORDER BY ts desc")) {
+      statement.setMaxRows(2);
+
+      try (ResultSet rs = statement.executeQuery()) {
+        // Snapshots to be retained
+        rs.next();
+        Assert.assertEquals(rs.getString(1), "1453889323804-PT-440936752");
+        rs.next();
+        Assert.assertEquals(rs.getString(1), "1453860526464-PT-440847244");
+      }
+    }
   }
 
   /**
@@ -119,18 +122,19 @@ public class SqlBasedRetentionPoc {
     Timestamp currentTimestamp =
         new 
Timestamp(DateTimeFormat.forPattern(DAILY_PARTITION_PATTERN).parseDateTime("2016/01/25").getMillis());
 
-    PreparedStatement statement =
-        connection.prepareStatement("SELECT path FROM Daily_Partitions WHERE 
TIMESTAMP_DIFF(?, ts, 'Days') > ?");
-    statement.setTimestamp(1, currentTimestamp);
-    statement.setLong(2, TWO_YEARS_IN_DAYS);
-    ResultSet rs = statement.executeQuery();
-
-    // Daily partitions to be cleaned
-    rs.next();
-    Assert.assertEquals(rs.getString(1), 
"/data/tracking/MetricEvent/daily/2014/01/22");
-    rs.next();
-    Assert.assertEquals(rs.getString(1), 
"/data/tracking/MetricEvent/daily/2013/01/25");
-
+    try (PreparedStatement statement =
+        connection.prepareStatement("SELECT path FROM Daily_Partitions WHERE 
TIMESTAMP_DIFF(?, ts, 'Days') > ?")) {
+      statement.setTimestamp(1, currentTimestamp);
+      statement.setLong(2, TWO_YEARS_IN_DAYS);
+      try (ResultSet rs = statement.executeQuery()) {
+
+        // Daily partitions to be cleaned
+        rs.next();
+        Assert.assertEquals(rs.getString(1), 
"/data/tracking/MetricEvent/daily/2014/01/22");
+        rs.next();
+        Assert.assertEquals(rs.getString(1), 
"/data/tracking/MetricEvent/daily/2013/01/25");
+      }
+    }
   }
 
   private void insertSnapshot(Path snapshotPath) throws Exception {
@@ -140,15 +144,15 @@ public class SqlBasedRetentionPoc {
     long ts = Long.parseLong(StringUtils.substringBefore(snapshotName, 
"-PT-"));
     long recordCount = Long.parseLong(StringUtils.substringAfter(snapshotName, 
"-PT-"));
 
-    PreparedStatement insert = connection.prepareStatement("INSERT INTO 
Snapshots VALUES (?, ?, ?, ?, ?)");
-    insert.setString(1, datasetPath);
-    insert.setString(2, snapshotName);
-    insert.setString(3, snapshotPath.toString());
-    insert.setTimestamp(4, new Timestamp(ts));
-    insert.setLong(5, recordCount);
-
-    insert.executeUpdate();
+    try (PreparedStatement insert = connection.prepareStatement("INSERT INTO 
Snapshots VALUES (?, ?, ?, ?, ?)")) {
+      insert.setString(1, datasetPath);
+      insert.setString(2, snapshotName);
+      insert.setString(3, snapshotPath.toString());
+      insert.setTimestamp(4, new Timestamp(ts));
+      insert.setLong(5, recordCount);
 
+      insert.executeUpdate();
+    }
   }
 
   private void insertDailyPartition(Path dailyPartitionPath) throws Exception {
@@ -159,17 +163,18 @@ public class SqlBasedRetentionPoc {
         DateTimeFormat.forPattern(DAILY_PARTITION_PATTERN).parseDateTime(
             StringUtils.substringAfter(dailyPartitionPath.toString(), "daily" 
+ Path.SEPARATOR));
 
-    PreparedStatement insert = connection.prepareStatement("INSERT INTO 
Daily_Partitions VALUES (?, ?, ?)");
-    insert.setString(1, datasetPath);
-    insert.setString(2, dailyPartitionPath.toString());
-    insert.setTimestamp(3, new Timestamp(partition.getMillis()));
-
-    insert.executeUpdate();
+    try (PreparedStatement insert = connection.prepareStatement("INSERT INTO 
Daily_Partitions VALUES (?, ?, ?)")) {
+      insert.setString(1, datasetPath);
+      insert.setString(2, dailyPartitionPath.toString());
+      insert.setTimestamp(3, new Timestamp(partition.getMillis()));
 
+      insert.executeUpdate();
+    }
   }
 
   private void execute(String query) throws SQLException {
-    PreparedStatement insertStatement = connection.prepareStatement(query);
-    insertStatement.executeUpdate();
+    try (PreparedStatement insertStatement = 
connection.prepareStatement(query)) {
+      insertStatement.executeUpdate();
+    }
   }
 }
diff --git 
a/gobblin-data-management/src/test/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblinDistcpTest.java
 
b/gobblin-data-management/src/test/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblinDistcpTest.java
index 2fefded1f..83491ca5c 100644
--- 
a/gobblin-data-management/src/test/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblinDistcpTest.java
+++ 
b/gobblin-data-management/src/test/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblinDistcpTest.java
@@ -151,10 +151,12 @@ public class EmbeddedGobblinDistcpTest {
     metaStoreClient.tableExists(TARGET_DB, TEST_TABLE);
     FileSystem fs = FileSystem.getLocal(new Configuration());
     fs.exists(new Path(TARGET_PATH));
+
+    statement.close();
   }
 
   // Tearing down the Hive components from derby driver if there's anything 
generated through the test.
-  @AfterClass
+  @AfterClass (alwaysRun = true)
   public void hiveTearDown() throws Exception {
     FileSystem fs = FileSystem.getLocal(new Configuration());
     Path targetPath = new Path(TARGET_PATH);
diff --git 
a/gobblin-metastore/src/test/java/org/apache/gobblin/metastore/testing/TestMetastoreDatabaseFactory.java
 
b/gobblin-metastore/src/test/java/org/apache/gobblin/metastore/testing/TestMetastoreDatabaseFactory.java
index bf5295db2..1d63c86f6 100644
--- 
a/gobblin-metastore/src/test/java/org/apache/gobblin/metastore/testing/TestMetastoreDatabaseFactory.java
+++ 
b/gobblin-metastore/src/test/java/org/apache/gobblin/metastore/testing/TestMetastoreDatabaseFactory.java
@@ -56,7 +56,7 @@ public class TestMetastoreDatabaseFactory {
     public static ITestMetastoreDatabase get(String version, Config dbConfig) 
throws Exception {
         try {
             synchronized (syncObject) {
-                ensureDatabaseExists(dbConfig);
+                ensureDatabaseServerExists(dbConfig);
                 TestMetadataDatabase instance = new 
TestMetadataDatabase(testMetastoreDatabaseServer, version);
                 instances.add(instance);
                 return instance;
@@ -72,14 +72,14 @@ public class TestMetastoreDatabaseFactory {
 
     static void release(ITestMetastoreDatabase instance) throws IOException {
         synchronized (syncObject) {
-            if (instances.remove(instance) && instances.size() == 0) {
+            if (instances.remove(instance) && instances.isEmpty()) {
                 testMetastoreDatabaseServer.close();
                 testMetastoreDatabaseServer = null;
             }
         }
     }
 
-    private static void ensureDatabaseExists(Config dbConfig) throws Exception 
{
+    private static void ensureDatabaseServerExists(Config dbConfig) throws 
Exception {
         if (testMetastoreDatabaseServer == null) {
             try (Mutex ignored = new Mutex()) {
                 if (testMetastoreDatabaseServer == null) {
diff --git 
a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/data/management/conversion/hive/validation/ValidationJob.java
 
b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/data/management/conversion/hive/validation/ValidationJob.java
index 57117cb0c..710a9a8eb 100644
--- 
a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/data/management/conversion/hive/validation/ValidationJob.java
+++ 
b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/data/management/conversion/hive/validation/ValidationJob.java
@@ -16,14 +16,10 @@
  */
 package org.apache.gobblin.data.management.conversion.hive.validation;
 
-import org.apache.gobblin.config.client.ConfigClient;
-import org.apache.gobblin.config.client.api.VersionStabilityPolicy;
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import 
org.apache.gobblin.data.management.conversion.hive.task.HiveConverterUtils;
-import org.apache.gobblin.util.PathUtils;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.net.URI;
+import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
@@ -58,8 +54,6 @@ import org.apache.log4j.Logger;
 import org.joda.time.DateTime;
 import org.slf4j.LoggerFactory;
 
-import azkaban.jobExecutor.AbstractJob;
-
 import com.google.common.base.Charsets;
 import com.google.common.base.Enums;
 import com.google.common.base.Optional;
@@ -74,6 +68,11 @@ import 
com.google.common.util.concurrent.UncheckedExecutionException;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 
+import azkaban.jobExecutor.AbstractJob;
+
+import org.apache.gobblin.config.client.ConfigClient;
+import org.apache.gobblin.config.client.api.VersionStabilityPolicy;
+import org.apache.gobblin.configuration.ConfigurationKeys;
 import 
org.apache.gobblin.data.management.conversion.hive.dataset.ConvertibleHiveDataset;
 import 
org.apache.gobblin.data.management.conversion.hive.dataset.ConvertibleHiveDatasetFinder;
 import 
org.apache.gobblin.data.management.conversion.hive.events.EventConstants;
@@ -82,12 +81,12 @@ import 
org.apache.gobblin.data.management.conversion.hive.provider.UpdateNotFoun
 import 
org.apache.gobblin.data.management.conversion.hive.provider.UpdateProviderFactory;
 import 
org.apache.gobblin.data.management.conversion.hive.query.HiveValidationQueryGenerator;
 import org.apache.gobblin.data.management.conversion.hive.source.HiveSource;
+import 
org.apache.gobblin.data.management.conversion.hive.task.HiveConverterUtils;
 import org.apache.gobblin.data.management.copy.hive.HiveDataset;
 import org.apache.gobblin.data.management.copy.hive.HiveDatasetFinder;
 import org.apache.gobblin.data.management.copy.hive.HiveUtils;
 import org.apache.gobblin.hive.HiveMetastoreClientPool;
 import org.apache.gobblin.hive.HiveSerDeWrapper;
-import org.apache.gobblin.util.HiveJdbcConnector;
 import org.apache.gobblin.instrumented.Instrumented;
 import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.metrics.event.EventSubmitter;
@@ -95,6 +94,8 @@ import org.apache.gobblin.util.AutoReturnableObject;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.ExecutorsUtils;
 import org.apache.gobblin.util.HadoopUtils;
+import org.apache.gobblin.util.HiveJdbcConnector;
+import org.apache.gobblin.util.PathUtils;
 
 
 /**
@@ -490,25 +491,23 @@ public class ValidationJob extends AbstractJob {
    */
   @SuppressWarnings("unused")
   private List<Long> getValidationOutputFromHiveJdbc(List<String> queries) 
throws IOException {
-    if (null == queries || queries.size() == 0) {
+    if (null == queries || queries.isEmpty()) {
       log.warn("No queries specified to be executed");
       return Collections.emptyList();
     }
-    Statement statement = null;
     List<Long> rowCounts = Lists.newArrayList();
-    Closer closer = Closer.create();
-
-    try {
-      HiveJdbcConnector hiveJdbcConnector = 
HiveJdbcConnector.newConnectorWithProps(props);
-      statement = hiveJdbcConnector.getConnection().createStatement();
 
+    try (HiveJdbcConnector hiveJdbcConnector = 
HiveJdbcConnector.newConnectorWithProps(props);
+        Connection connection = hiveJdbcConnector.getConnection();
+        Statement statement = connection.createStatement()){
       for (String query : queries) {
         log.info("Executing query: " + query);
         boolean result = statement.execute(query);
         if (result) {
-          ResultSet resultSet = statement.getResultSet();
-          if (resultSet.next()) {
-            rowCounts.add(resultSet.getLong(1));
+          try (ResultSet resultSet = statement.getResultSet()) {
+            if (resultSet.next()) {
+              rowCounts.add(resultSet.getLong(1));
+            }
           }
         } else {
           log.warn("Query output for: " + query + " : " + result);
@@ -517,19 +516,6 @@ public class ValidationJob extends AbstractJob {
 
     } catch (SQLException e) {
       throw new RuntimeException(e);
-    } finally {
-      try {
-        closer.close();
-      } catch (Exception e) {
-        log.warn("Could not close HiveJdbcConnector", e);
-      }
-      if (null != statement) {
-        try {
-          statement.close();
-        } catch (SQLException e) {
-          log.warn("Could not close Hive statement", e);
-        }
-      }
     }
 
     return rowCounts;
diff --git 
a/gobblin-modules/gobblin-compliance/src/main/java/org/apache/gobblin/compliance/HiveProxyQueryExecutor.java
 
b/gobblin-modules/gobblin-compliance/src/main/java/org/apache/gobblin/compliance/HiveProxyQueryExecutor.java
index 02e90adc9..1a7f5fbda 100644
--- 
a/gobblin-modules/gobblin-compliance/src/main/java/org/apache/gobblin/compliance/HiveProxyQueryExecutor.java
+++ 
b/gobblin-modules/gobblin-compliance/src/main/java/org/apache/gobblin/compliance/HiveProxyQueryExecutor.java
@@ -28,7 +28,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hive.jdbc.HiveConnection;
 import org.apache.thrift.TException;
@@ -118,12 +117,9 @@ public class HiveProxyQueryExecutor implements 
QueryExecutor, Closeable {
     String realm = this.state.getProp(ConfigurationKeys.KERBEROS_REALM);
     UserGroupInformation loginUser = UserGroupInformation
         
.loginUserFromKeytabAndReturnUGI(HostUtils.getPrincipalUsingHostname(superUser, 
realm), keytabLocation);
-    loginUser.doAs(new PrivilegedExceptionAction<Void>() {
-      @Override
-      public Void run()
-          throws MetaException, SQLException, ClassNotFoundException {
-        for (String proxy : proxies) {
-          HiveConnection hiveConnection = 
getHiveConnection(Optional.fromNullable(proxy));
+    loginUser.doAs((PrivilegedExceptionAction<Void>) () -> {
+      for (String proxy : proxies) {
+        try (HiveConnection hiveConnection = 
getHiveConnection(Optional.fromNullable(proxy))) {
           Statement statement = hiveConnection.createStatement();
           statementMap.put(proxy, statement);
           connectionMap.put(proxy, hiveConnection);
@@ -131,8 +127,8 @@ public class HiveProxyQueryExecutor implements 
QueryExecutor, Closeable {
             statement.execute(setting);
           }
         }
-        return null;
       }
+      return null;
     });
   }
 
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
index 4307659b4..c6183cc52 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
@@ -30,8 +30,8 @@ import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 
 import javax.inject.Inject;
-
 import lombok.extern.slf4j.Slf4j;
+
 import org.apache.gobblin.service.ExecutionStatus;
 
 
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
index b9861ede3..63327b291 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
@@ -242,7 +242,7 @@ public class GobblinServiceManagerTest {
     }
   }
 
-  @AfterClass
+  @AfterClass (alwaysRun = true)
   public void cleanUp() throws Exception {
     // Shutdown Service
     try {
@@ -405,7 +405,7 @@ public class GobblinServiceManagerTest {
     }
   }
 
-  @Test (dependsOnMethods = "testUncompilableJob")
+  @Test (dependsOnMethods = "testRunQuotaExceeds")
   public void testExplainJob() throws Exception {
     int sizeBeforeTest = 
this.gobblinServiceManager.getFlowCatalog().getSpecs().size();
     FlowId flowId = createFlowIdWithUniqueName(TEST_GROUP_NAME);
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManagerTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManagerTest.java
index f3874bbdb..c040e5178 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManagerTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManagerTest.java
@@ -41,6 +41,7 @@ public class MysqlUserQuotaManagerTest {
   private ITestMetastoreDatabase testDb;
   private MysqlUserQuotaManager quotaManager;
   public static int INCREMENTS = 1000;
+  Connection connection;
 
   @BeforeClass
   public void setUp() throws Exception {
@@ -54,18 +55,21 @@ public class MysqlUserQuotaManagerTest {
         .build();
 
     this.quotaManager = new MysqlUserQuotaManager(config);
+    this.connection = this.quotaManager.quotaStore.dataSource.getConnection();
   }
 
   @AfterClass(alwaysRun = true)
-  public void tearDown() throws IOException {
+  public void tearDown() throws IOException, SQLException {
     // `.close()` to avoid (in the aggregate, across multiple suites) - 
java.sql.SQLNonTransientConnectionException: Too many connections
+    if (this.connection != null) {
+      this.connection.close();
+    }
     this.testDb.close();
   }
 
   @Test
   public void testRunningDagStore() throws Exception {
     String dagId = 
DagManagerUtils.generateDagId(DagManagerTest.buildDag("dagId", 1234L, "", 
1).getNodes().get(0)).toString();
-    Connection connection = 
this.quotaManager.quotaStore.dataSource.getConnection();
     Assert.assertFalse(this.quotaManager.containsDagId(dagId));
     this.quotaManager.addDagId(connection, dagId);
     connection.commit();
@@ -75,12 +79,10 @@ public class MysqlUserQuotaManagerTest {
     Assert.assertFalse(this.quotaManager.containsDagId(dagId));
     Assert.assertFalse(this.quotaManager.removeDagId(connection, dagId));
     connection.commit();
-    connection.close();
   }
 
     @Test
   public void testIncreaseCount() throws Exception {
-    Connection connection = 
this.quotaManager.quotaStore.dataSource.getConnection();
     int prevCount = this.quotaManager.incrementJobCount(connection, 
PROXY_USER, AbstractUserQuotaManager.CountType.USER_COUNT);
     connection.commit();
     Assert.assertEquals(prevCount, 0);
@@ -97,12 +99,10 @@ public class MysqlUserQuotaManagerTest {
     prevCount = this.quotaManager.incrementJobCount(connection, PROXY_USER, 
AbstractUserQuotaManager.CountType.FLOWGROUP_COUNT);
     connection.commit();
     Assert.assertEquals(prevCount, 1);
-    connection.close();
   }
 
   @Test(dependsOnMethods = "testIncreaseCount")
   public void testDecreaseCount() throws Exception {
-    Connection connection = 
this.quotaManager.quotaStore.dataSource.getConnection();
     this.quotaManager.decrementJobCount(connection, PROXY_USER, 
AbstractUserQuotaManager.CountType.USER_COUNT);
     connection.commit();
     Assert.assertEquals(this.quotaManager.getCount(PROXY_USER, 
AbstractUserQuotaManager.CountType.USER_COUNT), 1);
@@ -135,7 +135,7 @@ public class MysqlUserQuotaManagerTest {
     public void run() {
       int i = 0;
       while (i++ < INCREMENTS) {
-        try (Connection connection = 
MysqlUserQuotaManagerTest.this.quotaManager.quotaStore.dataSource.getConnection();)
 {
+        try (Connection connection = 
MysqlUserQuotaManagerTest.this.quotaManager.quotaStore.dataSource.getConnection())
 {
           if (increaseOrDecrease) {
             
MysqlUserQuotaManagerTest.this.quotaManager.incrementJobCount(connection, 
PROXY_USER, AbstractUserQuotaManager.CountType.USER_COUNT);
           } else {
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
index c264bbfec..dfc7d3a83 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
@@ -333,7 +333,6 @@ public class GobblinServiceJobSchedulerTest {
     FlowCatalog flowCatalog = new 
FlowCatalog(ConfigUtils.propertiesToConfig(properties));
     ServiceBasedAppLauncher serviceLauncher = new 
ServiceBasedAppLauncher(properties, "GaaSJobSchedulerTest");
 
-
     serviceLauncher.addService(flowCatalog);
     serviceLauncher.start();
 
@@ -384,7 +383,8 @@ public class GobblinServiceJobSchedulerTest {
     
Assert.assertEquals(schedulerWithWarmStandbyEnabled.scheduledFlowSpecs.size(), 
1);
     schedulerWithWarmStandbyEnabled.onAddSpec(flowSpec1);
     // Second flow should be added to scheduled flows since no quota check in 
this case
-    
Assert.assertEquals(schedulerWithWarmStandbyEnabled.scheduledFlowSpecs.size(), 
2);
+    
AssertWithBackoff.create().maxSleepMs(200L).timeoutMs(5000L).backoffFactor(1)
+        .assertTrue(input -> 
schedulerWithWarmStandbyEnabled.scheduledFlowSpecs.size() == 2, "Waiting for 
add spec to complete");
     // set scheduler to be inactive and unschedule flows
     schedulerWithWarmStandbyEnabled.setActive(false);
     
Assert.assertEquals(schedulerWithWarmStandbyEnabled.scheduledFlowSpecs.size(), 
0);
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsFlowGraphMonitorTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsFlowGraphMonitorTest.java
index 1b3397756..ea4eaf17c 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsFlowGraphMonitorTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsFlowGraphMonitorTest.java
@@ -17,14 +17,6 @@
 
 package org.apache.gobblin.service.monitoring;
 
-import com.google.common.base.Charsets;
-import com.google.common.base.Joiner;
-import com.google.common.base.Optional;
-import com.google.common.io.Files;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import com.typesafe.config.ConfigValueFactory;
-
 import java.io.File;
 import java.io.IOException;
 import java.net.URI;
@@ -37,6 +29,23 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.fs.Path;
+import org.eclipse.jgit.transport.RefSpec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
+import com.google.common.io.Files;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
 import org.apache.gobblin.config.ConfigBuilder;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.runtime.api.TopologySpec;
@@ -49,14 +58,7 @@ import org.apache.gobblin.service.modules.flowgraph.FlowEdge;
 import org.apache.gobblin.service.modules.flowgraph.FlowGraph;
 import org.apache.gobblin.service.modules.flowgraph.FlowGraphConfigurationKeys;
 import 
org.apache.gobblin.service.modules.template_catalog.UpdatableFSFlowTemplateCatalog;
-import org.apache.hadoop.fs.Path;
-import org.eclipse.jgit.transport.RefSpec;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
+import org.apache.gobblin.testing.AssertWithBackoff;
 
 
 public class FsFlowGraphMonitorTest {
@@ -239,9 +241,8 @@ public class FsFlowGraphMonitorTest {
 
     //If deleting all the templates, the cache of flow templates will be 
cleared and the flowgraph will be unable to add edges on reload.
     cleanUpDir(this.flowTemplateCatalogFolder.getAbsolutePath());
-    Thread.sleep(3000);
-    Assert.assertEquals(this.flowGraph.get().getEdges("node1").size(), 0);
-
+    
AssertWithBackoff.create().maxSleepMs(200L).timeoutMs(10000L).backoffFactor(1)
+        .assertTrue(input -> this.flowGraph.get().getEdges("node1").isEmpty(), 
"Waiting for flowgraph to become empty");
     URI flowTemplateCatalogUri = 
this.getClass().getClassLoader().getResource("template_catalog").toURI();
     // Adding the flowtemplates back will make the edges eligible to be added 
again on reload.
     FileUtils.copyDirectory(new File(flowTemplateCatalogUri.getPath()), 
this.flowTemplateCatalogFolder);
diff --git 
a/gobblin-tunnel/src/test/java/org/apache/gobblin/tunnel/TestTunnelWithArbitraryTCPTraffic.java
 
b/gobblin-tunnel/src/test/java/org/apache/gobblin/tunnel/TestTunnelWithArbitraryTCPTraffic.java
index cd4fb7a3f..667a39857 100644
--- 
a/gobblin-tunnel/src/test/java/org/apache/gobblin/tunnel/TestTunnelWithArbitraryTCPTraffic.java
+++ 
b/gobblin-tunnel/src/test/java/org/apache/gobblin/tunnel/TestTunnelWithArbitraryTCPTraffic.java
@@ -17,11 +17,6 @@
 
 package org.apache.gobblin.tunnel;
 
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertNotEquals;
-import static org.testng.Assert.assertTrue;
-
 import java.io.BufferedOutputStream;
 import java.io.BufferedReader;
 import java.io.IOException;
@@ -36,6 +31,7 @@ import java.security.NoSuchAlgorithmException;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.ResultSet;
+import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -49,6 +45,11 @@ import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotEquals;
+import static org.testng.Assert.assertTrue;
+
 /**
  * Tests for Tunnel with arbitrary TCP traffic.
  *
@@ -594,15 +595,15 @@ public class TestTunnelWithArbitraryTCPTraffic {
     MockServer proxyServer = startConnectProxyServer();
     Tunnel tunnel = Tunnel.build("useastdb.ensembl.org", 5306,
         "localhost", proxyServer.getServerSocketPort());
+    int port = tunnel.getPort();
 
-    try {
-      int port = tunnel.getPort();
+    try (Connection connection =
+        DriverManager.getConnection("jdbc:mysql://localhost:" + port + 
"/homo_sapiens_core_82_38?user=anonymous");
+        Statement statement = connection.createStatement()) {
 
-      Connection connection =
-          DriverManager.getConnection("jdbc:mysql://localhost:" + port + 
"/homo_sapiens_core_82_38?user=anonymous");
       String query2 = "SELECT DISTINCT gene_id, biotype, source, description 
from gene LIMIT 1000";
 
-      ResultSet resultSet = connection.createStatement().executeQuery(query2);
+      ResultSet resultSet = statement.executeQuery(query2);
 
       int row = 0;
 

Reply via email to