[
https://issues.apache.org/jira/browse/FLINK-36584?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17893337#comment-17893337
]
Anil Dasari edited comment on FLINK-36584 at 10/28/24 5:21 AM:
---------------------------------------------------------------
[~loserwang1024] Thanks for sharing the details.
On issue 2: I've enabled skipping the snapshot backfill with
{{PostgresSourceBuilder.PostgresIncrementalSource#skipSnapshotBackfill(true)}},
and the task-index replication slots have been created. However, I've noticed
that {{PostgresScanFetchTask#maybeCreateSlotForBackFillReadTask}} bypasses the
task-based replication slot when {{skipSnapshotBackfill}} is set to {{true}}.
I'm currently debugging the code to find where {{backfillslottest_0}} is used
in the process.
When {{skipSnapshotBackfill}} is set to {{true}}:
{code:java}
flink-postgres-1 | 2024-10-28 04:57:26.240 UTC [212] LOG: logical decoding found consistent point at 0/1677190
flink-postgres-1 | 2024-10-28 04:57:26.240 UTC [212] DETAIL: There are no running transactions.
flink-postgres-1 | 2024-10-28 04:57:26.240 UTC [212] LOG: exported logical decoding snapshot: "00000006-0000013E-1" with 0 transaction IDs
flink-postgres-1 | 2024-10-28 04:57:27.291 UTC [214] ERROR: replication slot "backfillslottest_0" does not exist
flink-postgres-1 | 2024-10-28 04:57:27.291 UTC [214] STATEMENT: select pg_drop_replication_slot('backfillslottest_0')
flink-postgres-1 | 2024-10-28 04:57:27.734 UTC [216] LOG: starting logical decoding for slot "backfillslottest"
flink-postgres-1 | 2024-10-28 04:57:27.734 UTC [216] DETAIL: Streaming transactions committing after 0/16771C8, reading WAL from 0/1677190.
flink-postgres-1 | 2024-10-28 04:57:27.735 UTC [216] LOG: logical decoding found consistent point at 0/1677190
flink-postgres-1 | 2024-10-28 04:57:27.735 UTC [216] DETAIL: There are no running transactions.
flink-postgres-1 | 2024-10-28 04:57:27.846 UTC [218] LOG: starting logical decoding for slot "backfillslottest"
flink-postgres-1 | 2024-10-28 04:57:27.846 UTC [218] DETAIL: Streaming transactions committing after 0/16771C8, reading WAL from 0/1677190.
flink-postgres-1 | 2024-10-28 04:57:27.847 UTC [218] LOG: logical decoding found consistent point at 0/1677190
flink-postgres-1 | 2024-10-28 04:57:27.847 UTC [218] DETAIL: There are no running transactions.
{code}
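PostgreSQL prints WAL positions (LSNs) as two hexadecimal halves, {{X/Y}}. {{X}} holds the high 32 bits and {{Y}} the low 32 bits. The sketch below is a stdlib-only way to turn the LSNs in these logs into numbers, so that a slot's "reading WAL from" and "committing after" positions can be compared across runs. {{LsnCompare}} and {{parseLsn}} are illustrative names that are not part of Flink CDC or the PostgreSQL JDBC driver.

```java
// Illustrative helper (not part of Flink CDC): converts PostgreSQL LSN strings such as
// "0/16771C8" into 64-bit positions so that values from the logs can be compared.
public final class LsnCompare {

    /** Parses "X/Y", where X (high 32 bits) and Y (low 32 bits) are hexadecimal. */
    static long parseLsn(String lsn) {
        String[] parts = lsn.split("/");
        if (parts.length != 2) {
            throw new IllegalArgumentException("Not an LSN: " + lsn);
        }
        long high = Long.parseLong(parts[0], 16);
        long low = Long.parseLong(parts[1], 16);
        return (high << 32) | low;
    }

    public static void main(String[] args) {
        // Positions copied from the skipSnapshotBackfill=true log above.
        long readingFrom = parseLsn("0/1677190");
        long committingAfter = parseLsn("0/16771C8");
        System.out.println("reading WAL from:     " + readingFrom);
        System.out.println("committing after:     " + committingAfter);
        System.out.println("distance (bytes):     " + (committingAfter - readingFrom));
    }
}
```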
When {{skipSnapshotBackfill}} is set to {{false}}:
{code:java}
flink-postgres-1 | 2024-10-28 04:56:55.409 UTC [199] LOG: logical decoding found consistent point at 0/1677090
flink-postgres-1 | 2024-10-28 04:56:55.409 UTC [199] DETAIL: There are no running transactions.
flink-postgres-1 | 2024-10-28 04:56:55.409 UTC [199] LOG: exported logical decoding snapshot: "00000007-0000001A-1" with 0 transaction IDs
flink-postgres-1 | 2024-10-28 04:56:55.614 UTC [199] LOG: starting logical decoding for slot "nobackfillslot_0"
flink-postgres-1 | 2024-10-28 04:56:55.614 UTC [199] DETAIL: Streaming transactions committing after 0/16770C8, reading WAL from 0/1677090.
flink-postgres-1 | 2024-10-28 04:56:55.615 UTC [199] LOG: logical decoding found consistent point at 0/1677090
flink-postgres-1 | 2024-10-28 04:56:55.615 UTC [199] DETAIL: There are no running transactions.
flink-postgres-1 | 2024-10-28 04:56:56.238 UTC [201] LOG: starting logical decoding for slot "nobackfillslot_0"
flink-postgres-1 | 2024-10-28 04:56:56.238 UTC [201] DETAIL: Streaming transactions committing after 0/16770C8, reading WAL from 0/1677090.
flink-postgres-1 | 2024-10-28 04:56:56.239 UTC [201] LOG: logical decoding found consistent point at 0/1677090
flink-postgres-1 | 2024-10-28 04:56:56.239 UTC [201] DETAIL: There are no running transactions.
flink-postgres-1 | 2024-10-28 04:56:56.295 UTC [203] ERROR: replication slot "nobackfillslot_0" does not exist
flink-postgres-1 | 2024-10-28 04:56:56.295 UTC [203] STATEMENT: select pg_drop_replication_slot('nobackfillslot_0')
flink-postgres-1 | 2024-10-28 04:56:56.391 UTC [204] ERROR: replication slot "nobackfillslot_0" does not exist
flink-postgres-1 | 2024-10-28 04:56:56.391 UTC [204] STATEMENT: select pg_drop_replication_slot('nobackfillslot_0')
flink-postgres-1 | 2024-10-28 04:56:56.765 UTC [206] LOG: starting logical decoding for slot "nobackfillslot"
flink-postgres-1 | 2024-10-28 04:56:56.765 UTC [206] DETAIL: Streaming transactions committing after 0/1677060, reading WAL from 0/1677028.
flink-postgres-1 | 2024-10-28 04:56:56.766 UTC [206] LOG: logical decoding found consistent point at 0/1677028
flink-postgres-1 | 2024-10-28 04:56:56.766 UTC [206] DETAIL: There are no running transactions.
flink-postgres-1 | 2024-10-28 04:57:04.548 UTC [209] LOG: starting logical decoding for slot "nobackfillslot"
flink-postgres-1 | 2024-10-28 04:57:04.548 UTC [209] DETAIL: Streaming transactions committing after 0/1677060, reading WAL from 0/1677028.
flink-postgres-1 | 2024-10-28 04:57:04.548 UTC [209] LOG: logical decoding found consistent point at 0/1677028
flink-postgres-1 | 2024-10-28 04:57:04.548 UTC [209] DETAIL: There are no running transactions.
{code}
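To compare the slot names in the two runs, here is a hypothetical model. It is not the connector's code, and {{decodingSlotFor}} and {{candidateSlots}} are illustrative names. It encodes only what the logs show. With {{skipSnapshotBackfill=true}}, decoding starts only on the configured slot ({{backfillslottest}}). With {{false}}, a task decodes from the slot suffixed with its task index ({{nobackfillslot_0}}), and the configured slot is still used for the stream read. With parallelism 2, the same rule produces the {{flinkpostgres_0}} / {{flinkpostgres_1}} names seen in the issue description.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the slot names observed in the logs; NOT the Flink CDC implementation.
public final class SlotNameModel {

    /** Hypothetical: slot a snapshot task decodes from, as observed in the two runs above. */
    static String decodingSlotFor(String slotName, int taskIndex, boolean skipSnapshotBackfill) {
        // true  -> only "backfillslottest" appears in "starting logical decoding" lines
        // false -> "nobackfillslot_0" appears, i.e. slotName + "_" + taskIndex
        return skipSnapshotBackfill ? slotName : slotName + "_" + taskIndex;
    }

    /** Hypothetical: every slot name that may show up for a given source parallelism. */
    static List<String> candidateSlots(String slotName, int parallelism, boolean skipSnapshotBackfill) {
        List<String> names = new ArrayList<>();
        names.add(slotName); // configured slot, used for the stream read
        for (int task = 0; task < parallelism; task++) {
            String name = decodingSlotFor(slotName, task, skipSnapshotBackfill);
            if (!names.contains(name)) {
                names.add(name);
            }
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println(candidateSlots("flinkpostgres", 2, false));
        System.out.println(candidateSlots("backfillslottest", 1, true));
    }
}
```

The model says nothing about when the per-task slots are created or dropped. The repeated {{pg_drop_replication_slot}} errors are what the debugging above is trying to explain.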
Follow-up question on issue 1:
The stream split created
[here|https://github.com/apache/flink-cdc/blob/master/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/HybridSplitAssigner.java#L140]
(even when the startup mode is set to snapshot only and new table scans are
disabled) is different from the backfill task created
[here|https://github.com/apache/flink-cdc/blob/master/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTask.java#L123].
Could you please clarify?
Is the backfill task submitted at the end of a snapshot task, rather than at
the end of the snapshot (i.e., on completion of all snapshot tasks)?
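Issue 1 expects the source to finish once the snapshot splits are done when the startup mode is snapshot, instead of moving on to a stream split. Below is a stdlib-only toy model of that expected behaviour. {{ToyHybridAssigner}} and its {{StartupMode}} enum are hypothetical and are not {{HybridSplitAssigner}}. The model also skips a step the real assigner needs: it does not wait for assigned snapshot splits to report completion before deciding.

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.Optional;
import java.util.Queue;

// Toy model (hypothetical, NOT HybridSplitAssigner) of the behaviour Issue 1 expects:
// in snapshot-only mode no stream split is produced, so a bounded job can finish.
public final class ToyHybridAssigner {

    enum StartupMode { INITIAL, SNAPSHOT }

    private final Queue<String> pendingSnapshotSplits;
    private final StartupMode mode;
    private boolean streamSplitAssigned = false;

    ToyHybridAssigner(List<String> snapshotSplits, StartupMode mode) {
        this.pendingSnapshotSplits = new ArrayDeque<>(snapshotSplits);
        this.mode = mode;
    }

    /** Returns the next split id, or empty when there is nothing left to read. */
    Optional<String> getNext() {
        if (!pendingSnapshotSplits.isEmpty()) {
            return Optional.of(pendingSnapshotSplits.poll());
        }
        if (mode == StartupMode.SNAPSHOT || streamSplitAssigned) {
            // Snapshot-only mode: stop here instead of creating a stream split.
            return Optional.empty();
        }
        streamSplitAssigned = true;
        return Optional.of("stream-split");
    }

    public static void main(String[] args) {
        ToyHybridAssigner assigner =
                new ToyHybridAssigner(List.of("table1:0", "table2:0"), StartupMode.SNAPSHOT);
        Optional<String> split;
        while ((split = assigner.getNext()).isPresent()) {
            System.out.println(split.get());
        }
    }
}
```

With {{StartupMode.INITIAL}}, the same loop would also print {{stream-split}}. That matches what the issue reports the real assigner doing even in snapshot mode.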
> PostgresIncrementalSource is not exiting the flink execution when
> StartupOptions is snapshot and create multiple replication slots
> ----------------------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-36584
> URL: https://issues.apache.org/jira/browse/FLINK-36584
> Project: Flink
> Issue Type: Bug
> Components: Flink CDC
> Affects Versions: 3.0.0
> Reporter: Anil Dasari
> Priority: Major
>
> Issue-1. PostgresIncrementalSource is not exiting the Flink execution when
> StartupOptions is snapshot.
>
> The Postgres CDC module uses HybridSplitAssigner for the batch scan, which
> still tries to create a stream split once the snapshot splits are completed; see
> [https://github.com/apache/flink-cdc/blob/master/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/HybridSplitAssigner.java#L128]
>
> Issue-2. When the source parallelism is greater than 1 (e.g. 2),
> PostgresIncrementalSource creates multiple replication slots.
>
> postgres logs:
> {code:java}
> flink-postgres-1 | 2024-10-22 18:56:57.649 UTC [48] LOG: logical decoding found consistent point at 0/1690B28
> flink-postgres-1 | 2024-10-22 18:56:57.649 UTC [48] DETAIL: There are no running transactions.
> flink-postgres-1 | 2024-10-22 18:56:57.649 UTC [48] LOG: exported logical decoding snapshot: "00000006-00000003-1" with 0 transaction IDs
> flink-postgres-1 | 2024-10-22 18:56:58.226 UTC [51] LOG: logical decoding found consistent point at 0/1690BF8
> flink-postgres-1 | 2024-10-22 18:56:58.226 UTC [51] DETAIL: There are no running transactions.
> flink-postgres-1 | 2024-10-22 18:56:58.226 UTC [51] LOG: exported logical decoding snapshot: "00000008-00000003-1" with 0 transaction IDs
> flink-postgres-1 | 2024-10-22 18:56:58.266 UTC [52] LOG: logical decoding found consistent point at 0/1690C30
> flink-postgres-1 | 2024-10-22 18:56:58.266 UTC [52] DETAIL: There are no running transactions.
> flink-postgres-1 | 2024-10-22 18:56:58.267 UTC [52] LOG: exported logical decoding snapshot: "00000009-00000003-1" with 0 transaction IDs
> flink-postgres-1 | 2024-10-22 18:56:58.612 UTC [51] LOG: starting logical decoding for slot "flinkpostgres_0"
> flink-postgres-1 | 2024-10-22 18:56:58.612 UTC [51] DETAIL: Streaming transactions committing after 0/1690C30, reading WAL from 0/1690BF8.
> flink-postgres-1 | 2024-10-22 18:56:58.614 UTC [51] LOG: logical decoding found consistent point at 0/1690BF8
> flink-postgres-1 | 2024-10-22 18:56:58.614 UTC [51] DETAIL: There are no running transactions.
> flink-postgres-1 | 2024-10-22 18:56:58.753 UTC [56] ERROR: replication slot "flinkpostgres_1" does not exist
> flink-postgres-1 | 2024-10-22 18:56:58.753 UTC [56] STATEMENT: select pg_drop_replication_slot('flinkpostgres_1')
> flink-postgres-1 | 2024-10-22 18:56:59.347 UTC [57] LOG: starting logical decoding for slot "flinkpostgres_0"
> flink-postgres-1 | 2024-10-22 18:56:59.347 UTC [57] DETAIL: Streaming transactions committing after 0/1690C30, reading WAL from 0/1690C30.
> flink-postgres-1 | 2024-10-22 18:56:59.348 UTC [57] LOG: logical decoding found consistent point at 0/1690C30
> flink-postgres-1 | 2024-10-22 18:56:59.348 UTC [57] DETAIL: There are no running transactions.
> flink-postgres-1 | 2024-10-22 18:56:59.423 UTC [59] ERROR: replication slot "flinkpostgres_0" does not exist
> flink-postgres-1 | 2024-10-22 18:56:59.423 UTC [59] STATEMENT: select pg_drop_replication_slot('flinkpostgres_0')
> flink-postgres-1 | 2024-10-22 18:56:59.673 UTC [60] ERROR: replication slot "flinkpostgres_0" does not exist
> flink-postgres-1 | 2024-10-22 18:56:59.673 UTC [60] STATEMENT: select pg_drop_replication_slot('flinkpostgres_0')
> {code}
> Setup:
>
> flink-cdc version: 3.2.0
> Flink version: 1.19
>
> Steps to reproduce the issue:
>
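> 1. Main code:
> {code:java}
> import org.apache.flink.api.common.eventtime.WatermarkStrategy;
> import org.apache.flink.cdc.connectors.base.options.StartupOptions;
> import org.apache.flink.cdc.connectors.base.source.jdbc.JdbcIncrementalSource;
> import org.apache.flink.cdc.connectors.postgres.source.PostgresSourceBuilder;
> import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;
> import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.configuration.RestartStrategyOptions;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>
> // Body of main(String[] args) throws Exception
> DebeziumDeserializationSchema<String> deserializer =
>         new JsonDebeziumDeserializationSchema();
> JdbcIncrementalSource<String> postgresIncrementalSource =
>         PostgresSourceBuilder.PostgresIncrementalSource.<String>builder()
>                 .startupOptions(StartupOptions.snapshot())
>                 .hostname("localhost")
>                 .port(5432)
>                 .database("test")
>                 .schemaList("public")
>                 .username("postgres")
>                 .password("postgres")
>                 .slotName("flinkpostgres")
>                 .decodingPluginName("pgoutput")
>                 .deserializer(deserializer)
>                 // .splitSize(2)
>                 .build();
>
> Configuration config = new Configuration();
> config.set(RestartStrategyOptions.RESTART_STRATEGY, "none");
> config.setString("heartbeat.interval", "6000000"); // 100 minutes
> config.setString("heartbeat.timeout", "18000000"); // 300 minutes
> StreamExecutionEnvironment env =
>         StreamExecutionEnvironment.getExecutionEnvironment(config);
> env.enableCheckpointing(300000); // 5 minutes
> env.fromSource(
>                 postgresIncrementalSource,
>                 WatermarkStrategy.noWatermarks(),
>                 "PostgresParallelSource")
>         .setParallelism(2)
>         .print();
> env.execute("Output Postgres Snapshot");
> {code}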
> 2. Create two tables in Postgres and insert some records.
> 3. Run the code from step 1.
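The reproduction passes the heartbeat settings as millisecond strings through {{Configuration#setString}}. The check below confirms the durations they represent: 100 minutes for {{heartbeat.interval}}, 300 minutes for {{heartbeat.timeout}}, and 5 minutes for the 300000 ms checkpoint interval. It uses only {{java.time.Duration}}. {{HeartbeatCheck}} is an illustrative name.

```java
import java.time.Duration;

// Converts the millisecond values used in the reproduction code into whole minutes.
public final class HeartbeatCheck {

    /** Parses a millisecond string (as passed to config.setString) and returns whole minutes. */
    static long toMinutes(String millis) {
        return Duration.ofMillis(Long.parseLong(millis)).toMinutes();
    }

    public static void main(String[] args) {
        System.out.println("heartbeat.interval:  " + toMinutes("6000000") + " min");
        System.out.println("heartbeat.timeout:   " + toMinutes("18000000") + " min");
        System.out.println("checkpoint interval: " + toMinutes("300000") + " min");
    }
}
```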