[
https://issues.apache.org/jira/browse/BEAM-7230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16834584#comment-16834584
]
Brachi Packter commented on BEAM-7230:
--------------------------------------
Yes, in streaming.
I found the issue: the connection pool is created per instance, and if it is created
inside the DoFn, then one pool is created per Dataflow worker thread.
This leads to a huge number of connection pools, and the allowed connections per pool
are multiplied by (running threads * running workers).
I was able to fix it by not using the Apache Beam JdbcIO library, and instead creating
a singleton DataSource per worker, giving each data source only a small number of
allowed connections in its pool.
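The multiplication above can be made concrete with a small sketch (the worker and thread counts below are illustrative assumptions, not measured values from my pipeline):
{code:java}
// Illustrative only: worker/thread counts are assumptions, to show the blow-up.
public class ConnectionMath {

    // Each worker thread that builds its own pool contributes poolSize connections.
    static long totalConnections(long workers, long threadsPerWorker, long poolSize) {
        return workers * threadsPerWorker * poolSize;
    }

    public static void main(String[] args) {
        // A pool "limited" to 100 connections still yields 10 * 4 * 100 = 4000
        // connections once every thread on every worker has its own copy.
        System.out.println(totalConnections(10, 4, 100)); // prints 4000
    }
}
{code}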
Here I create a singleton data source:
{code:java}
public class ConnectionPoolProvider {

    // volatile is required for double-checked locking to be safe.
    private static volatile ConnectionPoolProvider instance = null;
    private static final Object lock = new Object();

    private final DataSource dataSource;

    public static ConnectionPoolProvider getInstance() {
        if (instance == null) {
            synchronized (lock) {
                if (instance == null) {
                    instance = new ConnectionPoolProvider();
                }
            }
        }
        return instance;
    }

    private ConnectionPoolProvider() {
        this.dataSource = getHikariDataSource();
    }

    private DataSource getHikariDataSource() {
        HikariConfig config = new HikariConfig();
        // Configure which instance and what database user to connect with.
        config.setJdbcUrl(String.format("jdbc:mysql:///%s", "some-name"));
        config.setUsername("some-user");
        config.setPassword("some-password");
        config.addDataSourceProperty("socketFactory",
                "com.google.cloud.sql.mysql.SocketFactory");
        config.addDataSourceProperty("cloudSqlInstance",
                "staging-161313:us-central1:mysql-instance");
        config.addDataSourceProperty("useSSL", "false");
        // maximumPoolSize limits the total number of concurrent connections
        // this pool will keep.
        config.setMaximumPoolSize(15);
        // minimumIdle is the minimum number of idle connections Hikari
        // maintains in the pool. Additional connections will be established
        // to meet this value unless the pool is full.
        config.setMinimumIdle(5);
        // connectionTimeout is the maximum number of milliseconds to wait
        // for a connection checkout. Any attempt to retrieve a connection
        // from this pool that exceeds the set limit will throw an SQLException.
        config.setConnectionTimeout(10000); // 10 seconds
        // idleTimeout is the maximum amount of time a connection can sit idle
        // in the pool. Connections that sit idle for this many milliseconds
        // are retired if minimumIdle is exceeded.
        config.setIdleTimeout(600000); // 10 minutes
        // maxLifetime is the maximum possible lifetime of a connection in the
        // pool. Connections that live longer than this many milliseconds will
        // be closed and reestablished between uses. This value should be
        // several minutes shorter than the database's timeout value to avoid
        // unexpected terminations.
        config.setMaxLifetime(1800000); // 30 minutes
        return new HikariDataSource(config);
    }

    public DataSource getDataSource() {
        return dataSource;
    }
}
{code}
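The key property of the provider above is that every thread on a worker reaches the same pool. Stripped of the Hikari specifics, the sharing behavior can be checked in isolation (a minimal sketch; a plain `Object` stands in for the DataSource, and the class name is mine):
{code:java}
// Minimal double-checked-locking singleton, mirroring ConnectionPoolProvider.
public class SharedResourceProvider {
    // volatile is required for double-checked locking to be safe.
    private static volatile SharedResourceProvider instance = null;
    private static final Object lock = new Object();

    private final Object resource = new Object(); // stands in for the DataSource

    public static SharedResourceProvider getInstance() {
        if (instance == null) {
            synchronized (lock) {
                if (instance == null) {
                    instance = new SharedResourceProvider();
                }
            }
        }
        return instance;
    }

    public Object getResource() {
        return resource;
    }
}
{code}
Because every DoFn thread gets the same instance, a worker's connection count is bounded by one pool's maximumPoolSize instead of poolSize multiplied by the thread count.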
And here is my transform:
{code:java}
@Override
public PCollection<Void> expand(PCollection<ValueInSingleWindow<Row>> input) {
    return input
            .apply(ParDo.of(new DoFn<ValueInSingleWindow<Row>, Void>() {

                private final List<ValueInSingleWindow<Row>> records = new ArrayList<>();
                // transient: the DataSource is not serializable; it is rebuilt in @Setup.
                private transient DataSource dataSource;

                @Setup
                public void setup() {
                    dataSource = ConnectionPoolProvider.getInstance().getDataSource();
                }

                @ProcessElement
                public void process(ProcessContext context) {
                    records.add(context.element());
                    if (records.size() >= BATCH_SIZE) {
                        executeBatch();
                    }
                }

                @FinishBundle
                public void finishBundle() throws Exception {
                    executeBatch();
                }

                private void executeBatch() {
                    logger.debug("execute batch size " + records.size());
                    if (records.isEmpty()) {
                        return;
                    }
                    try (Connection connection = dataSource.getConnection();
                         PreparedStatement preparedStatement =
                                 connection.prepareStatement("insert into SOME_TABLE values (?, ?)")) {
                        connection.setAutoCommit(true);
                        for (ValueInSingleWindow<Row> record : records) {
                            preparedStatement.clearParameters();
                            prepareStatement(record, preparedStatement);
                            preparedStatement.addBatch();
                        }
                        preparedStatement.executeBatch();
                    } catch (SQLException e) {
                        throw new RuntimeException("failed execute", e);
                    }
                    records.clear();
                }
            }));
}
{code}
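The buffer-and-flush pattern in the DoFn above can be isolated as a small generic helper (a sketch; `BatchBuffer` and its flush callback are my names, not part of the original code):
{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Buffers records and hands them to a flush callback in fixed-size batches.
public class BatchBuffer<T> {
    private final int batchSize;
    private final Consumer<List<T>> flusher;
    private final List<T> records = new ArrayList<>();

    public BatchBuffer(int batchSize, Consumer<List<T>> flusher) {
        this.batchSize = batchSize;
        this.flusher = flusher;
    }

    // Mirrors @ProcessElement: buffer, and flush once the batch is full.
    public void add(T record) {
        records.add(record);
        if (records.size() >= batchSize) {
            flush();
        }
    }

    // Mirrors @FinishBundle: flush whatever remains, possibly a partial batch.
    public void flush() {
        if (records.isEmpty()) {
            return;
        }
        flusher.accept(new ArrayList<>(records));
        records.clear();
    }
}
{code}
With a batch size of 3, feeding seven records produces two full batches plus one final partial batch on the closing flush, which is exactly the @ProcessElement / @FinishBundle split in the DoFn.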
> Using JdbcIO creates huge amount of connections
> -----------------------------------------------
>
> Key: BEAM-7230
> URL: https://issues.apache.org/jira/browse/BEAM-7230
> Project: Beam
> Issue Type: Bug
> Components: runner-dataflow
> Affects Versions: 2.11.0
> Reporter: Brachi Packter
> Priority: Major
>
> I want to write from Dataflow to GCP Cloud SQL. I'm using a connection pool,
> and still I see a huge number of connections in GCP SQL (4k while I set the
> connection pool to 300), and most of them are sleeping.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)