[ 
https://issues.apache.org/jira/browse/BEAM-3500?focusedWorklogId=83767&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-83767
 ]

ASF GitHub Bot logged work on BEAM-3500:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 23/Mar/18 19:07
            Start Date: 23/Mar/18 19:07
    Worklog Time Spent: 10m 
      Work Description: jkff closed pull request #4461: [BEAM-3500] "Attach" 
JDBC connection to the bundle and add DataSourceFactory allowing full control 
of the way the DataSource is created
URL: https://github.com/apache/beam/pull/4461
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/sdks/java/io/jdbc/pom.xml b/sdks/java/io/jdbc/pom.xml
index 9151cfb80ae..577d4f55a74 100644
--- a/sdks/java/io/jdbc/pom.xml
+++ b/sdks/java/io/jdbc/pom.xml
@@ -279,6 +279,11 @@
       <artifactId>commons-dbcp2</artifactId>
       <version>2.1.1</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-pool2</artifactId>
+      <version>2.4.2</version>
+    </dependency>
 
     <dependency>
       <groupId>joda-time</groupId>
diff --git 
a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java 
b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
index f7a66045886..35e4aacd5ac 100644
--- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
+++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
@@ -52,6 +52,11 @@
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PDone;
 import org.apache.commons.dbcp2.BasicDataSource;
