[
https://issues.apache.org/jira/browse/FLINK-34666?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17844495#comment-17844495
]
Hongshun Wang commented on FLINK-34666:
---------------------------------------
This has already been done in FLINK-34634; please close this issue. [~renqs]
> Keep assigned splits in order to fix wrong meta group calculation
> -----------------------------------------------------------------
>
> Key: FLINK-34666
> URL: https://issues.apache.org/jira/browse/FLINK-34666
> Project: Flink
> Issue Type: Bug
> Components: Flink CDC
> Reporter: Hongshun Wang
> Priority: Major
> Fix For: cdc-3.1.0
>
>
> h3. Reason
> After new tables are added and the job is restarted,
> IncrementalSourceEnumerator#sendStreamMetaRequestEvent ->
> SplitAssigner#getFinishedSplitInfos may return unordered
> finishedSplitInfos (the newly added snapshot infos come ahead of the older ones). When the reader
> requests the newly added table's infos, it gets the older ones instead, so it never reads all the
> infos and the changelog read never restarts.
>
> h3. How to reproduce it?
> Set chunk-meta.group.size = 2 in
> getCreateTableStatement, then run the test
> org.apache.flink.cdc.connectors.postgres.source.NewlyAddedTableITCase#testNewlyAddedTableForExistsPipelineTwiceWithAheadWalLog
> {code:java}
> // Test helper with 'chunk-meta.group.size' = '2' added to the WITH clause
> private String getCreateTableStatement(
>         Map<String, String> otherOptions, String... captureTableNames) {
>     return String.format(
>             "CREATE TABLE address ("
>                     + " table_name STRING METADATA VIRTUAL,"
>                     + " id BIGINT NOT NULL,"
>                     + " country STRING,"
>                     + " city STRING,"
>                     + " detail_address STRING,"
>                     + " primary key (id) not enforced"
>                     + ") WITH ("
>                     + " 'connector' = 'postgres-cdc',"
>                     + " 'scan.incremental.snapshot.enabled' = 'true',"
>                     + " 'hostname' = '%s',"
>                     + " 'port' = '%s',"
>                     + " 'username' = '%s',"
>                     + " 'password' = '%s',"
>                     + " 'database-name' = '%s',"
>                     + " 'schema-name' = '%s',"
>                     + " 'table-name' = '%s',"
>                     + " 'slot.name' = '%s', "
>                     + " 'scan.incremental.snapshot.chunk.size' = '2',"
>                     + " 'chunk-meta.group.size' = '2',"
>                     + " 'scan.newly-added-table.enabled' = 'true'"
>                     + " %s"
>                     + ")",
>             customDatabase.getHost(),
>             customDatabase.getDatabasePort(),
>             customDatabase.getUsername(),
>             customDatabase.getPassword(),
>             customDatabase.getDatabaseName(),
>             SCHEMA_NAME,
>             PostgresTestUtils.getTableNameRegex(captureTableNames),
>             slotName,
>             otherOptions.isEmpty()
>                     ? ""
>                     : ","
>                             + otherOptions.entrySet().stream()
>                                     .map(
>                                             e ->
>                                                     String.format(
>                                                             "'%s'='%s'",
>                                                             e.getKey(),
>                                                             e.getValue()))
>                                     .collect(Collectors.joining(",")));
> }
> {code}
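
The ordering problem described under "Reason" in the quoted issue can be illustrated with a minimal sketch. It assumes that the reader derives the next meta group index from the number of infos it already holds, and that the enumerator answers by slicing its finishedSplitInfos list into groups of chunk-meta.group.size. The class MetaGroupSketch, its method names, and the string labels are hypothetical; this is not the Flink CDC implementation.

```java
import java.util.Arrays;
import java.util.List;

/** Illustrative only: hypothetical names, not the Flink CDC code. */
final class MetaGroupSketch {

    /** Assumed reader side: next group index from the count of infos already received. */
    static int nextGroupId(int receivedCount, int groupSize) {
        return receivedCount / groupSize;
    }

    /** Assumed enumerator side: the slice of the list that answers a request for groupId. */
    static List<String> metaGroup(List<String> finishedSplitInfos, int groupSize, int groupId) {
        int from = Math.min(groupId * groupSize, finishedSplitInfos.size());
        int to = Math.min(from + groupSize, finishedSplitInfos.size());
        return finishedSplitInfos.subList(from, to);
    }

    public static void main(String[] args) {
        int groupSize = 2; // mirrors 'chunk-meta.group.size' = '2'

        // Infos of the old table first, then those of the newly added table.
        List<String> ordered = Arrays.asList("old-0", "old-1", "new-0", "new-1");
        // After a restart the newly added infos come ahead of the older ones.
        List<String> unordered = Arrays.asList("new-0", "new-1", "old-0", "old-1");

        // The reader already holds old-0 and old-1, so it asks for group 1.
        int groupId = nextGroupId(2, groupSize);

        System.out.println(metaGroup(ordered, groupSize, groupId));   // prints [new-0, new-1]
        System.out.println(metaGroup(unordered, groupSize, groupId)); // prints [old-0, old-1]
    }
}
```

With the unordered list, the reader receives infos it already has, so under these assumptions its received count never reaches the total. Keeping the assigned splits in a stable order avoids this.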
--
This message was sent by Atlassian Jira
(v8.20.10#820010)