hql0312 commented on code in PR #3808:
URL: https://github.com/apache/flink-cdc/pull/3808#discussion_r1898300731
##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/utils/CustomPostgresSchema.java:
##########
@@ -112,7 +112,13 @@ private List<TableChange> readTableSchema(List<TableId> tableIds) throws SQLExce
tables,
dbzConfig.databaseName(),
null,
- dbzConfig.getTableFilters().dataCollectionFilter(),
+ // only check context tableIds
+ (tb) ->
Review Comment:
PostgresDialect caches each TableChange after it has been loaded, but on the first call none of the tables are loaded yet.
If you trace JdbcSourceChunkSplitter.generateSplits, it calls analyzeTable, which calls PostgresDialect.queryTableSchema, and finally CustomPostgresSchema.getTableSchema:
```java
public TableChange getTableSchema(TableId tableId) {
    // read schema from cache first
    if (!schemasByTableId.containsKey(tableId)) {
        try {
            readTableSchema(Collections.singletonList(tableId));
        } catch (SQLException e) {
            throw new FlinkRuntimeException("Failed to read table schema", e);
        }
    }
    return schemasByTableId.get(tableId);
}
```
This calls readTableSchema for every tableId that has not been loaded yet.
readTableSchema in turn calls jdbcConnection.readSchema with dbzConfig.getTableFilters().dataCollectionFilter():
```java
public void readSchema(Tables tables, String databaseCatalog, String schemaNamePattern,
                       TableFilter tableFilter, ColumnNameFilter columnFilter,
                       boolean removeTablesNotFoundInJdbc)
        throws SQLException {
    // Before we make any changes, get the copy of the set of table IDs ...
    Set<TableId> tableIdsBefore = new HashSet<>(tables.tableIds());

    // Read the metadata for the table columns ...
    DatabaseMetaData metadata = connection().getMetaData();

    // Find regular and materialized views as they cannot be snapshotted
    final Set<TableId> viewIds = new HashSet<>();
    final Set<TableId> tableIds = new HashSet<>();
    int totalTables = 0;
    // **the logic will load the tables which match
    // dbzConfig.getTableFilters().dataCollectionFilter()**
    try (final ResultSet rs =
            metadata.getTables(databaseCatalog, schemaNamePattern, null, supportedTableTypes())) {
        while (rs.next()) {
            final String catalogName = resolveCatalogName(rs.getString(1));
            final String schemaName = rs.getString(2);
            final String tableName = rs.getString(3);
            final String tableType = rs.getString(4);
            if (isTableType(tableType)) {
                totalTables++;
                TableId tableId = new TableId(catalogName, schemaName, tableName);
                if (tableFilter == null || tableFilter.isIncluded(tableId)) {
                    tableIds.add(tableId);
                }
            } else {
                TableId tableId = new TableId(catalogName, schemaName, tableName);
                viewIds.add(tableId);
            }
        }
    }

    Map<TableId, List<Column>> columnsByTable = new HashMap<>();
    if (totalTables == tableIds.size()
            || config.getBoolean(RelationalDatabaseConnectorConfig.SNAPSHOT_FULL_COLUMN_SCAN_FORCE)) {
        columnsByTable = getColumnsDetails(databaseCatalog, schemaNamePattern, null, tableFilter,
                columnFilter, metadata, viewIds);
    } else {
        // **load the tables which match
        // dbzConfig.getTableFilters().dataCollectionFilter() for each tableId**
        for (TableId includeTable : tableIds) {
            LOGGER.debug("Retrieving columns of table {}", includeTable);
            Map<TableId, List<Column>> cols = getColumnsDetails(databaseCatalog, schemaNamePattern,
                    includeTable.table(), tableFilter, columnFilter, metadata, viewIds);
            columnsByTable.putAll(cols);
        }
    }

    // Read the metadata for the primary keys ...
    for (Entry<TableId, List<Column>> tableEntry : columnsByTable.entrySet()) {
        // First get the primary key information, which must be done for *each* table ...
        List<String> pkColumnNames = readPrimaryKeyOrUniqueIndexNames(metadata, tableEntry.getKey());
        // Then define the table ...
        List<Column> columns = tableEntry.getValue();
        Collections.sort(columns);
        String defaultCharsetName = null; // JDBC does not expose character sets
        tables.overwriteTable(tableEntry.getKey(), columns, pkColumnNames, defaultCharsetName);
    }
    if (removeTablesNotFoundInJdbc) {
        // Remove any definitions for tables that were not found in the database metadata ...
        tableIdsBefore.removeAll(columnsByTable.keySet());
        tableIdsBefore.forEach(tables::removeTable);
    }
}
```
So readSchema loads the metadata of every table that matches dbzConfig.getTableFilters().dataCollectionFilter(), and that work is repeated for each tableId that is loaded for the first time. Focus on the comments marked with ** above.
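The idea behind the change is to narrow the filter passed to readSchema so it only matches the tableIds actually being loaded. A minimal standalone sketch of that idea (the TableFilter interface below is a local stand-in for Debezium's Tables.TableFilter, and onlyRequested is a hypothetical helper, not the exact PR code):

```java
import java.util.Set;

public class NarrowedFilterSketch {

    /** Local stand-in for io.debezium.relational.Tables.TableFilter. */
    interface TableFilter {
        boolean isIncluded(String tableId);
    }

    /**
     * Instead of dbzConfig.getTableFilters().dataCollectionFilter(), which matches
     * every captured table, include only the tableIds passed into readTableSchema.
     * readSchema then skips column-metadata reads for all the other tables.
     */
    static TableFilter onlyRequested(Set<String> requestedTableIds) {
        return requestedTableIds::contains;
    }

    public static void main(String[] args) {
        TableFilter filter = onlyRequested(Set.of("public.orders"));
        System.out.println(filter.isIncluded("public.orders"));    // true
        System.out.println(filter.isIncluded("public.customers")); // false
    }
}
```

With a filter like this, the first getTableSchema call still goes through readSchema, but it only pays the metadata-scan cost for the requested table rather than for every table matching the connector's include list.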