[ 
https://issues.apache.org/jira/browse/BEAM-7230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16939262#comment-16939262
 ] 

Anton Bankovskii edited comment on BEAM-7230 at 9/27/19 11:28 AM:
------------------------------------------------------------------

Unfortunately, I came across the same behavior while executing the pipeline 
with the DataflowRunner.

Exception in Dataflow console:
{noformat}
java.lang.RuntimeException: org.apache.beam.sdk.util.UserCodeException:
Caused by:
[SKIPPED] 14 more
Caused by: java.lang.NullPointerException
	at org.apache.beam.sdk.io.jdbc.JdbcIO$PoolableDataSourceProvider.buildDataSource(JdbcIO.java:1394)
	at org.apache.beam.sdk.io.jdbc.JdbcIO$PoolableDataSourceProvider.apply(JdbcIO.java:1389)
	at org.apache.beam.sdk.io.jdbc.JdbcIO$PoolableDataSourceProvider.apply(JdbcIO.java:1369)
	at org.apache.beam.sdk.io.jdbc.JdbcIO$ReadFn.setup(JdbcIO.java:862){noformat}
Testing locally with DirectRunner gives no error.
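My guess: Dataflow workers run in separate JVMs, so any state that does not survive Java serialization (transient fields, JVM-local static state) is gone by the time setup() runs on the worker, while the DirectRunner stays in one JVM and never notices. Here is a minimal, Beam-free sketch of that failure mode ({{FragileProvider}} and {{SerializationDemo}} are hypothetical classes for illustration, not Beam code):

```java
import java.io.*;

// Hypothetical provider whose config is transient, so it is lost when the
// object is serialized to a worker JVM and deserialized there.
class FragileProvider implements Serializable {
    private final transient String config; // not serialized; null after round-trip

    FragileProvider(String config) {
        this.config = config;
    }

    String buildDataSource() {
        if (config == null) {
            // Analogous to the NullPointerException thrown in
            // JdbcIO$PoolableDataSourceProvider.buildDataSource on the worker.
            throw new NullPointerException("config was lost in serialization");
        }
        return "datasource(" + config + ")";
    }
}

public class SerializationDemo {
    // Round-trip an object through Java serialization, as a runner does when
    // shipping a DoFn (and anything it holds) to a worker JVM.
    static FragileProvider roundTrip(FragileProvider p) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(bos);
        oos.writeObject(p);
        oos.flush();
        ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()));
        return (FragileProvider) in.readObject();
    }

    public static void main(String[] args) throws Exception {
        FragileProvider local = new FragileProvider("jdbc:example");
        // Same JVM, never serialized: works, like the DirectRunner case.
        System.out.println(local.buildDataSource());

        // After the serialization round-trip the transient config is null.
        FragileProvider remote = roundTrip(local);
        try {
            remote.buildDataSource();
        } catch (NullPointerException e) {
            System.out.println("NPE after deserialization: " + e.getMessage());
        }
    }
}
```

Whether this is the exact mechanism inside PoolableDataSourceProvider is a guess on my part; the trace only shows that the NPE fires in buildDataSource during ReadFn.setup on the worker.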

 

Here's my code:
{code:java}
public class ReadFromJdbcFn extends PTransform<PBegin, PCollection<String>> {

    private final JdbcToCsvOptions options;

    public ReadFromJdbcFn(JdbcToCsvOptions options) {
        this.options = options;
    }

    @Override
    public PCollection<String> expand(PBegin input) {
        final JdbcIO.DataSourceConfiguration dataSourceConfiguration = configure(
                options.getConnectionString(), options.getDriverName(),
                options.getUser(), options.getPassword(),
                options.getConnectionProperties());

        final SerializableFunction<Void, DataSource> dataSourceProviderFunction =
                JdbcIO.PoolableDataSourceProvider.of(dataSourceConfiguration);

        return input.apply(
                JdbcIO.<String>read()
                        .withDataSourceProviderFn(dataSourceProviderFunction)
                        .withCoder(StringUtf8Coder.of())
                        .withFetchSize(options.getFetchSize())
                        .withQuery(options.getQuery())
                        .withRowMapper(new JdbcToCsvRowMapper()));
    }

    private static JdbcIO.DataSourceConfiguration configure(
            String connString, String driver, String user, String password,
            String connProperties) {

        final JdbcIO.DataSourceConfiguration dataSourceConfiguration =
                JdbcIO.DataSourceConfiguration.create(driver, connString)
                        .withUsername(user)
                        .withPassword(password);

        return connProperties == null
                ? dataSourceConfiguration
                : dataSourceConfiguration.withConnectionProperties(connProperties);
    }
}

{code}
Also, I wonder whether Dataflow will autoscale its worker count based on the 
amount of data read from the database?



> Using JdbcIO creates huge amount of connections
> -----------------------------------------------
>
>                 Key: BEAM-7230
>                 URL: https://issues.apache.org/jira/browse/BEAM-7230
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-dataflow
>    Affects Versions: 2.11.0
>            Reporter: Brachi Packter
>            Assignee: Ismaël Mejía
>            Priority: Major
>             Fix For: 2.13.0
>
>          Time Spent: 2h 50m
>  Remaining Estimate: 0h
>
> I want to write from Dataflow to GCP Cloud SQL. I'm using a connection pool, 
> and I still see a huge number of connections in GCP SQL (4k while I set the 
> connection pool to 300), most of them sleeping.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