+import org.apache.commons.dbcp2.DataSourceConnectionFactory;
+import org.apache.commons.dbcp2.PoolableConnectionFactory;
+import org.apache.commons.dbcp2.PoolingDataSource;
+import org.apache.commons.pool2.impl.GenericObjectPool;
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
 import org.joda.time.Duration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -235,17 +240,17 @@ public static DataSourceConfiguration create(DataSource 
dataSource) {
       checkArgument(dataSource != null, "dataSource can not be null");
       checkArgument(dataSource instanceof Serializable, "dataSource must be 
Serializable");
       return new AutoValue_JdbcIO_DataSourceConfiguration.Builder()
-          .setDataSource(dataSource)
-          .build();
+              .setDataSource(dataSource)
+              .build();
     }
 
     public static DataSourceConfiguration create(String driverClassName, 
String url) {
       checkArgument(driverClassName != null, "driverClassName can not be 
null");
       checkArgument(url != null, "url can not be null");
       return new AutoValue_JdbcIO_DataSourceConfiguration.Builder()
-          
.setDriverClassName(ValueProvider.StaticValueProvider.of(driverClassName))
-          .setUrl(ValueProvider.StaticValueProvider.of(url))
-          .build();
+              
.setDriverClassName(ValueProvider.StaticValueProvider.of(driverClassName))
+              .setUrl(ValueProvider.StaticValueProvider.of(url))
+              .build();
     }
 
     public static DataSourceConfiguration create(ValueProvider<String> 
driverClassName,
@@ -254,8 +259,7 @@ public static DataSourceConfiguration 
create(ValueProvider<String> driverClassNa
       checkArgument(url != null, "url can not be null");
       return new AutoValue_JdbcIO_DataSourceConfiguration.Builder()
               .setDriverClassName(driverClassName)
-              .setUrl(url)
-              .build();
+              .setUrl(url).build();
     }
 
     public DataSourceConfiguration withUsername(String username) {
@@ -307,9 +311,10 @@ private void populateDisplayData(DisplayData.Builder 
builder) {
       }
     }
 
-    DataSource buildDatasource() throws Exception{
+    DataSource buildDatasource() throws Exception {
+      DataSource current = null;
       if (getDataSource() != null) {
-        return getDataSource();
+        current = getDataSource();
       } else {
         BasicDataSource basicDataSource = new BasicDataSource();
         if (getDriverClassName() != null) {
@@ -327,8 +332,25 @@ DataSource buildDatasource() throws Exception{
         if (getConnectionProperties() != null && 
getConnectionProperties().get() != null) {
           
basicDataSource.setConnectionProperties(getConnectionProperties().get());
         }
-        return basicDataSource;
+        current = basicDataSource;
       }
+
+      // wrapping the datasource as a pooling datasource
+      DataSourceConnectionFactory connectionFactory = new 
DataSourceConnectionFactory(current);
+      PoolableConnectionFactory poolableConnectionFactory =
+              new PoolableConnectionFactory(connectionFactory, null);
+      GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
+      poolConfig.setMaxTotal(1);
+      poolConfig.setMinIdle(0);
+      poolConfig.setMinEvictableIdleTimeMillis(10000);
+      poolConfig.setSoftMinEvictableIdleTimeMillis(30000);
+      GenericObjectPool connectionPool =
+              new GenericObjectPool(poolableConnectionFactory, poolConfig);
+      poolableConnectionFactory.setPool(connectionPool);
+      poolableConnectionFactory.setDefaultAutoCommit(false);
+      poolableConnectionFactory.setDefaultReadOnly(false);
+      PoolingDataSource poolingDataSource = new 
PoolingDataSource(connectionPool);
+      return poolingDataSource;
     }
 
   }
@@ -364,7 +386,6 @@ DataSource buildDatasource() throws Exception{
     }
 
     public Read<T> withDataSourceConfiguration(DataSourceConfiguration 
configuration) {
-      checkArgument(configuration != null, "configuration can not be null");
       return toBuilder().setDataSourceConfiguration(configuration).build();
     }
 
@@ -398,8 +419,8 @@ DataSource buildDatasource() throws Exception{
       checkArgument(getQuery() != null, "withQuery() is required");
       checkArgument(getRowMapper() != null, "withRowMapper() is required");
       checkArgument(getCoder() != null, "withCoder() is required");
-      checkArgument(
-          getDataSourceConfiguration() != null, "withDataSourceConfiguration() 
is required");
+      checkArgument((getDataSourceConfiguration() != null),
+              "withDataSourceConfiguration() is required");
 
       return input
           .apply(Create.of((Void) null))
@@ -455,8 +476,6 @@ public void populateDisplayData(DisplayData.Builder 
builder) {
 
     public ReadAll<ParameterT, OutputT> withDataSourceConfiguration(
             DataSourceConfiguration configuration) {
-      checkArgument(configuration != null, 
"JdbcIO.readAll().withDataSourceConfiguration"
-              + "(configuration) called with null configuration");
       return toBuilder().setDataSourceConfiguration(configuration).build();
     }
 
@@ -635,8 +654,8 @@ public void teardown() throws Exception {
 
     @Override
     public PDone expand(PCollection<T> input) {
-      checkArgument(
-          getDataSourceConfiguration() != null, "withDataSourceConfiguration() 
is required");
+      checkArgument(getDataSourceConfiguration() != null,
+              "withDataSourceConfiguration() is required");
       checkArgument(getStatement() != null, "withStatement() is required");
       checkArgument(
           getPreparedStatementSetter() != null, "withPreparedStatementSetter() 
is required");
@@ -656,6 +675,7 @@ public PDone expand(PCollection<T> input) {
 
       private DataSource dataSource;
       private Connection connection;
+      private PreparedStatement preparedStatement;
       private List<T> records = new ArrayList<>();
 
       public WriteFn(Write<T> spec) {
@@ -665,8 +685,13 @@ public WriteFn(Write<T> spec) {
       @Setup
       public void setup() throws Exception {
         dataSource = spec.getDataSourceConfiguration().buildDatasource();
+      }
+
+      @StartBundle
+      public void startBundle() throws Exception {
         connection = dataSource.getConnection();
         connection.setAutoCommit(false);
+        preparedStatement = connection.prepareStatement(spec.getStatement());
       }
 
       @ProcessElement
@@ -693,6 +718,15 @@ private void processRecord(T record, PreparedStatement 
preparedStatement) {
       @FinishBundle
       public void finishBundle() throws Exception {
         executeBatch();
+        try {
+          if (preparedStatement != null) {
+            preparedStatement.close();
+          }
+        } finally {
+          if (connection != null) {
+            connection.close();
+          }
+        }
       }
 
       private void executeBatch() throws SQLException, IOException, 
InterruptedException {
@@ -734,9 +768,6 @@ private void executeBatch() throws SQLException, 
IOException, InterruptedExcepti
 
       @Teardown
       public void teardown() throws Exception {
-        if (connection != null) {
-          connection.close();
-        }
         if (dataSource instanceof AutoCloseable) {
           ((AutoCloseable) dataSource).close();
         }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 83767)
    Time Spent: 7h 20m  (was: 7h 10m)

> JdbcIO: Improve connection management
> -------------------------------------
>
>                 Key: BEAM-3500
>                 URL: https://issues.apache.org/jira/browse/BEAM-3500
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-java-jdbc
>    Affects Versions: 2.2.0
>            Reporter: Pawel Bartoszek
>            Assignee: Jean-Baptiste Onofré
>            Priority: Major
>          Time Spent: 7h 20m
>  Remaining Estimate: 0h
>
> The JdbcIO write DoFn acquires a connection in {{@Setup}} and releases it in 
> {{@Teardown}}, which means that the connection might stay open for days 
> in the streaming job case. Keeping a single connection open for so long might be 
> very risky, as it is exposed to database, network, and other issues.
> *Taking connection from the pool when it is actually needed*
> I suggest that the connection be taken from the connection pool in the 
> {{executeBatch}} method and released when the batch is flushed. This will 
> allow the pool to take care of any unhealthy connections that are returned.
> *Make JdbcIO accept data source factory*
> It would be nice if JdbcIO accepted a DataSourceFactory rather than the DataSource 
> itself. I am saying that because the sink checks whether the DataSource implements 
> the {{Serializable}} interface, which makes it impossible to pass a 
> BasicDataSource (used internally by the sink), as it doesn’t implement this 
> interface. Something like:
> {code:java}
> interface DataSourceFactory extends Serializable{
>      DataSource create();
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to