[ https://issues.apache.org/jira/browse/BEAM-7230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16939262#comment-16939262 ]
Anton Bankovskii edited comment on BEAM-7230 at 9/27/19 11:28 AM:
------------------------------------------------------------------

Unfortunately, I came across the same behavior while executing the pipeline with the DataflowRunner.

Exception in the Dataflow console:
{noformat}
java.lang.RuntimeException: org.apache.beam.sdk.util.UserCodeException:
Caused by: [SKIPPED]
14 more
Caused by: java.lang.NullPointerException
    at org.apache.beam.sdk.io.jdbc.JdbcIO$PoolableDataSourceProvider.buildDataSource(JdbcIO.java:1394)
    at org.apache.beam.sdk.io.jdbc.JdbcIO$PoolableDataSourceProvider.apply(JdbcIO.java:1389)
    at org.apache.beam.sdk.io.jdbc.JdbcIO$PoolableDataSourceProvider.apply(JdbcIO.java:1369)
    at org.apache.beam.sdk.io.jdbc.JdbcIO$ReadFn.setup(JdbcIO.java:862)
{noformat}

Testing locally with the DirectRunner gives no error.

Here's my code:
{code:java}
public class ReadFromJdbcFn extends PTransform<PBegin, PCollection<String>> {

    private JdbcToCsvOptions options;

    public ReadFromJdbcFn(JdbcToCsvOptions options) {
        this.options = options;
    }

    @Override
    public PCollection<String> expand(PBegin input) {
        final JdbcIO.DataSourceConfiguration dataSourceConfiguration =
                configure(options.getConnectionString(), options.getDriverName(),
                        options.getUser(), options.getPassword(),
                        options.getConnectionProperties());

        final SerializableFunction<Void, DataSource> dataSourceProviderFunction =
                JdbcIO.PoolableDataSourceProvider.of(dataSourceConfiguration);

        return input.apply(
                JdbcIO.<String>read()
                        .withDataSourceProviderFn(dataSourceProviderFunction)
                        .withCoder(StringUtf8Coder.of())
                        .withFetchSize(options.getFetchSize())
                        .withQuery(options.getQuery())
                        .withRowMapper(new JdbcToCsvRowMapper()));
    }

    private static JdbcIO.DataSourceConfiguration configure(String connString, String driver,
                                                            String user, String password,
                                                            String connProperties) {
        final JdbcIO.DataSourceConfiguration dataSourceConfiguration =
                JdbcIO.DataSourceConfiguration.create(driver, connString)
                        .withUsername(user)
                        .withPassword(password);

        return connProperties == null
                ? dataSourceConfiguration
                : dataSourceConfiguration.withConnectionProperties(connProperties);
    }
}
{code}

Also, I wonder whether Dataflow will autoscale its worker count based on the amount of data read from the database?

> Using JdbcIO creates huge amount of connections
> -----------------------------------------------
>
>                 Key: BEAM-7230
>                 URL: https://issues.apache.org/jira/browse/BEAM-7230
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-dataflow
>    Affects Versions: 2.11.0
>            Reporter: Brachi Packter
>            Assignee: Ismaël Mejía
>            Priority: Major
>             Fix For: 2.13.0
>
>          Time Spent: 2h 50m
>  Remaining Estimate: 0h
>
> I want to write from Dataflow to GCP Cloud SQL. I'm using a connection pool,
> and I still see a huge number of connections in GCP SQL (4k, while I set the
> connection pool to 300), most of them sleeping.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)