[ 
https://issues.apache.org/jira/browse/FLINK-39143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18060603#comment-18060603
 ] 

Thorne commented on FLINK-39143:
--------------------------------

As shown in the code below, a new connection is created every time the Fluss 
connector handles a DDL event. If a large number of tables need to be 
initialized, this is inefficient; connection pooling should be used 
instead.
{code:java}
// Code placeholder
/**
 * Applies a {@link CreateTableEvent} to Fluss.
 *
 * <p>Builds the target {@code TablePath} from the event's table id, derives the Fluss table
 * descriptor from the event schema, makes sure the database exists, and then either creates
 * the table or, when it already exists, runs {@code sanityCheck} against the current table
 * info.
 *
 * <p>NOTE(review): a new {@code Connection} and {@code Admin} are opened and closed on every
 * call; with many tables this means one connection per table.
 *
 * @param event the create-table event to apply
 * @throws RuntimeException wrapping any failure raised while applying the event
 */
private void applyCreateTable(CreateTableEvent event) {
    try (Connection connection = ConnectionFactory.createConnection(flussClientConfig);
            Admin admin = connection.getAdmin()) {
        TableId tableId = event.tableId();
        TablePath tablePath = new TablePath(tableId.getSchemaName(), tableId.getTableName());
        // Key used to look up the per-table bucket settings: "<database>.<table>".
        String tableIdentifier = tablePath.getDatabaseName() + "." + tablePath.getTableName();
        List<String> bucketKeys = bucketKeysMap.get(tableIdentifier);
        Integer bucketNum = bucketNumMap.get(tableIdentifier);
        TableDescriptor inferredFlussTable =
                toFlussTable(event.getSchema(), bucketKeys, bucketNum, tableProperties);
        // Wait for the database creation to finish so that a failure is reported here and
        // the table operations below do not race with it.
        admin.createDatabase(tablePath.getDatabaseName(), DatabaseDescriptor.EMPTY, true).get();
        if (!admin.tableExists(tablePath).get()) {
            admin.createTable(tablePath, inferredFlussTable, false).get();
        } else {
            TableInfo currentTableInfo = admin.getTableInfo(tablePath).get();
            // sanity check to prevent unexpected table schema evolution.
            sanityCheck(inferredFlussTable, currentTableInfo);
        }
    } catch (Exception e) {
        if (e instanceof InterruptedException) {
            // Restore the interrupt flag so callers can still observe the interruption.
            Thread.currentThread().interrupt();
        }
        LOG.error("Failed to apply schema change {}", event, e);
        throw new RuntimeException(e);
    }
}

/**
 * Applies a {@link DropTableEvent} to Fluss by dropping the table identified by the event's
 * table id and waiting for the drop to complete.
 *
 * <p>NOTE(review): a new {@code Connection} and {@code Admin} are opened and closed on every
 * call. The {@code true} flag passed to {@code dropTable} presumably means "ignore if the
 * table does not exist" -- confirm against the Fluss Admin API.
 *
 * @param event the drop-table event to apply
 * @throws RuntimeException wrapping any failure raised while applying the event
 */
private void applyDropTable(DropTableEvent event) {
    try (Connection connection = ConnectionFactory.createConnection(flussClientConfig);
            Admin admin = connection.getAdmin()) {
        TableId tableId = event.tableId();
        TablePath tablePath = new TablePath(tableId.getSchemaName(), tableId.getTableName());
        admin.dropTable(tablePath, true).get();
    } catch (Exception e) {
        if (e instanceof InterruptedException) {
            // Restore the interrupt flag so callers can still observe the interruption.
            Thread.currentThread().interrupt();
        }
        LOG.error("Failed to apply schema change {}", event, e);
        throw new RuntimeException(e);
    }
}  {code}
 

 

 

> fluss‘s connection use connection pool be better
> ------------------------------------------------
>
>                 Key: FLINK-39143
>                 URL: https://issues.apache.org/jira/browse/FLINK-39143
>             Project: Flink
>          Issue Type: Improvement
>          Components: Flink CDC
>    Affects Versions: cdc-3.5.0
>            Reporter: Thorne
>            Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to