This is an automated email from the ASF dual-hosted git repository. lcwik pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 47a6114 [BEAM-9629] Fix several connection leak issues. (#12209) 47a6114 is described below commit 47a611423053dc50bd55a156a5f58c040c9ab1cd Author: Lukasz Cwik <lukec...@gmail.com> AuthorDate: Fri Jul 10 10:44:13 2020 -0700 [BEAM-9629] Fix several connection leak issues. (#12209) * Make the default maximum number of connections unlimited. This mirrors the behavior of 2.17 and earlier, since we used to create one DataSource per execution thread (and hence one connection). * Move connection fetching to @ProcessElement so that no connection is held for empty bundles. * Make sure that finalize() closes the connection so that it is returned to the pool if the bundle fails. --- .../java/org/apache/beam/sdk/io/jdbc/JdbcIO.java | 69 ++++++++++++++++------ 1 file changed, 51 insertions(+), 18 deletions(-) diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java index f52ff40..d03925e 100644 --- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java @@ -147,10 +147,10 @@ import org.slf4j.LoggerFactory; * ); * }</pre> * - * By default, the provided function instantiates a DataSource per execution thread. In some - * circumstances, such as DataSources that have a pool of connections, this can quickly overwhelm - * the database by requesting too many connections. In that case you should make the DataSource a - * static singleton so it gets instantiated only once per JVM. + * <p>By default, the provided function requests a DataSource per execution thread. In some + * circumstances this can quickly overwhelm the database by requesting too many connections. In that + * case you should look into sharing a single instance of a {@link PoolingDataSource} across all the + * execution threads.
* * <h3>Writing to JDBC datasource</h3> * @@ -883,11 +883,14 @@ public class JdbcIO { @Setup public void setup() throws Exception { dataSource = dataSourceProviderFn.apply(null); - connection = dataSource.getConnection(); } @ProcessElement public void processElement(ProcessContext context) throws Exception { + // Only acquire the connection if we need to perform a read. + if (connection == null) { + connection = dataSource.getConnection(); + } try (PreparedStatement statement = connection.prepareStatement( query.get(), ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) { @@ -901,9 +904,24 @@ public class JdbcIO { } } - @Teardown - public void teardown() throws Exception { - connection.close(); + @FinishBundle + public void finishBundle() throws Exception { + cleanUpConnection(); + } + + @Override + protected void finalize() throws Throwable { + cleanUpConnection(); + } + + private void cleanUpConnection() throws Exception { + if (connection != null) { + try { + connection.close(); + } finally { + connection = null; + } + } } } @@ -1354,13 +1372,6 @@ public class JdbcIO { .withMaxRetries(retryConfiguration.getMaxAttempts()); } - @StartBundle - public void startBundle() throws Exception { - connection = dataSource.getConnection(); - connection.setAutoCommit(false); - preparedStatement = connection.prepareStatement(spec.getStatement().get()); - } - @ProcessElement public void processElement(ProcessContext context) throws Exception { T record = context.element(); @@ -1385,13 +1396,30 @@ public class JdbcIO { @FinishBundle public void finishBundle() throws Exception { executeBatch(); + cleanUpStatementAndConnection(); + } + + @Override + protected void finalize() throws Throwable { + cleanUpStatementAndConnection(); + } + + private void cleanUpStatementAndConnection() throws Exception { try { if (preparedStatement != null) { - preparedStatement.close(); + try { + preparedStatement.close(); + } finally { + preparedStatement = null; + } } } finally { if (connection 
!= null) { - connection.close(); + try { + connection.close(); + } finally { + connection = null; + } } } } @@ -1400,6 +1428,12 @@ public class JdbcIO { if (records.isEmpty()) { return; } + // Only acquire the connection if there is something to write. + if (connection == null) { + connection = dataSource.getConnection(); + connection.setAutoCommit(false); + preparedStatement = connection.prepareStatement(spec.getStatement().get()); + } Sleeper sleeper = Sleeper.DEFAULT; BackOff backoff = retryBackOff.backoff(); while (true) { @@ -1498,7 +1532,6 @@ public class JdbcIO { PoolableConnectionFactory poolableConnectionFactory = new PoolableConnectionFactory(connectionFactory, null); GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig(); - poolConfig.setMaxTotal(1); poolConfig.setMinIdle(0); poolConfig.setMinEvictableIdleTimeMillis(10000); poolConfig.setSoftMinEvictableIdleTimeMillis(30000);