[
https://issues.apache.org/jira/browse/BEAM-3500?focusedWorklogId=83767&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-83767
]
ASF GitHub Bot logged work on BEAM-3500:
----------------------------------------
Author: ASF GitHub Bot
Created on: 23/Mar/18 19:07
Start Date: 23/Mar/18 19:07
Worklog Time Spent: 10m
Work Description: jkff closed pull request #4461: [BEAM-3500] "Attach"
JDBC connection to the bundle and add DataSourceFactory allowing full control
of the way the DataSource is created
URL: https://github.com/apache/beam/pull/4461
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/sdks/java/io/jdbc/pom.xml b/sdks/java/io/jdbc/pom.xml
index 9151cfb80ae..577d4f55a74 100644
--- a/sdks/java/io/jdbc/pom.xml
+++ b/sdks/java/io/jdbc/pom.xml
@@ -279,6 +279,11 @@
<artifactId>commons-dbcp2</artifactId>
<version>2.1.1</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-pool2</artifactId>
+ <version>2.4.2</version>
+ </dependency>
<dependency>
<groupId>joda-time</groupId>
diff --git
a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
index f7a66045886..35e4aacd5ac 100644
--- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
+++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
@@ -52,6 +52,11 @@
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PDone;
import org.apache.commons.dbcp2.BasicDataSource;
+import org.apache.commons.dbcp2.DataSourceConnectionFactory;
+import org.apache.commons.dbcp2.PoolableConnectionFactory;
+import org.apache.commons.dbcp2.PoolingDataSource;
+import org.apache.commons.pool2.impl.GenericObjectPool;
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -235,17 +240,17 @@ public static DataSourceConfiguration create(DataSource
dataSource) {
checkArgument(dataSource != null, "dataSource can not be null");
checkArgument(dataSource instanceof Serializable, "dataSource must be
Serializable");
return new AutoValue_JdbcIO_DataSourceConfiguration.Builder()
- .setDataSource(dataSource)
- .build();
+ .setDataSource(dataSource)
+ .build();
}
public static DataSourceConfiguration create(String driverClassName,
String url) {
checkArgument(driverClassName != null, "driverClassName can not be
null");
checkArgument(url != null, "url can not be null");
return new AutoValue_JdbcIO_DataSourceConfiguration.Builder()
-
.setDriverClassName(ValueProvider.StaticValueProvider.of(driverClassName))
- .setUrl(ValueProvider.StaticValueProvider.of(url))
- .build();
+
.setDriverClassName(ValueProvider.StaticValueProvider.of(driverClassName))
+ .setUrl(ValueProvider.StaticValueProvider.of(url))
+ .build();
}
public static DataSourceConfiguration create(ValueProvider<String>
driverClassName,
@@ -254,8 +259,7 @@ public static DataSourceConfiguration
create(ValueProvider<String> driverClassNa
checkArgument(url != null, "url can not be null");
return new AutoValue_JdbcIO_DataSourceConfiguration.Builder()
.setDriverClassName(driverClassName)
- .setUrl(url)
- .build();
+ .setUrl(url).build();
}
public DataSourceConfiguration withUsername(String username) {
@@ -307,9 +311,10 @@ private void populateDisplayData(DisplayData.Builder
builder) {
}
}
- DataSource buildDatasource() throws Exception{
+ DataSource buildDatasource() throws Exception {
+ DataSource current = null;
if (getDataSource() != null) {
- return getDataSource();
+ current = getDataSource();
} else {
BasicDataSource basicDataSource = new BasicDataSource();
if (getDriverClassName() != null) {
@@ -327,8 +332,25 @@ DataSource buildDatasource() throws Exception{
if (getConnectionProperties() != null &&
getConnectionProperties().get() != null) {
basicDataSource.setConnectionProperties(getConnectionProperties().get());
}
- return basicDataSource;
+ current = basicDataSource;
}
+
+ // wrapping the datasource as a pooling datasource
+ DataSourceConnectionFactory connectionFactory = new
DataSourceConnectionFactory(current);
+ PoolableConnectionFactory poolableConnectionFactory =
+ new PoolableConnectionFactory(connectionFactory, null);
+ GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
+ poolConfig.setMaxTotal(1);
+ poolConfig.setMinIdle(0);
+ poolConfig.setMinEvictableIdleTimeMillis(10000);
+ poolConfig.setSoftMinEvictableIdleTimeMillis(30000);
+ GenericObjectPool connectionPool =
+ new GenericObjectPool(poolableConnectionFactory, poolConfig);
+ poolableConnectionFactory.setPool(connectionPool);
+ poolableConnectionFactory.setDefaultAutoCommit(false);
+ poolableConnectionFactory.setDefaultReadOnly(false);
+ PoolingDataSource poolingDataSource = new
PoolingDataSource(connectionPool);
+ return poolingDataSource;
}
}
@@ -364,7 +386,6 @@ DataSource buildDatasource() throws Exception{
}
public Read<T> withDataSourceConfiguration(DataSourceConfiguration
configuration) {
- checkArgument(configuration != null, "configuration can not be null");
return toBuilder().setDataSourceConfiguration(configuration).build();
}
@@ -398,8 +419,8 @@ DataSource buildDatasource() throws Exception{
checkArgument(getQuery() != null, "withQuery() is required");
checkArgument(getRowMapper() != null, "withRowMapper() is required");
checkArgument(getCoder() != null, "withCoder() is required");
- checkArgument(
- getDataSourceConfiguration() != null, "withDataSourceConfiguration()
is required");
+ checkArgument((getDataSourceConfiguration() != null),
+ "withDataSourceConfiguration() is required");
return input
.apply(Create.of((Void) null))
@@ -455,8 +476,6 @@ public void populateDisplayData(DisplayData.Builder
builder) {
public ReadAll<ParameterT, OutputT> withDataSourceConfiguration(
DataSourceConfiguration configuration) {
- checkArgument(configuration != null,
"JdbcIO.readAll().withDataSourceConfiguration"
- + "(configuration) called with null configuration");
return toBuilder().setDataSourceConfiguration(configuration).build();
}
@@ -635,8 +654,8 @@ public void teardown() throws Exception {
@Override
public PDone expand(PCollection<T> input) {
- checkArgument(
- getDataSourceConfiguration() != null, "withDataSourceConfiguration()
is required");
+ checkArgument(getDataSourceConfiguration() != null,
+ "withDataSourceConfiguration() is required");
checkArgument(getStatement() != null, "withStatement() is required");
checkArgument(
getPreparedStatementSetter() != null, "withPreparedStatementSetter()
is required");
@@ -656,6 +675,7 @@ public PDone expand(PCollection<T> input) {
private DataSource dataSource;
private Connection connection;
+ private PreparedStatement preparedStatement;
private List<T> records = new ArrayList<>();
public WriteFn(Write<T> spec) {
@@ -665,8 +685,13 @@ public WriteFn(Write<T> spec) {
@Setup
public void setup() throws Exception {
dataSource = spec.getDataSourceConfiguration().buildDatasource();
+ }
+
+ @StartBundle
+ public void startBundle() throws Exception {
connection = dataSource.getConnection();
connection.setAutoCommit(false);
+ preparedStatement = connection.prepareStatement(spec.getStatement());
}
@ProcessElement
@@ -693,6 +718,15 @@ private void processRecord(T record, PreparedStatement
preparedStatement) {
@FinishBundle
public void finishBundle() throws Exception {
executeBatch();
+ try {
+ if (preparedStatement != null) {
+ preparedStatement.close();
+ }
+ } finally {
+ if (connection != null) {
+ connection.close();
+ }
+ }
}
private void executeBatch() throws SQLException, IOException,
InterruptedException {
@@ -734,9 +768,6 @@ private void executeBatch() throws SQLException,
IOException, InterruptedExcepti
@Teardown
public void teardown() throws Exception {
- if (connection != null) {
- connection.close();
- }
if (dataSource instanceof AutoCloseable) {
((AutoCloseable) dataSource).close();
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 83767)
Time Spent: 7h 20m (was: 7h 10m)
> JdbcIO: Improve connection management
> -------------------------------------
>
> Key: BEAM-3500
> URL: https://issues.apache.org/jira/browse/BEAM-3500
> Project: Beam
> Issue Type: Improvement
> Components: io-java-jdbc
> Affects Versions: 2.2.0
> Reporter: Pawel Bartoszek
> Assignee: Jean-Baptiste Onofré
> Priority: Major
> Time Spent: 7h 20m
> Remaining Estimate: 0h
>
> The JdbcIO write DoFn acquires a connection in {{@Setup}} and releases it in
> the {{@Teardown}} method, which means the connection might stay open for days
> in the streaming-job case. Keeping a single connection open for so long is
> very risky, as it is exposed to database, network, and other issues.
> *Taking a connection from the pool when it is actually needed*
> I suggest that the connection be taken from the connection pool in the
> {{executeBatch}} method and released when the batch is flushed. This will
> allow the pool to take care of any returned unhealthy connections, etc.
> *Make JdbcIO accept a data source factory*
> It would be nice if JdbcIO accepted a DataSourceFactory rather than a
> DataSource itself. I am saying that because the sink checks whether the
> DataSource implements the `Serializable` interface, which makes it impossible
> to pass a BasicDataSource (used internally by the sink), as it doesn't
> implement this interface. Something like:
> {code:java}
> interface DataSourceFactory extends Serializable {
> DataSource create();
> }
> {code}
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)