This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
     new 086bd9996  [FLINK-36618][cdc-connector][postgres] Improve PostgresDialect.discoverDataCollections to reduce the bootstrap time
086bd9996 is described below

commit 086bd9996a9a7a50597db8d172cfef42e63a8062
Author: Hongshun Wang <125648852+loserwang1...@users.noreply.github.com>
AuthorDate: Mon Nov 4 19:52:38 2024 +0800

    [FLINK-36618][cdc-connector][postgres] Improve PostgresDialect.discoverDataCollections to reduce the bootstrap time

    This closes #3672.
---
 .../postgres/source/PostgresDialect.java           | 15 ++--
 .../source/utils/CustomPostgresSchema.java         | 81 ++++++++++++++------
 2 files changed, 68 insertions(+), 28 deletions(-)

diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDialect.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDialect.java
index 2d17a1c52..b0c5e7952 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDialect.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDialect.java
@@ -50,7 +50,6 @@ import io.debezium.schema.TopicSelector;
 import javax.annotation.Nullable;
 
 import java.sql.SQLException;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -171,11 +170,7 @@ public class PostgresDialect implements JdbcDataSourceDialect {
 
         try (JdbcConnection jdbc = openJdbcConnection(sourceConfig)) {
             // fetch table schemas
-            Map<TableId, TableChange> tableSchemas = new HashMap<>();
-            for (TableId tableId : capturedTableIds) {
-                TableChange tableSchema = queryTableSchema(jdbc, tableId);
-                tableSchemas.put(tableId, tableSchema);
-            }
+            Map<TableId, TableChange> tableSchemas = queryTableSchema(jdbc, capturedTableIds);
             return tableSchemas;
         } catch (Exception e) {
             throw new FlinkRuntimeException(
@@ -196,6 +191,14 @@ public class PostgresDialect implements JdbcDataSourceDialect {
         return schema.getTableSchema(tableId);
     }
 
+    private Map<TableId, TableChange> queryTableSchema(
+            JdbcConnection jdbc, List<TableId> tableIds) {
+        if (schema == null) {
+            schema = new CustomPostgresSchema((PostgresConnection) jdbc, sourceConfig);
+        }
+        return schema.getTableSchema(tableIds);
+    }
+
     @Override
     public FetchTask<SourceSplitBase> createFetchTask(SourceSplitBase sourceSplitBase) {
         if (sourceSplitBase.isSnapshotSplit()) {
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/utils/CustomPostgresSchema.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/utils/CustomPostgresSchema.java
index 66936e704..714baf5dd 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/utils/CustomPostgresSchema.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/utils/CustomPostgresSchema.java
@@ -34,7 +34,10 @@ import io.debezium.util.Clock;
 
 import java.sql.SQLException;
 import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 
@@ -56,7 +59,7 @@ public class CustomPostgresSchema {
         // read schema from cache first
         if (!schemasByTableId.containsKey(tableId)) {
             try {
-                readTableSchema(tableId);
+                readTableSchema(Collections.singletonList(tableId));
             } catch (SQLException e) {
                 throw new FlinkRuntimeException("Failed to read table schema", e);
             }
@@ -64,22 +67,51 @@ public class CustomPostgresSchema {
         return schemasByTableId.get(tableId);
     }
 
-    private TableChange readTableSchema(TableId tableId) throws SQLException {
+    public Map<TableId, TableChange> getTableSchema(List<TableId> tableIds) {
+        // read schema from cache first
+        Map<TableId, TableChange> tableChanges = new HashMap();
+
+        List<TableId> unMatchTableIds = new ArrayList<>();
+        for (TableId tableId : tableIds) {
+            if (schemasByTableId.containsKey(tableId)) {
+                tableChanges.put(tableId, schemasByTableId.get(tableId));
+            } else {
+                unMatchTableIds.add(tableId);
+            }
+        }
+
+        if (!unMatchTableIds.isEmpty()) {
+            try {
+                readTableSchema(tableIds);
+            } catch (SQLException e) {
+                throw new FlinkRuntimeException("Failed to read table schema", e);
+            }
+            for (TableId tableId : unMatchTableIds) {
+                if (schemasByTableId.containsKey(tableId)) {
+                    tableChanges.put(tableId, schemasByTableId.get(tableId));
+                } else {
+                    throw new FlinkRuntimeException(
+                            String.format("Failed to read table schema of table %s", tableId));
+                }
+            }
+        }
+        return tableChanges;
+    }
+
+    private List<TableChange> readTableSchema(List<TableId> tableIds) throws SQLException {
+        List<TableChange> tableChanges = new ArrayList<>();
         final PostgresOffsetContext offsetContext =
                 PostgresOffsetContext.initialContext(dbzConfig, jdbcConnection, Clock.SYSTEM);
         PostgresPartition partition = new PostgresPartition(dbzConfig.getLogicalName());
 
-        // set the events to populate proper sourceInfo into offsetContext
-        offsetContext.event(tableId, Instant.now());
-
         Tables tables = new Tables();
         try {
             jdbcConnection.readSchema(
                     tables,
                     dbzConfig.databaseName(),
-                    tableId.schema(),
+                    null,
                     dbzConfig.getTableFilters().dataCollectionFilter(),
                     null,
                     false);
@@ -87,22 +119,27 @@ public class CustomPostgresSchema {
             throw new FlinkRuntimeException("Failed to read schema", e);
         }
 
-        Table table = Objects.requireNonNull(tables.forTable(tableId));
-
-        // TODO: check whether we always set isFromSnapshot = true
-        SchemaChangeEvent schemaChangeEvent =
-                SchemaChangeEvent.ofCreate(
-                        partition,
-                        offsetContext,
-                        dbzConfig.databaseName(),
-                        tableId.schema(),
-                        null,
-                        table,
-                        true);
-
-        for (TableChanges.TableChange tableChange : schemaChangeEvent.getTableChanges()) {
-            this.schemasByTableId.put(tableId, tableChange);
+        for (TableId tableId : tableIds) {
+            Table table = Objects.requireNonNull(tables.forTable(tableId));
+            // set the events to populate proper sourceInfo into offsetContext
+            offsetContext.event(tableId, Instant.now());
+
+            // TODO: check whether we always set isFromSnapshot = true
+            SchemaChangeEvent schemaChangeEvent =
+                    SchemaChangeEvent.ofCreate(
+                            partition,
+                            offsetContext,
+                            dbzConfig.databaseName(),
+                            tableId.schema(),
+                            null,
+                            table,
+                            true);
+
+            for (TableChanges.TableChange tableChange : schemaChangeEvent.getTableChanges()) {
+                this.schemasByTableId.put(tableId, tableChange);
+            }
+            tableChanges.add(this.schemasByTableId.get(tableId));
         }
-        return this.schemasByTableId.get(tableId);
+        return tableChanges;
     }
 }
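
The core of the change is replacing one schema read per captured table with a cache lookup plus a single batched read for all cache misses. Below is a minimal standalone sketch of that caching-and-batching pattern; the class and method names (SchemaCache, fetchSchemasInOneRoundTrip) are hypothetical stand-ins for illustration, not the connector's real API, which is CustomPostgresSchema above.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of the pattern introduced by this commit: serve cached schemas
// first, then resolve every remaining table in one round trip instead of
// issuing one query per table. Names here are hypothetical stand-ins.
public class SchemaCache {
    private final Map<String, String> schemasByTableId = new HashMap<>();

    public Map<String, String> getTableSchemas(List<String> tableIds) {
        Map<String, String> result = new HashMap<>();
        List<String> misses = new ArrayList<>();
        for (String tableId : tableIds) {
            String cached = schemasByTableId.get(tableId);
            if (cached != null) {
                result.put(tableId, cached);
            } else {
                misses.add(tableId);
            }
        }
        if (!misses.isEmpty()) {
            // One batched read covers all cache misses; the pre-patch code
            // performed a separate schema read per table, which dominated
            // bootstrap time for jobs capturing many tables.
            Map<String, String> fetched = fetchSchemasInOneRoundTrip(misses);
            schemasByTableId.putAll(fetched);
            result.putAll(fetched);
        }
        return result;
    }

    // Stand-in for a single jdbcConnection.readSchema(...) call that returns
    // the schemas of all requested tables at once.
    private Map<String, String> fetchSchemasInOneRoundTrip(List<String> tableIds) {
        Map<String, String> fetched = new HashMap<>();
        for (String tableId : tableIds) {
            fetched.put(tableId, "schema-of-" + tableId);
        }
        return fetched;
    }

    public static void main(String[] args) {
        SchemaCache cache = new SchemaCache();
        System.out.println(cache.getTableSchemas(List.of("s1.orders", "s1.users")));
        // Second call hits the cache; no further round trip is needed.
        System.out.println(cache.getTableSchemas(List.of("s1.orders")));
    }
}

Note also that the batched readTableSchema in the diff passes null as the schema filter to jdbcConnection.readSchema, so a single call can span tables in different PostgreSQL schemas; narrowing to the captured tables is left to dbzConfig.getTableFilters().dataCollectionFilter().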