tchivs created FLINK-36164:
------------------------------

             Summary: JdbcIncrementalSource CheckPoint Timeout Due to 
Retrieving Schemas for All Subpartitions When Synchronizing a Partitioned 
PostgreSQL Table
                 Key: FLINK-36164
                 URL: https://issues.apache.org/jira/browse/FLINK-36164
             Project: Flink
          Issue Type: Improvement
          Components: Flink CDC
    Affects Versions: cdc-3.1.1
            Reporter: tchivs


When synchronizing a partitioned PostgreSQL table with the Postgres CDC 
connector, the checkpoint always fails if the table contains a large number of 
partitions. Tracing the source code shows that PostgresDialect's 
queryTableSchema queries the schema of every sub-partition table separately. 
Running these schema queries during each checkpoint causes connection timeouts.
 
 

 

tableName="(public)\.(aia_t_icc_jjdb.*|aia_t_vcs_fkdb.*|aia_t_vcs_pjdb.*|aia_t_vcs_dsrdb|aia_t_vcs_zjdb|case_log_test)"

```java
JdbcIncrementalSource<String> incrSource =
        PostgresSourceBuilder.PostgresIncrementalSource.<String>builder()
                .hostname(hostname)
                .port(port)
                .database(databaseName)
                .schemaList(schemaName)
                .tableList(tableName)
                .username(username)
                .password(password)
                .deserializer(schema)
                .slotName(slotName)
                .decodingPluginName(config.get(DECODING_PLUGIN_NAME))
                .includeSchemaChanges(true)
                .debeziumProperties(debeziumProperties)
                .startupOptions(startupOptions)
                .splitSize(config.get(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE))
                .splitMetaGroupSize(config.get(CHUNK_META_GROUP_SIZE))
                .fetchSize(config.get(SCAN_SNAPSHOT_FETCH_SIZE))
                .connectTimeout(config.get(CONNECT_TIMEOUT))
                .connectionPoolSize(config.get(CONNECTION_POOL_SIZE))
                .connectMaxRetries(config.get(CONNECT_MAX_RETRIES))
                .distributionFactorUpper(
                        config.get(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND))
                .distributionFactorLower(
                        config.get(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND))
                .heartbeatInterval(config.get(HEARTBEAT_INTERVAL))
                .build();
return env.fromSource(
        incrSource, WatermarkStrategy.noWatermarks(), "Postgres IncrSource");
```

 

 


Checkpoint exception:

```
org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable 
failure threshold. The latest checkpoint failed due to Checkpoint expired 
before completing., view the Checkpoint History tab or the Job Manager log to 
find out why continuous checkpoints failed.
```
![image](https://github.com/user-attachments/assets/183f44fd-0ab1-4302-9f13-ed5242c43636)


Error log:

```logger
2024-05-28 15:39:07,367 INFO org.apache.flink.runtime.taskmanager.Task [] - 
Freeing task resources for jjdb: Writer -> jjdb: Committer (1/1)#0 
(dbec67a546f03f19bae3a56726a02174_788573959fc6fd87fb6bfd0ffc27d896_0_0).
2024-05-28 15:39:07,370 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor 
[] - Un-registering task and sending final execution state CANCELED to 
JobManager for task fkdb: Writer -> fkdb: Committer (1/1)#0 
dbec67a546f03f19bae3a56726a02174_1992c3287130e2f49268dbcc909a7d1d_0_0.
2024-05-28 15:39:07,370 INFO org.apache.http.impl.execchain.RetryExec [] - I/O 
exception (java.net.SocketException) caught when processing request to 
{}->http://192.168.0.168:8040: Socket closed
2024-05-28 15:39:07,378 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor 
[] - Un-registering task and sending final execution state CANCELED to 
JobManager for task jjdb: Writer -> jjdb: Committer (1/1)#0 
dbec67a546f03f19bae3a56726a02174_788573959fc6fd87fb6bfd0ffc27d896_0_0.
2024-05-28 15:39:37,363 WARN org.apache.flink.runtime.taskmanager.Task [] - 
Task 'Source: Postgres Source -> Process (1/1)#0' did not react to cancelling 
signal - interrupting; it is stuck for 30 seconds in method:
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398)
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:367)
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:352)
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:858)
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:807)
org.apache.flink.runtime.taskmanager.Task$$Lambda$813/1723769838.run(Unknown 
Source)
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932)
org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
java.lang.Thread.run(Thread.java:750)

2024-05-28 15:39:37,372 INFO io.debezium.embedded.EmbeddedEngine [] - Stopping 
the embedded engine
2024-05-28 15:39:37,373 ERROR com.ververica.cdc.debezium.internal.Handover [] - 
Reporting error:
java.lang.InterruptedException: null
at java.lang.Object.wait(Native Method) ~[?:1.8.0_381]
at java.lang.Object.wait(Object.java:502) ~[?:1.8.0_381]
at com.ververica.cdc.debezium.internal.Handover.produce(Handover.java:115) 
~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at 
com.ververica.cdc.debezium.internal.DebeziumChangeConsumer.handleBatch(DebeziumChangeConsumer.java:54)
 ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at 
io.debezium.embedded.ConvertingEngineBuilder.lambda$notifying$2(ConvertingEngineBuilder.java:83)
 ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at io.debezium.embedded.EmbeddedEngine.run(EmbeddedEngine.java:822) 
[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at 
io.debezium.embedded.ConvertingEngineBuilder$2.run(ConvertingEngineBuilder.java:192)
 [flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
[?:1.8.0_381]
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
[?:1.8.0_381]
at java.lang.Thread.run(Thread.java:750) [?:1.8.0_381]
2024-05-28 15:39:37,375 INFO io.debezium.embedded.EmbeddedEngine [] - Stopping 
the task and engine
2024-05-28 15:39:37,375 INFO io.debezium.connector.common.BaseSourceTask [] - 
Stopping down connector
2024-05-28 15:41:07,376 WARN io.debezium.pipeline.ChangeEventSourceCoordinator 
[] - Coordinator didn't stop in the expected time, shutting down executor now
2024-05-28 15:41:07,377 WARN 
io.debezium.pipeline.source.AbstractSnapshotChangeEventSource [] - Snapshot was 
interrupted before completion
2024-05-28 15:41:07,377 INFO 
io.debezium.pipeline.source.AbstractSnapshotChangeEventSource [] - Snapshot - 
Final stage
2024-05-28 15:41:07,377 WARN io.debezium.pipeline.ChangeEventSourceCoordinator 
[] - Change event source executor was interrupted
java.lang.InterruptedException: null
at java.lang.Object.wait(Native Method) ~[?:1.8.0_381]
at 
io.debezium.connector.base.ChangeEventQueue.doEnqueue(ChangeEventQueue.java:204)
 ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at 
io.debezium.connector.base.ChangeEventQueue.enqueue(ChangeEventQueue.java:169) 
~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at 
io.debezium.pipeline.EventDispatcher$BufferingSnapshotChangeRecordReceiver.changeRecord(EventDispatcher.java:440)
 ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at 
io.debezium.pipeline.EventDispatcher$1.changeRecord(EventDispatcher.java:166) 
~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at 
io.debezium.relational.RelationalChangeRecordEmitter.emitReadRecord(RelationalChangeRecordEmitter.java:120)
 ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at 
io.debezium.relational.RelationalChangeRecordEmitter.emitChangeRecords(RelationalChangeRecordEmitter.java:57)
 ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at 
io.debezium.pipeline.EventDispatcher.dispatchSnapshotEvent(EventDispatcher.java:155)
 ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at 
io.debezium.relational.RelationalSnapshotChangeEventSource.createDataEventsForTable(RelationalSnapshotChangeEventSource.java:407)
 ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at 
io.debezium.relational.RelationalSnapshotChangeEventSource.createDataEvents(RelationalSnapshotChangeEventSource.java:316)
 ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at 
io.debezium.relational.RelationalSnapshotChangeEventSource.doExecute(RelationalSnapshotChangeEventSource.java:132)
 ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at 
io.debezium.pipeline.source.AbstractSnapshotChangeEventSource.execute(AbstractSnapshotChangeEventSource.java:76)
 ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at 
io.debezium.pipeline.ChangeEventSourceCoordinator.doSnapshot(ChangeEventSourceCoordinator.java:155)
 ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at 
io.debezium.pipeline.ChangeEventSourceCoordinator.executeChangeEventSources(ChangeEventSourceCoordinator.java:137)
 ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at 
io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:109)
 ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
[?:1.8.0_381]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_381]
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
[?:1.8.0_381]
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
[?:1.8.0_381]
at java.lang.Thread.run(Thread.java:750) [?:1.8.0_381]
2024-05-28 15:41:07,378 INFO io.debezium.pipeline.ChangeEventSourceCoordinator 
[] - Connected metrics set to 'false'
2024-05-28 15:41:07,380 INFO io.debezium.jdbc.JdbcConnection [] - Connection 
gracefully closed
```

![image](https://github.com/user-attachments/assets/5c5aabd8-89b1-4dd8-8872-e576a875569f)


### What You Expected?


**Solution:** Use caching. All sub-partitions that match the same partitioned 
table should fetch the schema only once. The match is configured with the 
following parameters:
  - `--multi-to-one-origin "jjdb_.*|fkdb_.*|pjdb_.*"`
  - `--multi-to-one-target "dwd_jjdb|dwdfkdb|dwd_pjdb"`
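
To illustrate how such a pair of parameters could map sub-partition names onto a single target name, here is a minimal sketch. It is not the connector's code: the class `MultiToOneConverter` is hypothetical, and it assumes that both parameters are `|`-separated lists paired by position and that each origin entry must match the whole table name. The table names in the usage note are made up.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

/** Hypothetical helper (not part of Flink CDC): maps a sub-partition name to a target name. */
class MultiToOneConverter {
    private final List<Pattern> origins = new ArrayList<>();
    private final List<String> targets = new ArrayList<>();

    /** Assumption: origin and target are '|'-separated lists with the same number of entries. */
    MultiToOneConverter(String origin, String target) {
        String[] originParts = origin.split("\\|");
        String[] targetParts = target.split("\\|");
        if (originParts.length != targetParts.length) {
            throw new IllegalArgumentException("origin and target must have the same number of entries");
        }
        for (int i = 0; i < originParts.length; i++) {
            origins.add(Pattern.compile(originParts[i]));
            targets.add(targetParts[i]);
        }
    }

    /** Returns the target of the first origin pattern matching the full name, else the name itself. */
    String convert(String tableName) {
        for (int i = 0; i < origins.size(); i++) {
            if (origins.get(i).matcher(tableName).matches()) {
                return targets.get(i);
            }
        }
        return tableName;
    }
}
```

With the parameter values above, `convert("jjdb_2024_01")` would return `dwd_jjdb`, while a name that matches no origin pattern, such as `case_log_test`, is returned unchanged.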


### How to Reproduce?

**Steps to Reproduce:**
1. Create a partitioned table in PostgreSQL.
2. Create partitions covering nearly ten years, so that the table has a large 
number of sub-partitions.
3. Synchronize this table with the connector.

### Anything Else?
**Method Modification:**
I modified the method as follows and it works well:

```java
// cache, tableNameConverter and schema are fields of the dialect (not shown here).
@Override
public TableChanges.TableChange queryTableSchema(JdbcConnection jdbc, TableId tableId) {
    long startTime = System.nanoTime(); // Record start time
    // Convert the sub-partition name and use the resulting table as the cache key,
    // so that all sub-partitions of one partitioned table share one cached schema.
    String name = this.tableNameConverter.convert(tableId.table());
    TableId parentTableId = new TableId(null, tableId.schema(), name);
    TableChanges.TableChange tableChange = cache.get(parentTableId);
    if (tableChange == null) {
        // Cache miss: query the schema once and store it under the parent table.
        LOG.info("[queryTableSchema begin] {}", tableId.identifier());
        if (schema == null) {
            schema = new CustomPostgresSchema((PostgresConnection) jdbc, sourceConfig);
        }
        tableChange = schema.getTableSchema(tableId);
        LOG.info("[queryTableSchema end] {}", tableId.identifier());
        cache.put(parentTableId, tableChange);
    }
    long endTime = System.nanoTime();
    long durationMs = (endTime - startTime) / 1_000_000; // Convert nanoseconds to milliseconds
    LOG.info("[queryTableSchema duration] {} {} ms", tableId.identifier(), durationMs);
    return tableChange;
}
```
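
The caching idea in the method above can also be shown in isolation. The following is a self-contained sketch, not the proposed patch: `ParentSchemaCache`, its constructor arguments, and `loadCount()` are hypothetical names introduced for illustration, and the schema type is left generic. It uses `ConcurrentHashMap.computeIfAbsent` so that lookup and insertion for one parent happen atomically, which is a design choice of the sketch rather than something the method above does.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/** Hypothetical sketch: caches one schema per parent table instead of one per sub-partition. */
class ParentSchemaCache<S> {
    private final Map<String, S> cache = new ConcurrentHashMap<>();
    private final Function<String, String> toParent; // maps a sub-partition name to its parent name
    private final Function<String, S> loader;        // the expensive schema query
    private final AtomicInteger loads = new AtomicInteger();

    ParentSchemaCache(Function<String, String> toParent, Function<String, S> loader) {
        this.toParent = toParent;
        this.loader = loader;
    }

    /** Returns the cached schema for the table's parent, running the loader only on a cache miss. */
    S get(String tableName) {
        String parent = toParent.apply(tableName);
        return cache.computeIfAbsent(parent, key -> {
            loads.incrementAndGet();
            return loader.apply(tableName);
        });
    }

    /** Number of times the loader actually ran. */
    int loadCount() {
        return loads.get();
    }
}
```

For example, with a `toParent` function that strips a made-up `_yyyy_MM` suffix, looking up 120 monthly sub-partitions of one table runs the loader once rather than 120 times. One caveat of any such cache: an entry would need to be invalidated when the parent table's schema changes, which this sketch does not handle.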

I am willing to submit a PR!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
