This is an automated email from the ASF dual-hosted git repository.

kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 2a23f7110 [FLINK-39143][pipeline-connector][Fluss] Optimize the connection of fluss when using FlussMetaDataApplier (#4282)
2a23f7110 is described below

commit 2a23f71100a42e8f0a11e3df5020ae2412555de8
Author: Thorne <[email protected]>
AuthorDate: Wed Feb 25 10:17:45 2026 +0800

    [FLINK-39143][pipeline-connector][Fluss] Optimize the connection of fluss when using FlussMetaDataApplier (#4282)
    
    Co-authored-by: Thorne <[email protected]>
---
 .../fluss/sink/FlussMetaDataApplier.java           | 36 +++++++++++++++++-----
 1 file changed, 28 insertions(+), 8 deletions(-)

diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/FlussMetaDataApplier.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/FlussMetaDataApplier.java
index 5ce51447e..2e13b70d9 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/FlussMetaDataApplier.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/FlussMetaDataApplier.java
@@ -58,6 +58,9 @@ public class FlussMetaDataApplier implements MetadataApplier {
     private Set<SchemaChangeEventType> enabledEventTypes =
             new HashSet<>(Arrays.asList(CREATE_TABLE, DROP_TABLE));
 
+    private transient Connection connection;
+    private transient Admin admin;
+
     public FlussMetaDataApplier(
             Configuration flussClientConfig,
             Map<String, String> tableProperties,
@@ -89,12 +92,13 @@ public class FlussMetaDataApplier implements MetadataApplier {
     @Override
     public void applySchemaChange(SchemaChangeEvent schemaChangeEvent) {
         LOG.info("fluss metadata applier receive schemaChangeEvent {}", 
schemaChangeEvent);
+        Admin admin = getAdmin();
         if (schemaChangeEvent instanceof CreateTableEvent) {
             CreateTableEvent createTableEvent = (CreateTableEvent) 
schemaChangeEvent;
-            applyCreateTable(createTableEvent);
+            applyCreateTable(admin, createTableEvent);
         } else if (schemaChangeEvent instanceof DropTableEvent) {
             DropTableEvent dropTableEvent = (DropTableEvent) schemaChangeEvent;
-            applyDropTable(dropTableEvent);
+            applyDropTable(admin, dropTableEvent);
         } else {
             throw new IllegalArgumentException(
                     "fluss metadata applier only support CreateTableEvent now 
but receives "
@@ -102,9 +106,8 @@ public class FlussMetaDataApplier implements MetadataApplier {
         }
     }
 
-    private void applyCreateTable(CreateTableEvent event) {
-        try (Connection connection = 
ConnectionFactory.createConnection(flussClientConfig);
-                Admin admin = connection.getAdmin()) {
+    private void applyCreateTable(Admin admin, CreateTableEvent event) {
+        try {
             TableId tableId = event.tableId();
             TablePath tablePath = new TablePath(tableId.getSchemaName(), 
tableId.getTableName());
             String tableIdentifier = tablePath.getDatabaseName() + "." + 
tablePath.getTableName();
@@ -126,9 +129,8 @@ public class FlussMetaDataApplier implements MetadataApplier {
         }
     }
 
-    private void applyDropTable(DropTableEvent event) {
-        try (Connection connection = 
ConnectionFactory.createConnection(flussClientConfig);
-                Admin admin = connection.getAdmin()) {
+    private void applyDropTable(Admin admin, DropTableEvent event) {
+        try {
             TableId tableId = event.tableId();
             TablePath tablePath = new TablePath(tableId.getSchemaName(), 
tableId.getTableName());
             admin.dropTable(tablePath, true).get();
@@ -138,6 +140,24 @@ public class FlussMetaDataApplier implements MetadataApplier {
         }
     }
 
+    private Admin getAdmin() {
+        if (connection == null) {
+            connection = ConnectionFactory.createConnection(flussClientConfig);
+            admin = connection.getAdmin();
+        }
+        return admin;
+    }
+
+    @Override
+    public void close() throws Exception {
+        if (admin != null) {
+            admin.close();
+        }
+        if (connection != null) {
+            connection.close();
+        }
+    }
+
     private void sanityCheck(TableDescriptor inferredFlussTable, TableInfo 
currentTableInfo) {
         List<String> inferredPrimaryKeyColumnNames =
                 
inferredFlussTable.getSchema().getPrimaryKeyColumnNames().stream()

Reply via email to