[ 
https://issues.apache.org/jira/browse/FLINK-36584?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17893028#comment-17893028
 ] 

Anil Dasari commented on FLINK-36584:
-------------------------------------

[~loserwang1024] Thank you for sharing the details. 

Issue 2: I will try the backfill config. Thank you.

Issue 1:

I’ve reviewed the PR comments, but I didn’t quite understand the term 
'high_watermarks.' Could you please explain it further? In the meantime, I’ll 
check the documentation to see if I can find any specific information on it.

In my opinion, the snapshot mode is used primarily for initial data 
synchronization, so there are two options for the snapshot:
 # Use a clone DB for the snapshot
 # Use the primary DB for the snapshot

Option 1: It addresses your point "If we only include snapshot split , cannot 
get a consistency snapshot at any point, unless any operation is not allowed 
during snapshot splits read". Since the clone DB receives no writes, there 
won't be any operations on it during the snapshot.

Option 2: Both snapshot and streaming operations take place on the same primary 
database. Therefore, the database connection requires repeatable read isolation 
to ensure that new transactions occurring after the replication slot is created 
are not visible during the snapshot process (i.e., during select queries). This 
approach is used in the Debezium snapshot process.
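
For illustration, a minimal JDBC sketch of the isolation behavior I mean in 
Option 2 (hypothetical connection details and table name; this is not the 
actual Debezium code):
{code:java}
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class SnapshotIsolationSketch {
    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection(
                "jdbc:postgresql://localhost:5432/test", "postgres", "postgres")) {
            conn.setAutoCommit(false);
            // REPEATABLE READ: every SELECT in this transaction sees the
            // database as of the transaction start, so transactions committed
            // after the replication slot is created stay invisible here.
            conn.setTransactionIsolation(Connection.TRANSACTION_REPEATABLE_READ);
            try (Statement stmt = conn.createStatement();
                 ResultSet rs = stmt.executeQuery("SELECT * FROM public.orders")) {
                while (rs.next()) {
                    // emit snapshot rows; concurrent writes are not observed
                }
            }
            conn.commit();
        }
    }
}
{code}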

I believe a single transaction across multiple distributed snapshot split 
reader tasks is not possible (?), except perhaps through some kind of DB 
transaction snapshot/savepoint functionality I vaguely recall, which would let 
one transaction's view span the multiple tasks that each process a split.
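
Possibly what I am half-remembering is PostgreSQL's snapshot export rather 
than savepoints: one session exports its snapshot id with 
pg_export_snapshot(), and other sessions adopt it via SET TRANSACTION 
SNAPSHOT, so several connections share one point-in-time view. A rough sketch 
under that assumption (hypothetical connection details again):
{code:java}
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class SharedSnapshotSketch {
    public static void main(String[] args) throws Exception {
        String url = "jdbc:postgresql://localhost:5432/test"; // hypothetical
        // Coordinator session: its transaction must stay open while workers read.
        try (Connection coordinator = DriverManager.getConnection(url, "postgres", "postgres")) {
            coordinator.setAutoCommit(false);
            coordinator.setTransactionIsolation(Connection.TRANSACTION_REPEATABLE_READ);
            String snapshotId;
            try (Statement st = coordinator.createStatement();
                 ResultSet rs = st.executeQuery("SELECT pg_export_snapshot()")) {
                rs.next();
                snapshotId = rs.getString(1);
            }
            // A split reader task opens its own session but adopts the same
            // snapshot, so both connections see one point-in-time view.
            try (Connection worker = DriverManager.getConnection(url, "postgres", "postgres")) {
                worker.setAutoCommit(false);
                worker.setTransactionIsolation(Connection.TRANSACTION_REPEATABLE_READ);
                try (Statement st = worker.createStatement()) {
                    st.execute("SET TRANSACTION SNAPSHOT '" + snapshotId + "'");
                    // chunk SELECTs for this split would run here
                }
                worker.commit();
            }
            coordinator.commit();
        }
    }
}
{code}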

I think the same is discussed in your example: "For example, we have three 
snapshot splits. During the first split and third split, some update operations 
are applied. When read third snapshot split, it can read these operations to 
it, but the first splits cannot." I am still unclear on how changes from new 
operations are excluded during the snapshot process; my tentative reading is 
sketched below.
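
For the record, my tentative understanding (based on the DBLog-style 
watermarking the connector appears to use; class and method names below are 
invented for illustration) is that in-flight changes are not bypassed but 
merged: a low watermark is written to the WAL before the chunk SELECT and a 
high watermark after it, and the WAL slice between the two is replayed onto 
the SELECT result, making each chunk consistent as of its own high watermark:
{code:java}
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch, not the real flink-cdc classes.
public class ChunkNormalizationSketch {
    /**
     * snapshotRows: result of the chunk SELECT (pk -> row), taken while writes continue
     * walUpserts:   inserts/updates captured between low and high watermark (pk -> row)
     * walDeletes:   primary keys deleted between the watermarks
     */
    static Map<String, String> normalize(Map<String, String> snapshotRows,
                                         Map<String, String> walUpserts,
                                         Iterable<String> walDeletes) {
        Map<String, String> consistent = new LinkedHashMap<>(snapshotRows);
        consistent.putAll(walUpserts);            // replay in-flight changes
        walDeletes.forEach(consistent::remove);   // drop rows deleted meanwhile
        // Result is consistent as of this chunk's HIGH watermark; other chunks
        // may land on different versions of the database.
        return consistent;
    }
}
{code}
That would also explain your three-split example: each split is consistent at 
its own high watermark, but the splits sit at different versions unless a 
final stream read catches everything up.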

Also, could you please elaborate on your second comment in 
[https://github.com/apache/flink-cdc/issues/2867#issuecomment-1863915564]? I 
mean, how are we making sure that "I want all chunks to reach the same version 
of database data." holds? Please point me to the design doc, if there is one. 
Thanks.

I am using Option 1 for now; skip.backfill=true bypasses the stream split. 
Could you please confirm?
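
For reference, this is how I am configuring it (hedged sketch: I am assuming 
the DataStream builder exposes a backfill-skip flag mirroring the SQL option 
scan.incremental.snapshot.backfill.skip; the method name below may differ in 
3.2.0):
{code:java}
// Hedged sketch; imports as in the reproduction code below.
// skipSnapshotBackfill(true) is my assumption for the builder-level
// equivalent of 'scan.incremental.snapshot.backfill.skip' = 'true'.
JdbcIncrementalSource<String> snapshotOnlySource =
        PostgresSourceBuilder.PostgresIncrementalSource.<String>builder()
                .hostname("localhost")
                .port(5432)
                .database("test")
                .schemaList("public")
                .username("postgres")
                .password("postgres")
                .slotName("flinkpostgres")
                .decodingPluginName("pgoutput")
                .deserializer(new JsonDebeziumDeserializationSchema())
                .startupOptions(StartupOptions.snapshot())
                .skipSnapshotBackfill(true) // assumption: skips per-chunk backfill read
                .build();
{code}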

> PostgresIncrementalSource is not exiting the flink execution when 
> StartupOptions is snapshot and create multiple replication slots
> ----------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-36584
>                 URL: https://issues.apache.org/jira/browse/FLINK-36584
>             Project: Flink
>          Issue Type: Bug
>          Components: Flink CDC
>    Affects Versions: 3.0.0
>            Reporter: Anil Dasari
>            Priority: Major
>
> Issue-1. PostgresIncrementalSource is not exiting the Flink execution when 
> StartupOptions is snapshot.
>  
> The Postgres CDC module uses HybridSplitAssigner for the batch scan and 
> still tries to create a streaming split after the snapshot splits are 
> completed; see 
> [https://github.com/apache/flink-cdc/blob/master/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/HybridSplitAssigner.java#L128]
>  
> Issue-2. When source parallelism is > 1 (e.g. 2), PostgresIncrementalSource 
> creates multiple replication slots.
>  
> postgres logs: 
> {code:java}
> flink-postgres-1     | 2024-10-22 18:56:57.649 UTC [48] LOG:  logical 
> decoding found consistent point at 0/1690B28
> flink-postgres-1     | 2024-10-22 18:56:57.649 UTC [48] DETAIL:  There are no 
> running transactions.
> flink-postgres-1     | 2024-10-22 18:56:57.649 UTC [48] LOG:  exported 
> logical decoding snapshot: "00000006-00000003-1" with 0 transaction IDs
> flink-postgres-1     | 2024-10-22 18:56:58.226 UTC [51] LOG:  logical 
> decoding found consistent point at 0/1690BF8
> flink-postgres-1     | 2024-10-22 18:56:58.226 UTC [51] DETAIL:  There are no 
> running transactions.
> flink-postgres-1     | 2024-10-22 18:56:58.226 UTC [51] LOG:  exported 
> logical decoding snapshot: "00000008-00000003-1" with 0 transaction IDs
> flink-postgres-1     | 2024-10-22 18:56:58.266 UTC [52] LOG:  logical 
> decoding found consistent point at 0/1690C30
> flink-postgres-1     | 2024-10-22 18:56:58.266 UTC [52] DETAIL:  There are no 
> running transactions.
> flink-postgres-1     | 2024-10-22 18:56:58.267 UTC [52] LOG:  exported 
> logical decoding snapshot: "00000009-00000003-1" with 0 transaction IDs
> flink-postgres-1     | 2024-10-22 18:56:58.612 UTC [51] LOG:  starting 
> logical decoding for slot "flinkpostgres_0"
> flink-postgres-1     | 2024-10-22 18:56:58.612 UTC [51] DETAIL:  Streaming 
> transactions committing after 0/1690C30, reading WAL from 0/1690BF8.
> flink-postgres-1     | 2024-10-22 18:56:58.614 UTC [51] LOG:  logical 
> decoding found consistent point at 0/1690BF8
> flink-postgres-1     | 2024-10-22 18:56:58.614 UTC [51] DETAIL:  There are no 
> running transactions.
> flink-postgres-1     | 2024-10-22 18:56:58.753 UTC [56] ERROR:  replication 
> slot "flinkpostgres_1" does not exist
> flink-postgres-1     | 2024-10-22 18:56:58.753 UTC [56] STATEMENT:  select 
> pg_drop_replication_slot('flinkpostgres_1')
> flink-postgres-1     | 2024-10-22 18:56:59.347 UTC [57] LOG:  starting 
> logical decoding for slot "flinkpostgres_0"
> flink-postgres-1     | 2024-10-22 18:56:59.347 UTC [57] DETAIL:  Streaming 
> transactions committing after 0/1690C30, reading WAL from 0/1690C30.
> flink-postgres-1     | 2024-10-22 18:56:59.348 UTC [57] LOG:  logical 
> decoding found consistent point at 0/1690C30
> flink-postgres-1     | 2024-10-22 18:56:59.348 UTC [57] DETAIL:  There are no 
> running transactions.
> flink-postgres-1     | 2024-10-22 18:56:59.423 UTC [59] ERROR:  replication 
> slot "flinkpostgres_0" does not exist
> flink-postgres-1     | 2024-10-22 18:56:59.423 UTC [59] STATEMENT:  select 
> pg_drop_replication_slot('flinkpostgres_0')
> flink-postgres-1     | 2024-10-22 18:56:59.673 UTC [60] ERROR:  replication 
> slot "flinkpostgres_0" does not exist
> flink-postgres-1     | 2024-10-22 18:56:59.673 UTC [60] STATEMENT:  select 
> pg_drop_replication_slot('flinkpostgres_0') {code}
> Setup:
>  
> flink-cdc version: 3.2.0
> flink version: 1.19
> Steps to reproduce the issue:
>  
> 1. main code: 
> {code:java}
> import org.apache.flink.api.common.eventtime.WatermarkStrategy;
> import org.apache.flink.cdc.connectors.base.options.StartupOptions;
> import org.apache.flink.cdc.connectors.base.source.jdbc.JdbcIncrementalSource;
> import org.apache.flink.cdc.connectors.postgres.source.PostgresSourceBuilder;
> import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;
> import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.configuration.RestartStrategyOptions;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> 
> DebeziumDeserializationSchema<String> deserializer =
>         new JsonDebeziumDeserializationSchema();
> JdbcIncrementalSource<String> postgresIncrementalSource =
>         PostgresSourceBuilder.PostgresIncrementalSource.<String>builder()
>                 .startupOptions(StartupOptions.snapshot())
>                 .hostname("localhost")
>                 .port(5432)
>                 .database("test")
>                 .schemaList("public")
>                 .username("postgres")
>                 .password("postgres")
>                 .slotName("flinkpostgres")
>                 .decodingPluginName("pgoutput")
>                 .deserializer(deserializer)
> //              .splitSize(2)
>                 .build();
> 
> Configuration config = new Configuration();
> config.set(RestartStrategyOptions.RESTART_STRATEGY, "none");
> config.setString("heartbeat.interval", "6000000");  // 100 minutes
> config.setString("heartbeat.timeout", "18000000");  // 300 minutes
> StreamExecutionEnvironment env =
>         StreamExecutionEnvironment.getExecutionEnvironment(config);
> env.enableCheckpointing(300000);
> env.fromSource(
>                 postgresIncrementalSource,
>                 WatermarkStrategy.noWatermarks(),
>                 "PostgresParallelSource")
>         .setParallelism(2)
>         .print();
> env.execute("Output Postgres Snapshot"); {code}
> 2. Create two tables and insert some records in Postgres.
> 3. Run the code from step 1.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
