This is an automated email from the ASF dual-hosted git repository.
aromanenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new bc9aa73 [BEAM-7041] Let the user control if he wants to wrap the
provided DataSource as a poolable one or not
new 702df1b Merge pull request #8257: [BEAM-7041] Let the user control if
he wants to wrap the provided DataSource as a poolable one or not
bc9aa73 is described below
commit bc9aa730009909d9c632fce669bff5ce25d9d81a
Author: Jean-Baptiste Onofré <[email protected]>
AuthorDate: Tue Apr 9 17:15:21 2019 +0200
[BEAM-7041] Let the user control if he wants to wrap the provided
DataSource as a poolable one or not
---
.../java/org/apache/beam/sdk/io/jdbc/JdbcIO.java | 45 ++++++++++++++--------
.../org/apache/beam/sdk/io/jdbc/JdbcIOTest.java | 14 +++++++
2 files changed, 44 insertions(+), 15 deletions(-)
diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
index e6f2699..8c824a8 100644
--- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
+++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
@@ -242,6 +242,8 @@ public class JdbcIO {
@Nullable
abstract DataSource getDataSource();
+ abstract boolean isPoolingDataSource();
+
abstract Builder builder();
@AutoValue.Builder
@@ -258,14 +260,22 @@ public class JdbcIO {
abstract Builder setDataSource(DataSource dataSource);
+ abstract Builder setPoolingDataSource(boolean poolingDataSource);
+
abstract DataSourceConfiguration build();
}
public static DataSourceConfiguration create(DataSource dataSource) {
+ return create(dataSource, true);
+ }
+
+ public static DataSourceConfiguration create(
+ DataSource dataSource, boolean isPoolingDataSource) {
checkArgument(dataSource != null, "dataSource can not be null");
checkArgument(dataSource instanceof Serializable, "dataSource must be Serializable");
return new AutoValue_JdbcIO_DataSourceConfiguration.Builder()
.setDataSource(dataSource)
+ .setPoolingDataSource(isPoolingDataSource)
.build();
}
@@ -284,6 +294,7 @@ public class JdbcIO {
return new AutoValue_JdbcIO_DataSourceConfiguration.Builder()
.setDriverClassName(driverClassName)
.setUrl(url)
+ .setPoolingDataSource(true)
.build();
}
@@ -356,21 +367,25 @@ public class JdbcIO {
current = basicDataSource;
}
- // wrapping the datasource as a pooling datasource
- DataSourceConnectionFactory connectionFactory = new DataSourceConnectionFactory(current);
- PoolableConnectionFactory poolableConnectionFactory =
- new PoolableConnectionFactory(connectionFactory, null);
- GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
- poolConfig.setMaxTotal(1);
- poolConfig.setMinIdle(0);
- poolConfig.setMinEvictableIdleTimeMillis(10000);
- poolConfig.setSoftMinEvictableIdleTimeMillis(30000);
- GenericObjectPool connectionPool =
- new GenericObjectPool(poolableConnectionFactory, poolConfig);
- poolableConnectionFactory.setPool(connectionPool);
- poolableConnectionFactory.setDefaultAutoCommit(false);
- poolableConnectionFactory.setDefaultReadOnly(false);
- return new PoolingDataSource(connectionPool);
+ if (isPoolingDataSource()) {
+ // wrapping the datasource as a pooling datasource
+ DataSourceConnectionFactory connectionFactory = new DataSourceConnectionFactory(current);
+ PoolableConnectionFactory poolableConnectionFactory =
+ new PoolableConnectionFactory(connectionFactory, null);
+ GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
+ poolConfig.setMaxTotal(1);
+ poolConfig.setMinIdle(0);
+ poolConfig.setMinEvictableIdleTimeMillis(10000);
+ poolConfig.setSoftMinEvictableIdleTimeMillis(30000);
+ GenericObjectPool connectionPool =
+ new GenericObjectPool(poolableConnectionFactory, poolConfig);
+ poolableConnectionFactory.setPool(connectionPool);
+ poolableConnectionFactory.setDefaultAutoCommit(false);
+ poolableConnectionFactory.setDefaultReadOnly(false);
+ return new PoolingDataSource(connectionPool);
+ } else {
+ return current;
+ }
}
}
diff --git a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
index 0e9127a..3e45363 100644
--- a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
+++ b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
@@ -45,6 +45,7 @@ import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
+import org.apache.commons.dbcp2.PoolingDataSource;
import org.apache.derby.drda.NetworkServerControl;
import org.apache.derby.jdbc.ClientDataSource;
import org.junit.After;
@@ -143,6 +144,19 @@ public class JdbcIOTest implements Serializable {
}
@Test
+ public void testDataSourceConfigurationDataSourceWithoutPool() throws Exception {
+ JdbcIO.DataSourceConfiguration config =
+ JdbcIO.DataSourceConfiguration.create(dataSource, false);
+ assertTrue(config.buildDatasource() instanceof ClientDataSource);
+ }
+
+ @Test
+ public void testDataSourceConfigurationDataSourceWithPool() throws Exception {
+ JdbcIO.DataSourceConfiguration config = JdbcIO.DataSourceConfiguration.create(dataSource, true);
+ assertTrue(config.buildDatasource() instanceof PoolingDataSource);
+ }
+
+ @Test
public void testDataSourceConfigurationDriverAndUrl() throws Exception {
JdbcIO.DataSourceConfiguration config =
JdbcIO.DataSourceConfiguration.create(