This is an automated email from the ASF dual-hosted git repository.
kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new 2a23f7110 [FLINK-39143][pipeline-connector][Fluss]Optimize the
connection of fluss when use FlussMetaDataApplier (#4282)
2a23f7110 is described below
commit 2a23f71100a42e8f0a11e3df5020ae2412555de8
Author: Thorne <[email protected]>
AuthorDate: Wed Feb 25 10:17:45 2026 +0800
[FLINK-39143][pipeline-connector][Fluss]Optimize the connection of fluss
when use FlussMetaDataApplier (#4282)
Co-authored-by: Thorne <[email protected]>
---
.../fluss/sink/FlussMetaDataApplier.java | 36 +++++++++++++++++-----
1 file changed, 28 insertions(+), 8 deletions(-)
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/FlussMetaDataApplier.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/FlussMetaDataApplier.java
index 5ce51447e..2e13b70d9 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/FlussMetaDataApplier.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/FlussMetaDataApplier.java
@@ -58,6 +58,9 @@ public class FlussMetaDataApplier implements MetadataApplier {
private Set<SchemaChangeEventType> enabledEventTypes =
new HashSet<>(Arrays.asList(CREATE_TABLE, DROP_TABLE));
+ private transient Connection connection;
+ private transient Admin admin;
+
public FlussMetaDataApplier(
Configuration flussClientConfig,
Map<String, String> tableProperties,
@@ -89,12 +92,13 @@ public class FlussMetaDataApplier implements
MetadataApplier {
@Override
public void applySchemaChange(SchemaChangeEvent schemaChangeEvent) {
LOG.info("fluss metadata applier receive schemaChangeEvent {}",
schemaChangeEvent);
+ Admin admin = getAdmin();
if (schemaChangeEvent instanceof CreateTableEvent) {
CreateTableEvent createTableEvent = (CreateTableEvent)
schemaChangeEvent;
- applyCreateTable(createTableEvent);
+ applyCreateTable(admin, createTableEvent);
} else if (schemaChangeEvent instanceof DropTableEvent) {
DropTableEvent dropTableEvent = (DropTableEvent) schemaChangeEvent;
- applyDropTable(dropTableEvent);
+ applyDropTable(admin, dropTableEvent);
} else {
throw new IllegalArgumentException(
"fluss metadata applier only support CreateTableEvent now
but receives "
@@ -102,9 +106,8 @@ public class FlussMetaDataApplier implements
MetadataApplier {
}
}
- private void applyCreateTable(CreateTableEvent event) {
- try (Connection connection =
ConnectionFactory.createConnection(flussClientConfig);
- Admin admin = connection.getAdmin()) {
+ private void applyCreateTable(Admin admin, CreateTableEvent event) {
+ try {
TableId tableId = event.tableId();
TablePath tablePath = new TablePath(tableId.getSchemaName(),
tableId.getTableName());
String tableIdentifier = tablePath.getDatabaseName() + "." +
tablePath.getTableName();
@@ -126,9 +129,8 @@ public class FlussMetaDataApplier implements
MetadataApplier {
}
}
- private void applyDropTable(DropTableEvent event) {
- try (Connection connection =
ConnectionFactory.createConnection(flussClientConfig);
- Admin admin = connection.getAdmin()) {
+ private void applyDropTable(Admin admin, DropTableEvent event) {
+ try {
TableId tableId = event.tableId();
TablePath tablePath = new TablePath(tableId.getSchemaName(),
tableId.getTableName());
admin.dropTable(tablePath, true).get();
@@ -138,6 +140,24 @@ public class FlussMetaDataApplier implements
MetadataApplier {
}
}
+ private Admin getAdmin() {
+ if (connection == null) {
+ connection = ConnectionFactory.createConnection(flussClientConfig);
+ admin = connection.getAdmin();
+ }
+ return admin;
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (admin != null) {
+ admin.close();
+ }
+ if (connection != null) {
+ connection.close();
+ }
+ }
+
private void sanityCheck(TableDescriptor inferredFlussTable, TableInfo
currentTableInfo) {
List<String> inferredPrimaryKeyColumnNames =
inferredFlussTable.getSchema().getPrimaryKeyColumnNames().stream()