This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 086bd9996 [FLINK-36618][cdc-connector][postgres] Improve PostgresDialect.discoverDataCollections to reduce the bootstrap time
086bd9996 is described below

commit 086bd9996a9a7a50597db8d172cfef42e63a8062
Author: Hongshun Wang <125648852+loserwang1...@users.noreply.github.com>
AuthorDate: Mon Nov 4 19:52:38 2024 +0800

    [FLINK-36618][cdc-connector][postgres] Improve PostgresDialect.discoverDataCollections to reduce the bootstrap time
    
    This closes #3672.
---
 .../postgres/source/PostgresDialect.java           | 15 ++--
 .../source/utils/CustomPostgresSchema.java         | 81 ++++++++++++++++------
 2 files changed, 68 insertions(+), 28 deletions(-)
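
The core of the change: instead of issuing one Debezium readSchema() catalog
query per captured table, the schemas of all captured tables are read in a
single call by passing a null schema pattern and letting the configured table
filter restrict the result. A minimal before/after sketch follows; it reuses
the readSchema() signature from io.debezium.jdbc.JdbcConnection that appears
in the diff below, while the class and parameter names are illustrative only:

    import io.debezium.jdbc.JdbcConnection;
    import io.debezium.relational.TableId;
    import io.debezium.relational.Tables;

    import java.sql.SQLException;
    import java.util.List;

    // Illustrative sketch, not part of the commit.
    class SchemaReadSketch {

        // Before: one catalog round trip per captured table.
        static void readPerTable(
                JdbcConnection jdbc,
                Tables tables,
                String database,
                Tables.TableFilter filter,
                List<TableId> capturedTableIds)
                throws SQLException {
            for (TableId tableId : capturedTableIds) {
                jdbc.readSchema(tables, database, tableId.schema(), filter, null, false);
            }
        }

        // After: a single round trip. The null schema pattern makes one
        // readSchema() call cover every schema; the table filter still limits
        // the result to the captured tables.
        static void readBatched(
                JdbcConnection jdbc, Tables tables, String database, Tables.TableFilter filter)
                throws SQLException {
            jdbc.readSchema(tables, database, null, filter, null, false);
        }
    }

With N captured tables this turns N catalog queries into one, which is where
the bootstrap-time saving comes from.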

diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDialect.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDialect.java
index 2d17a1c52..b0c5e7952 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDialect.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDialect.java
@@ -50,7 +50,6 @@ import io.debezium.schema.TopicSelector;
 import javax.annotation.Nullable;
 
 import java.sql.SQLException;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -171,11 +170,7 @@ public class PostgresDialect implements JdbcDataSourceDialect {
 
         try (JdbcConnection jdbc = openJdbcConnection(sourceConfig)) {
             // fetch table schemas
-            Map<TableId, TableChange> tableSchemas = new HashMap<>();
-            for (TableId tableId : capturedTableIds) {
-                TableChange tableSchema = queryTableSchema(jdbc, tableId);
-                tableSchemas.put(tableId, tableSchema);
-            }
+            Map<TableId, TableChange> tableSchemas = queryTableSchema(jdbc, capturedTableIds);
             return tableSchemas;
         } catch (Exception e) {
             throw new FlinkRuntimeException(
@@ -196,6 +191,14 @@ public class PostgresDialect implements JdbcDataSourceDialect {
         return schema.getTableSchema(tableId);
     }
 
+    private Map<TableId, TableChange> queryTableSchema(
+            JdbcConnection jdbc, List<TableId> tableIds) {
+        if (schema == null) {
+            schema = new CustomPostgresSchema((PostgresConnection) jdbc, sourceConfig);
+        }
+        return schema.getTableSchema(tableIds);
+    }
+
     @Override
     public FetchTask<SourceSplitBase> createFetchTask(SourceSplitBase sourceSplitBase) {
         if (sourceSplitBase.isSnapshotSplit()) {
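
A note on the dialect change above: the CustomPostgresSchema instance is
created lazily on first use and then retained, so its TableId -> TableChange
cache survives across discovery calls. The batch lookup it exposes (shown in
the next file) is cache-first; a short usage sketch, with the variable names
assumed from context:

    // Cache hits are served from schemasByTableId directly; misses trigger a
    // single batched readSchema() round trip; any table still absent after
    // that fails fast with a FlinkRuntimeException.
    Map<TableId, TableChange> schemas = schema.getTableSchema(capturedTableIds);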
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/utils/CustomPostgresSchema.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/utils/CustomPostgresSchema.java
index 66936e704..714baf5dd 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/utils/CustomPostgresSchema.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/utils/CustomPostgresSchema.java
@@ -34,7 +34,10 @@ import io.debezium.util.Clock;
 
 import java.sql.SQLException;
 import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 
@@ -56,7 +59,7 @@ public class CustomPostgresSchema {
         // read schema from cache first
         if (!schemasByTableId.containsKey(tableId)) {
             try {
-                readTableSchema(tableId);
+                readTableSchema(Collections.singletonList(tableId));
             } catch (SQLException e) {
                 throw new FlinkRuntimeException("Failed to read table schema", e);
             }
@@ -64,22 +67,51 @@ public class CustomPostgresSchema {
         return schemasByTableId.get(tableId);
     }
 
-    private TableChange readTableSchema(TableId tableId) throws SQLException {
+    public Map<TableId, TableChange> getTableSchema(List<TableId> tableIds) {
+        // read schema from cache first
+        Map<TableId, TableChange> tableChanges = new HashMap<>();
+
+        List<TableId> unmatchedTableIds = new ArrayList<>();
+        for (TableId tableId : tableIds) {
+            if (schemasByTableId.containsKey(tableId)) {
+                tableChanges.put(tableId, schemasByTableId.get(tableId));
+            } else {
+                unmatchedTableIds.add(tableId);
+            }
+        }
+
+        if (!unmatchedTableIds.isEmpty()) {
+            try {
+                readTableSchema(unmatchedTableIds);
+            } catch (SQLException e) {
+                throw new FlinkRuntimeException("Failed to read table schema", e);
+            }
+            for (TableId tableId : unmatchedTableIds) {
+                if (schemasByTableId.containsKey(tableId)) {
+                    tableChanges.put(tableId, schemasByTableId.get(tableId));
+                } else {
+                    throw new FlinkRuntimeException(
+                            String.format("Failed to read table schema of table %s", tableId));
+                }
+            }
+        }
+        return tableChanges;
+    }
+
+    private List<TableChange> readTableSchema(List<TableId> tableIds) throws SQLException {
+        List<TableChange> tableChanges = new ArrayList<>();
 
         final PostgresOffsetContext offsetContext =
                 PostgresOffsetContext.initialContext(dbzConfig, jdbcConnection, Clock.SYSTEM);
 
         PostgresPartition partition = new PostgresPartition(dbzConfig.getLogicalName());
 
-        // set the events to populate proper sourceInfo into offsetContext
-        offsetContext.event(tableId, Instant.now());
-
         Tables tables = new Tables();
         try {
             jdbcConnection.readSchema(
                     tables,
                     dbzConfig.databaseName(),
-                    tableId.schema(),
+                    null,
                     dbzConfig.getTableFilters().dataCollectionFilter(),
                     null,
                     false);
@@ -87,22 +119,27 @@ public class CustomPostgresSchema {
             throw new FlinkRuntimeException("Failed to read schema", e);
         }
 
-        Table table = Objects.requireNonNull(tables.forTable(tableId));
-
-        // TODO: check whether we always set isFromSnapshot = true
-        SchemaChangeEvent schemaChangeEvent =
-                SchemaChangeEvent.ofCreate(
-                        partition,
-                        offsetContext,
-                        dbzConfig.databaseName(),
-                        tableId.schema(),
-                        null,
-                        table,
-                        true);
-
-        for (TableChanges.TableChange tableChange : schemaChangeEvent.getTableChanges()) {
-            this.schemasByTableId.put(tableId, tableChange);
+        for (TableId tableId : tableIds) {
+            Table table = Objects.requireNonNull(tables.forTable(tableId));
+            // set the events to populate proper sourceInfo into offsetContext
+            offsetContext.event(tableId, Instant.now());
+
+            // TODO: check whether we always set isFromSnapshot = true
+            SchemaChangeEvent schemaChangeEvent =
+                    SchemaChangeEvent.ofCreate(
+                            partition,
+                            offsetContext,
+                            dbzConfig.databaseName(),
+                            tableId.schema(),
+                            null,
+                            table,
+                            true);
+
+            for (TableChanges.TableChange tableChange : schemaChangeEvent.getTableChanges()) {
+                this.schemasByTableId.put(tableId, tableChange);
+            }
+            tableChanges.add(this.schemasByTableId.get(tableId));
         }
-        return this.schemasByTableId.get(tableId);
+        return tableChanges;
     }
 }
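
For reference, the cache-first batch lookup that getTableSchema(List<TableId>)
implements can be distilled into a small self-contained sketch. Everything
here (BatchingSchemaCache, SchemaLoader) is a hypothetical illustration of the
pattern, not part of this commit:

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    interface SchemaLoader<K, V> {
        // One round trip that resolves all requested keys at once,
        // analogous to the single batched readSchema() call above.
        Map<K, V> loadAll(List<K> keys);
    }

    // Cache-first batch lookup: serve hits from the cache, fetch all misses
    // in one round trip, and fail fast for keys that still cannot be resolved.
    class BatchingSchemaCache<K, V> {

        private final Map<K, V> cache = new HashMap<>();
        private final SchemaLoader<K, V> loader;

        BatchingSchemaCache(SchemaLoader<K, V> loader) {
            this.loader = loader;
        }

        Map<K, V> getAll(List<K> keys) {
            Map<K, V> result = new HashMap<>();
            List<K> misses = new ArrayList<>();
            for (K key : keys) {
                V cached = cache.get(key);
                if (cached != null) {
                    result.put(key, cached);
                } else {
                    misses.add(key);
                }
            }
            if (!misses.isEmpty()) {
                Map<K, V> loaded = loader.loadAll(misses); // single batched fetch
                cache.putAll(loaded);
                for (K key : misses) {
                    V value = loaded.get(key);
                    if (value == null) {
                        throw new IllegalStateException("No schema found for " + key);
                    }
                    result.put(key, value);
                }
            }
            return result;
        }
    }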
