[ 
https://issues.apache.org/jira/browse/FLINK-34666?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hongshun Wang updated FLINK-34666:
----------------------------------
    Description: 
## Reason

When new tables are added and the job is then restarted, 
IncrementalSourceEnumerator#sendStreamMetaRequestEvent -> 
SplitAssigner#getFinishedSplitInfos may return finishedSplitInfos out of order 
(the infos of the newly added tables' snapshot splits come ahead of the older 
ones). When the reader requests the newly added table's infos, it gets older 
ones instead, so it never finishes reading all the infos and the changelog 
read is never restarted.
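
To illustrate why ordering matters, here is a minimal sketch, not the actual Flink CDC code. It assumes the reader derives the next meta group index from the number of infos it already holds, and that the enumerator answers with the matching slice of the finishedSplitInfos list. The class MetaGroupSketch, the methods metaGroup and nextGroupId, and the split ids are all hypothetical names chosen for this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical illustration only; not the Flink CDC implementation. */
public class MetaGroupSketch {

    /** Returns the slice of at most groupSize infos that forms the given meta group. */
    public static List<String> metaGroup(
            List<String> finishedSplitInfos, int groupSize, int groupId) {
        int from = Math.min(groupId * groupSize, finishedSplitInfos.size());
        int to = Math.min(from + groupSize, finishedSplitInfos.size());
        return new ArrayList<>(finishedSplitInfos.subList(from, to));
    }

    /** Assumed rule: the next group to request follows from the infos already received. */
    public static int nextGroupId(int receivedCount, int groupSize) {
        return receivedCount / groupSize;
    }

    public static void main(String[] args) {
        int groupSize = 2; // mirrors 'chunk-meta.group.size' = '2'
        int received = 4;  // the reader already holds the four infos of the old table

        // Expected order: older infos first, newly added table's infos last.
        List<String> ordered =
                Arrays.asList("old:0", "old:1", "old:2", "old:3", "new:0", "new:1");
        // Order after restart as described in this issue: new infos ahead of old ones.
        List<String> unordered =
                Arrays.asList("new:0", "new:1", "old:0", "old:1", "old:2", "old:3");

        int groupId = nextGroupId(received, groupSize);
        System.out.println(metaGroup(ordered, groupSize, groupId));   // [new:0, new:1]
        System.out.println(metaGroup(unordered, groupSize, groupId)); // [old:2, old:3]
    }
}
```

With the ordered list the reader receives the two new infos. With the unordered list the same request returns infos it already has, which matches the symptom described above.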

 

## How to reproduce it?

Add chunk-meta.group.size = 2 in 
getCreateTableStatement, then run the test 
org.apache.flink.cdc.connectors.postgres.source.NewlyAddedTableITCase#testNewlyAddedTableForExistsPipelineTwiceWithAheadWalLog
{code:java}
// getCreateTableStatement with 'chunk-meta.group.size' = '2' added to the WITH options
private String getCreateTableStatement(
        Map<String, String> otherOptions, String... captureTableNames) {
    return String.format(
            "CREATE TABLE address ("
                    + " table_name STRING METADATA VIRTUAL,"
                    + " id BIGINT NOT NULL,"
                    + " country STRING,"
                    + " city STRING,"
                    + " detail_address STRING,"
                    + " primary key (id) not enforced"
                    + ") WITH ("
                    + " 'connector' = 'postgres-cdc',"
                    + " 'scan.incremental.snapshot.enabled' = 'true',"
                    + " 'hostname' = '%s',"
                    + " 'port' = '%s',"
                    + " 'username' = '%s',"
                    + " 'password' = '%s',"
                    + " 'database-name' = '%s',"
                    + " 'schema-name' = '%s',"
                    + " 'table-name' = '%s',"
                    + " 'slot.name' = '%s', "
                    + " 'scan.incremental.snapshot.chunk.size' = '2',"
                    + " 'chunk-meta.group.size' = '2',"
                    + " 'scan.newly-added-table.enabled' = 'true'"
                    + " %s"
                    + ")",
            customDatabase.getHost(),
            customDatabase.getDatabasePort(),
            customDatabase.getUsername(),
            customDatabase.getPassword(),
            customDatabase.getDatabaseName(),
            SCHEMA_NAME,
            PostgresTestUtils.getTableNameRegex(captureTableNames),
            slotName,
            otherOptions.isEmpty()
                    ? ""
                    : ","
                            + otherOptions.entrySet().stream()
                                    .map(
                                            e ->
                                                    String.format(
                                                            "'%s'='%s'",
                                                            e.getKey(), e.getValue()))
                                    .collect(Collectors.joining(",")));
}
{code}

> Keep assigned splits in order to fix wrong meta group calculation
> -----------------------------------------------------------------
>
>                 Key: FLINK-34666
>                 URL: https://issues.apache.org/jira/browse/FLINK-34666
>             Project: Flink
>          Issue Type: Bug
>          Components: Flink CDC
>            Reporter: Hongshun Wang
>            Priority: Major
>             Fix For: cdc-3.1.0
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
