This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 47a6114  [BEAM-9629] Fix several connection leak issues. (#12209)
47a6114 is described below

commit 47a611423053dc50bd55a156a5f58c040c9ab1cd
Author: Lukasz Cwik <lukec...@gmail.com>
AuthorDate: Fri Jul 10 10:44:13 2020 -0700

    [BEAM-9629] Fix several connection leak issues. (#12209)
    
    * Make the default max connections to be unlimited. This mirrors the 2.17 and earlier behavior since we used to create one DataSource per execution thread (and hence one connection).
    * Move connection fetching to @ProcessElement to not hold a connection for empty bundles.
    * Make sure that finalize() closes the connection so that it is returned to the pool if the bundle fails.
---
 .../java/org/apache/beam/sdk/io/jdbc/JdbcIO.java   | 69 ++++++++++++++++------
 1 file changed, 51 insertions(+), 18 deletions(-)

diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
index f52ff40..d03925e 100644
--- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
+++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
@@ -147,10 +147,10 @@ import org.slf4j.LoggerFactory;
  * );
  * }</pre>
  *
- * By default, the provided function instantiates a DataSource per execution thread. In some
- * circumstances, such as DataSources that have a pool of connections, this can quickly overwhelm
- * the database by requesting too many connections. In that case you should make the DataSource a
- * static singleton so it gets instantiated only once per JVM.
+ * <p>By default, the provided function requests a DataSource per execution thread. In some
+ * circumstances this can quickly overwhelm the database by requesting too many connections. In that
+ * case you should look into sharing a single instance of a {@link PoolingDataSource} across all the
+ * execution threads.
  *
  * <h3>Writing to JDBC datasource</h3>
  *
@@ -883,11 +883,14 @@ public class JdbcIO {
     @Setup
     public void setup() throws Exception {
       dataSource = dataSourceProviderFn.apply(null);
-      connection = dataSource.getConnection();
     }
 
     @ProcessElement
     public void processElement(ProcessContext context) throws Exception {
+      // Only acquire the connection if we need to perform a read.
+      if (connection == null) {
+        connection = dataSource.getConnection();
+      }
       try (PreparedStatement statement =
           connection.prepareStatement(
               query.get(), ResultSet.TYPE_FORWARD_ONLY, 
ResultSet.CONCUR_READ_ONLY)) {
@@ -901,9 +904,24 @@ public class JdbcIO {
       }
     }
 
-    @Teardown
-    public void teardown() throws Exception {
-      connection.close();
+    @FinishBundle
+    public void finishBundle() throws Exception {
+      cleanUpConnection();
+    }
+
+    @Override
+    protected void finalize() throws Throwable {
+      cleanUpConnection();
+    }
+
+    private void cleanUpConnection() throws Exception {
+      if (connection != null) {
+        try {
+          connection.close();
+        } finally {
+          connection = null;
+        }
+      }
     }
   }
 
@@ -1354,13 +1372,6 @@ public class JdbcIO {
                 .withMaxRetries(retryConfiguration.getMaxAttempts());
       }
 
-      @StartBundle
-      public void startBundle() throws Exception {
-        connection = dataSource.getConnection();
-        connection.setAutoCommit(false);
-        preparedStatement = 
connection.prepareStatement(spec.getStatement().get());
-      }
-
       @ProcessElement
       public void processElement(ProcessContext context) throws Exception {
         T record = context.element();
@@ -1385,13 +1396,30 @@ public class JdbcIO {
       @FinishBundle
       public void finishBundle() throws Exception {
         executeBatch();
+        cleanUpStatementAndConnection();
+      }
+
+      @Override
+      protected void finalize() throws Throwable {
+        cleanUpStatementAndConnection();
+      }
+
+      private void cleanUpStatementAndConnection() throws Exception {
         try {
           if (preparedStatement != null) {
-            preparedStatement.close();
+            try {
+              preparedStatement.close();
+            } finally {
+              preparedStatement = null;
+            }
           }
         } finally {
           if (connection != null) {
-            connection.close();
+            try {
+              connection.close();
+            } finally {
+              connection = null;
+            }
           }
         }
       }
@@ -1400,6 +1428,12 @@ public class JdbcIO {
         if (records.isEmpty()) {
           return;
         }
+        // Only acquire the connection if there is something to write.
+        if (connection == null) {
+          connection = dataSource.getConnection();
+          connection.setAutoCommit(false);
+          preparedStatement = 
connection.prepareStatement(spec.getStatement().get());
+        }
         Sleeper sleeper = Sleeper.DEFAULT;
         BackOff backoff = retryBackOff.backoff();
         while (true) {
@@ -1498,7 +1532,6 @@ public class JdbcIO {
             PoolableConnectionFactory poolableConnectionFactory =
                 new PoolableConnectionFactory(connectionFactory, null);
             GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
-            poolConfig.setMaxTotal(1);
             poolConfig.setMinIdle(0);
             poolConfig.setMinEvictableIdleTimeMillis(10000);
             poolConfig.setSoftMinEvictableIdleTimeMillis(30000);

Reply via email to