[ https://issues.apache.org/jira/browse/BEAM-7230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16834584#comment-16834584 ]

Brachi Packter commented on BEAM-7230:
--------------------------------------

Yes, in streaming.

I found the issue: the connection pool is created per DoFn instance, and if it is 
created inside the DoFn, then one pool is created per Dataflow worker thread. 
This leads to a huge number of connection pools, and the total allowed 
connections become the pool size multiplied by (running threads * running workers).
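To illustrate the multiplication (a sketch with hypothetical figures, not the actual job's settings):

{code:java}
public class ConnectionMath {
    public static void main(String[] args) {
        // Hypothetical figures, for illustration only.
        int workers = 10;           // Dataflow workers in the job
        int threadsPerWorker = 16;  // worker threads, each with its own DoFn instance
        int poolSizePerInstance = 25; // maximumPoolSize of each per-instance pool

        // One pool per DoFn instance means the pools multiply with threads and workers:
        int maxConnections = workers * threadsPerWorker * poolSizePerInstance;
        System.out.println(maxConnections); // prints 4000
    }
}
{code}

So even a modest per-pool limit can add up to thousands of connections on the database side.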

I was able to fix it by not using the Apache Beam JdbcIO library, and instead 
creating a singleton DataSource per worker, giving each data source only a small 
number of allowed connections in its pool.

Here I create a singleton data source:
{code:java}
import javax.sql.DataSource;

import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;

public class ConnectionPoolProvider {

    private final DataSource dataSource;
    // volatile is required for double-checked locking to be safe.
    private static volatile ConnectionPoolProvider instance = null;
    private static final Object lock = new Object();

    public static ConnectionPoolProvider getInstance() {
        if (instance == null) {
            synchronized (lock) {
                if (instance == null) {
                    instance = new ConnectionPoolProvider();
                }
            }
        }
        return instance;
    }

    private ConnectionPoolProvider() {
        this.dataSource = getHikariDataSource();
    }

    private DataSource getHikariDataSource() {
        HikariConfig config = new HikariConfig();
        // Configure which instance and what database user to connect with.
        config.setJdbcUrl(String.format("jdbc:mysql:///%s", "some-name"));
        config.setUsername("some-user");
        config.setPassword("some-password");

        config.addDataSourceProperty("socketFactory", "com.google.cloud.sql.mysql.SocketFactory");
        config.addDataSourceProperty("cloudSqlInstance", "staging-161313:us-central1:mysql-instance");
        config.addDataSourceProperty("useSSL", "false");
        // maximumPoolSize limits the total number of concurrent connections this pool will keep.
        config.setMaximumPoolSize(15);
        // minimumIdle is the minimum number of idle connections Hikari maintains in the pool.
        // Additional connections will be established to meet this value unless the pool is full.
        config.setMinimumIdle(5);
        // connectionTimeout is the maximum number of milliseconds to wait for a connection
        // checkout. Any attempt to retrieve a connection from this pool that exceeds the set
        // limit will throw an SQLException.
        config.setConnectionTimeout(10000); // 10 seconds
        // idleTimeout is the maximum amount of time a connection can sit idle in the pool.
        // Connections that sit idle for this many milliseconds are retired if minimumIdle
        // is exceeded.
        config.setIdleTimeout(600000); // 10 minutes
        // maxLifetime is the maximum possible lifetime of a connection in the pool.
        // Connections that live longer than this many milliseconds will be closed and
        // reestablished between uses. This value should be several minutes shorter than the
        // database's timeout value to avoid unexpected terminations.
        config.setMaxLifetime(1800000); // 30 minutes

        return new HikariDataSource(config);
    }

    public DataSource getDataSource() {
        return dataSource;
    }
}
{code}
And here is my Transform:
{code:java}
@Override
    public PCollection<Void> expand(PCollection<Row> input) {

        return input
                .apply(ParDo.of(new DoFn<Row, Void>() {
                    private final List<Row> records = new ArrayList<>();
                    private DataSource dataSource;

                    @Setup
                    public void setup() {
                        dataSource = ConnectionPoolProvider.getInstance().getDataSource();
                    }

                    @ProcessElement
                    public void process(ProcessContext context) {
                        records.add(context.element());
                        if (records.size() >= BATCH_SIZE) {
                            executeBatch();
                        }
                    }

                    @FinishBundle
                    public void finishBundle() throws Exception {
                        executeBatch();
                    }

                    private void executeBatch() {
                        logger.debug("execute batch size " + records.size());
                        if (records.isEmpty()) {
                            return;
                        }
                        try (Connection connection = dataSource.getConnection();
                             PreparedStatement preparedStatement =
                                     connection.prepareStatement("insert into SOME_TABLE values(??)")) {
                            connection.setAutoCommit(true);
                            for (Row record : records) {
                                preparedStatement.clearParameters();
                                prepareStatement(record, preparedStatement);
                                preparedStatement.addBatch();
                            }
                            preparedStatement.executeBatch();
                        } catch (SQLException e) {
                            throw new RuntimeException("failed execute", e);
                        }
                        records.clear();
                    }
                }));
    }
{code}
 

> Using JdbcIO creates huge amount of connections
> -----------------------------------------------
>
>                 Key: BEAM-7230
>                 URL: https://issues.apache.org/jira/browse/BEAM-7230
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-dataflow
>    Affects Versions: 2.11.0
>            Reporter: Brachi Packter
>            Priority: Major
>
> I want to write from Dataflow to GCP Cloud SQL. I'm using a connection pool, 
> and still I see a huge number of connections in GCP SQL (4k while I set the 
> connection pool to 300), and most of them are sleeping.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
