[ 
https://issues.apache.org/jira/browse/FLINK-39844?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-39844:
-----------------------------------
    Labels: pull-request-available  (was: )

> PostgreSQL CDC: filter table columns by schema name to fix LIKE-wildcard 
> cross-schema column   leakage
> ------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-39844
>                 URL: https://issues.apache.org/jira/browse/FLINK-39844
>             Project: Flink
>          Issue Type: Improvement
>          Components: Flink CDC
>    Affects Versions: cdc-3.6.0
>            Reporter: Di Wu
>            Priority: Major
>              Labels: pull-request-available
>
> *Background*
>  
>   During the snapshot phase, the PostgreSQL connector reads the column 
> structure of captured tables
>   via JDBC DatabaseMetaData#getColumns(catalog, schemaPattern, 
> tableNamePattern,
>   columnNamePattern).
>  
>   Per the JDBC spec, *both* *schemaPattern* *and* *tableNamePattern* *are* 
> *LIKE* {*}patterns{*}:
>   - _ matches any single character
>   - % matches any sequence of characters
>   
>   So when a schema name or table name contains _ / % (both are legal 
> identifier characters in
>   PostgreSQL), getColumns may return columns from *other schemas / tables 
> that were not meant to* 
>   {*}match{*}.
>  
>   *Current State*
>  
>   An exact filter on the *table* *name* is already in place: for each column 
> returned by getColumns,
>   the result-set column 3 TABLE_NAME is compared for equality against the 
> target TableId.table(),
>   and the column is dropped on mismatch.
>  
>   However, the *schema* *dimension* *is* {*}missing{*}. Result-set column 2 
> TABLE_SCHEM is never validated.
>  
>   *Problem*
>  
>   Suppose a database has two schemas whose names differ by only a single 
> character:
>  
>   - sch_test (the target)
>   - schxtest (when _ acts as a wildcard, the pattern sch_test also matches it)
>  
>   Each schema contains a table with the same name, cross_schema_tbl. When the 
> user only wants to
>   capture sch_test.cross_schema_tbl:
>  
>   1. The connector calls getColumns(catalog, "sch_test", "cross_schema_tbl", 
> null).
>   2. PostgreSQL treats sch_test as a LIKE pattern, so schxtest matches too, 
> and columns of the
>   same-named table from *both* schemas are returned together.
>   3. The existing table-name filter cannot stop this — both tables have 
> TABLE_NAME equal to
>   cross_schema_tbl.
>   4. The same-named columns are loaded into a single table schema, and the 
> snapshot phase throws
>   IllegalStateException: Duplicate key Optional.empty, failing the job.
>  
>   The % case is analogous (when a schema name contains a literal %).
>   
>   *Impact*
>  
>   - {*}Trigger condition{*}: a single PostgreSQL instance contains multiple 
> schemas whose names are
>   LIKE-wildcard matches of one another, and they hold same-named tables.
>   - {*}Consequence{*}: the job fails outright during the snapshot phase, or 
> (when column names differ)
>   columns from another schema are mistakenly merged in, producing an 
> incorrect schema / dirty data.
>  
>   *Proposed* *Change*
>  
>   Add an *exact comparison on the schema name* next to the existing 
> table-name comparison: read
>   result-set column 2 TABLE_SCHEM and compare it for equality against 
> TableId.schema().
>  
>   final String resultSchemaName = columnMetadata.getString(2);
>   final String resultTableName = columnMetadata.getString(3);
>   if (!tableId.table().equals(resultTableName)
>           || (tableId.schema() != null && 
> !tableId.schema().equals(resultSchemaName))) {
>       return Optional.empty();
>   }
>   
>   Edge case: when TableId.schema() is null (no schema specified), the schema 
> check is skipped, to
>   avoid treating the non-null schema name returned by the metadata as unequal 
> and erroneously
>   dropping all columns.
>   
>   *Compatibility*
>  
>   This is a pure after-the-fact filter. It only tightens the decision of 
> "which columns belong to
>   the target table"; it does not change the query, and it does not affect 
> normal schemas/tables
>   (those without wildcard characters). Existing test behavior is unchanged.
>   
>   *Testing*
>  
>   Add a cross-schema case to SimilarTableNamesITCase: create two schemas 
> sch_test / schxtest that
>   are wildcard matches of each other, each holding a same-named table 
> cross_schema_tbl, and verify
>   that only the target schema's snapshot and incremental data are captured, 
> with no leakage from
>   the look-alike schema.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to