[ 
https://issues.apache.org/jira/browse/FLINK-36164?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

tchivs updated FLINK-36164:
---------------------------
    Description: 
What's Wrong?
When synchronizing a partitioned PostgreSQL table with the Postgres CDC 
connector, checkpoints always fail if the table has a large number of 
partitions. Tracing the source code shows that PostgresDialect's 
queryTableSchema queries the schema of each table, so every partition 
triggers its own schema query. These schema queries during each checkpoint 
cause connection timeouts.


Checkpoint exception:

org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable 
failure threshold. The latest checkpoint failed due to Checkpoint expired 
before completing., view the Checkpoint History tab or the Job Manager log to 
find out why continuous checkpoints failed.

error log:

 
{code:java}
2024-05-28 15:39:07,367 INFO org.apache.flink.runtime.taskmanager.Task [] - 
Freeing task resources for jjdb: Writer -> jjdb: Committer (1/1)#0 
(dbec67a546f03f19bae3a56726a02174_788573959fc6fd87fb6bfd0ffc27d896_0_0).
2024-05-28 15:39:07,370 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor 
[] - Un-registering task and sending final execution state CANCELED to 
JobManager for task fkdb: Writer -> fkdb: Committer (1/1)#0 
dbec67a546f03f19bae3a56726a02174_1992c3287130e2f49268dbcc909a7d1d_0_0.
2024-05-28 15:39:07,370 INFO org.apache.http.impl.execchain.RetryExec [] - I/O 
exception (java.net.SocketException) caught when processing request to 
{}->http://192.168.0.168:8040: Socket closed
2024-05-28 15:39:07,378 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor 
[] - Un-registering task and sending final execution state CANCELED to 
JobManager for task jjdb: Writer -> jjdb: Committer (1/1)#0 
dbec67a546f03f19bae3a56726a02174_788573959fc6fd87fb6bfd0ffc27d896_0_0.
2024-05-28 15:39:37,363 WARN org.apache.flink.runtime.taskmanager.Task [] - 
Task 'Source: Postgres Source -> Process (1/1)#0' did not react to cancelling 
signal - interrupting; it is stuck for 30 seconds in method:
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398)
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:367)
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:352)
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:858)
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:807)
org.apache.flink.runtime.taskmanager.Task$$Lambda$813/1723769838.run(Unknown 
Source)
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932)
org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
java.lang.Thread.run(Thread.java:750)
2024-05-28 15:39:37,372 INFO io.debezium.embedded.EmbeddedEngine [] - Stopping 
the embedded engine
2024-05-28 15:39:37,373 ERROR com.ververica.cdc.debezium.internal.Handover [] - 
Reporting error:
java.lang.InterruptedException: null
at java.lang.Object.wait(Native Method) ~[?:1.8.0_381]
at java.lang.Object.wait(Object.java:502) ~[?:1.8.0_381]
at com.ververica.cdc.debezium.internal.Handover.produce(Handover.java:115) 
~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at 
com.ververica.cdc.debezium.internal.DebeziumChangeConsumer.handleBatch(DebeziumChangeConsumer.java:54)
 ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at 
io.debezium.embedded.ConvertingEngineBuilder.lambda$notifying$2(ConvertingEngineBuilder.java:83)
 ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at io.debezium.embedded.EmbeddedEngine.run(EmbeddedEngine.java:822) 
[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at 
io.debezium.embedded.ConvertingEngineBuilder$2.run(ConvertingEngineBuilder.java:192)
 [flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
[?:1.8.0_381]
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
[?:1.8.0_381]
at java.lang.Thread.run(Thread.java:750) [?:1.8.0_381]
2024-05-28 15:39:37,375 INFO io.debezium.embedded.EmbeddedEngine [] - Stopping 
the task and engine
2024-05-28 15:39:37,375 INFO io.debezium.connector.common.BaseSourceTask [] - 
Stopping down connector
2024-05-28 15:41:07,376 WARN io.debezium.pipeline.ChangeEventSourceCoordinator 
[] - Coordinator didn't stop in the expected time, shutting down executor now
2024-05-28 15:41:07,377 WARN 
io.debezium.pipeline.source.AbstractSnapshotChangeEventSource [] - Snapshot was 
interrupted before completion
2024-05-28 15:41:07,377 INFO 
io.debezium.pipeline.source.AbstractSnapshotChangeEventSource [] - Snapshot - 
Final stage
2024-05-28 15:41:07,377 WARN io.debezium.pipeline.ChangeEventSourceCoordinator 
[] - Change event source executor was interrupted
java.lang.InterruptedException: null
at java.lang.Object.wait(Native Method) ~[?:1.8.0_381]
at 
io.debezium.connector.base.ChangeEventQueue.doEnqueue(ChangeEventQueue.java:204)
 ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at 
io.debezium.connector.base.ChangeEventQueue.enqueue(ChangeEventQueue.java:169) 
~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at 
io.debezium.pipeline.EventDispatcher$BufferingSnapshotChangeRecordReceiver.changeRecord(EventDispatcher.java:440)
 ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at 
io.debezium.pipeline.EventDispatcher$1.changeRecord(EventDispatcher.java:166) 
~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at 
io.debezium.relational.RelationalChangeRecordEmitter.emitReadRecord(RelationalChangeRecordEmitter.java:120)
 ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at 
io.debezium.relational.RelationalChangeRecordEmitter.emitChangeRecords(RelationalChangeRecordEmitter.java:57)
 ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at 
io.debezium.pipeline.EventDispatcher.dispatchSnapshotEvent(EventDispatcher.java:155)
 ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at 
io.debezium.relational.RelationalSnapshotChangeEventSource.createDataEventsForTable(RelationalSnapshotChangeEventSource.java:407)
 ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at 
io.debezium.relational.RelationalSnapshotChangeEventSource.createDataEvents(RelationalSnapshotChangeEventSource.java:316)
 ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at 
io.debezium.relational.RelationalSnapshotChangeEventSource.doExecute(RelationalSnapshotChangeEventSource.java:132)
 ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at 
io.debezium.pipeline.source.AbstractSnapshotChangeEventSource.execute(AbstractSnapshotChangeEventSource.java:76)
 ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at 
io.debezium.pipeline.ChangeEventSourceCoordinator.doSnapshot(ChangeEventSourceCoordinator.java:155)
 ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at 
io.debezium.pipeline.ChangeEventSourceCoordinator.executeChangeEventSources(ChangeEventSourceCoordinator.java:137)
 ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at 
io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:109)
 ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
[?:1.8.0_381]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_381]
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
[?:1.8.0_381]
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
[?:1.8.0_381]
at java.lang.Thread.run(Thread.java:750) [?:1.8.0_381]
2024-05-28 15:41:07,378 INFO io.debezium.pipeline.ChangeEventSourceCoordinator 
[] - Connected metrics set to 'false'
2024-05-28 15:41:07,380 INFO io.debezium.jdbc.JdbcConnection [] - Connection 
gracefully closed
{code}


What You Expected?
When synchronizing a PostgreSQL table using a connector, if the table contains 
a large number of partitions, the checkpoint always fails. By tracing the 
source code, it was found that the PostgresDialect's queryTableSchema queries 
the schema of each table. This schema querying during each checkpoint causes 
connection timeouts.

Solution: Use caching. The schema of each partitioned table should be matched 
and fetched only once. The job was configured with the following parameter:

 
tableList=(public)\.(aia_t_icc_jjdb.*|aia_t_vcs_fkdb.*|aia_t_vcs_pjdb.*|aia_t_vcs_dsrdb|aia_t_vcs_zjdb|case_log_test)
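
To illustrate why this tableList pulls in every partition, here is a minimal sketch that checks a few names against the pattern. The table names in main are hypothetical, and the sketch assumes the connector requires the whole schema-qualified name to match the expression.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

public class TableListPatternDemo {
    // The tableList value from this report, written as a Java string literal.
    static final Pattern TABLE_LIST = Pattern.compile(
            "(public)\\.(aia_t_icc_jjdb.*|aia_t_vcs_fkdb.*|aia_t_vcs_pjdb.*"
                    + "|aia_t_vcs_dsrdb|aia_t_vcs_zjdb|case_log_test)");

    /** Returns true when the whole qualified name matches the pattern. */
    static boolean isCaptured(String qualifiedName) {
        return TABLE_LIST.matcher(qualifiedName).matches();
    }

    public static void main(String[] args) {
        // Hypothetical names, for illustration only.
        List<String> names = Arrays.asList(
                "public.aia_t_icc_jjdb",
                "public.aia_t_icc_jjdb_2024_05",
                "public.aia_t_vcs_dsrdb",
                "public.aia_t_vcs_dsrdb_2024_05",
                "other.case_log_test");
        for (String name : names) {
            System.out.println(name + " captured: " + isCaptured(name));
        }
    }
}
```

Because of the trailing `.*`, each partition of the jjdb, fkdb and pjdb tables is treated as a separate captured table, which is where the per-partition schema queries come from.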
How to Reproduce?
Steps to Reproduce:
1. Add a partitioned table in PostgreSQL.
2. Create partitions covering nearly ten years.
3. Synchronize this table.
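
The steps above could be scripted roughly as follows. This is only a sketch: the report does not say how the table was partitioned, so the column names, the monthly range partitions and the _YYYY_MM naming are all assumptions. The program just prints DDL that could be run with psql.

```java
import java.time.YearMonth;
import java.util.ArrayList;
import java.util.List;

public class PartitionDdlGenerator {
    /** Builds the parent table DDL followed by one PARTITION OF statement per month. */
    static List<String> buildDdl(String parent, YearMonth first, int months) {
        List<String> ddl = new ArrayList<>();
        // Hypothetical columns; the primary key includes the partition key.
        ddl.add("CREATE TABLE " + parent
                + " (id bigint NOT NULL, created_at date NOT NULL,"
                + " PRIMARY KEY (id, created_at))"
                + " PARTITION BY RANGE (created_at);");
        for (int i = 0; i < months; i++) {
            YearMonth month = first.plusMonths(i);
            String child = String.format(
                    "%s_%d_%02d", parent, month.getYear(), month.getMonthValue());
            // Range bounds: first day of the month (inclusive) to first day of the next (exclusive).
            ddl.add(String.format(
                    "CREATE TABLE %s PARTITION OF %s FOR VALUES FROM ('%s') TO ('%s');",
                    child, parent, month.atDay(1), month.plusMonths(1).atDay(1)));
        }
        return ddl;
    }

    public static void main(String[] args) {
        // Ten years of monthly partitions: 120 child tables plus the parent.
        for (String statement : buildDdl("aia_t_icc_jjdb", YearMonth.of(2015, 1), 120)) {
            System.out.println(statement);
        }
    }
}
```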
Anything Else?
Method Modification:
I modified the method as follows and it works well:

 
{code:java}
@Override
public TableChanges.TableChange queryTableSchema(JdbcConnection jdbc, TableId tableId) {
    long startTime = System.nanoTime(); // Record start time
    // tableNameConverter is expected to map a partition name to its parent table name
    String name = this.tableNameConverter.convert(tableId.table());
    TableId parentTableId = new TableId(null, tableId.schema(), name);
    TableChanges.TableChange tableChange = cache.get(parentTableId);
    if (tableChange == null) {
        // Cache miss: query the schema once and store it under the parent table id
        LOG.info("[queryTableSchema begin] {}", tableId.identifier());
        if (schema == null) {
            schema = new CustomPostgresSchema((PostgresConnection) jdbc, sourceConfig);
        }
        tableChange = schema.getTableSchema(tableId);
        LOG.info("[queryTableSchema end] {}", tableId.identifier());
        cache.put(parentTableId, tableChange);
    }
    long endTime = System.nanoTime();
    long duration = endTime - startTime;
    // Convert nanoseconds to milliseconds
    LOG.info("[queryTableSchema duration] {} {} ms", tableId.identifier(), duration / 1_000_000);
    return tableChange;
}
{code}
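
The method above relies on the fields cache and tableNameConverter, which are not shown. A self-contained sketch of the same idea, without the Debezium types, might look like this. The class name, the suffix-stripping rule and the string "schemas" are hypothetical stand-ins, not the connector's actual API.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

public class ParentSchemaCache<S> {
    private final Map<String, S> cache = new ConcurrentHashMap<>();
    private final Function<String, String> toParentName;
    private final Function<String, S> loader;

    public ParentSchemaCache(Function<String, String> toParentName, Function<String, S> loader) {
        this.toParentName = toParentName;
        this.loader = loader;
    }

    /** Loads the schema at most once per parent table, whichever partition asks first. */
    public S get(String tableName) {
        String parent = toParentName.apply(tableName);
        return cache.computeIfAbsent(parent, key -> loader.apply(tableName));
    }

    /** Hypothetical converter: strips a trailing _YYYY_MM partition suffix. */
    public static String stripMonthSuffix(String table) {
        return table.replaceFirst("_\\d{4}_\\d{2}$", "");
    }

    public static void main(String[] args) {
        AtomicInteger queries = new AtomicInteger();
        ParentSchemaCache<String> schemas = new ParentSchemaCache<>(
                ParentSchemaCache::stripMonthSuffix,
                table -> {
                    queries.incrementAndGet(); // stands in for the expensive schema query
                    return "schema fetched via " + table;
                });
        schemas.get("aia_t_icc_jjdb_2024_04");
        schemas.get("aia_t_icc_jjdb_2024_05");
        schemas.get("aia_t_icc_jjdb_2024_06");
        System.out.println("schema queries issued: " + queries.get());
    }
}
```

With a shared entry, every partition receives the schema object fetched for whichever partition was queried first, so this approach presumably only holds when all partitions have the same columns as the parent.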


 I am willing to submit a PR!

  was:
When synchronizing a PostgreSQL table using a connector, if the table contains 
a large number of partitions, the checkpoint always fails. By tracing the 
source code, it was found that the PostgresDialect's queryTableSchema queries 
the schema of each table. This schema querying during each checkpoint causes 
connection timeouts.
 
 

 

tableName value is 
"(public)\.(aia_t_icc_jjdb.*|aia_t_vcs_fkdb.*|aia_t_vcs_pjdb.*|aia_t_vcs_dsrdb|aia_t_vcs_zjdb|case_log_test)"

 


JdbcIncrementalSource<String> incrSource =
        PostgresSourceBuilder.PostgresIncrementalSource.<String>builder()
                .hostname(hostname)
                .port(port)
                .database(databaseName)
                .schemaList(schemaName)
                .tableList(tableName)
                .username(username)
                .password(password)
                .deserializer(schema)
                .slotName(slotName)
                .decodingPluginName(config.get(DECODING_PLUGIN_NAME))
                .includeSchemaChanges(true)
                .debeziumProperties(debeziumProperties)
                .startupOptions(startupOptions)
                .splitSize(config.get(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE))
                .splitMetaGroupSize(config.get(CHUNK_META_GROUP_SIZE))
                .fetchSize(config.get(SCAN_SNAPSHOT_FETCH_SIZE))
                .connectTimeout(config.get(CONNECT_TIMEOUT))
                .connectionPoolSize(config.get(CONNECTION_POOL_SIZE))
                .connectMaxRetries(config.get(CONNECT_MAX_RETRIES))
                .distributionFactorUpper(
                        config.get(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND))
                .distributionFactorLower(
                        config.get(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND))
                .heartbeatInterval(config.get(HEARTBEAT_INTERVAL))
                .build();
return env.fromSource(
        incrSource, WatermarkStrategy.noWatermarks(), "Postgres IncrSource");



 

 

Checkpoint exception:

```
org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable 
failure threshold. The latest checkpoint failed due to Checkpoint expired 
before completing., view the Checkpoint History tab or the Job Manager log to 
find out why continuous checkpoints failed.
```
![image](https://github.com/user-attachments/assets/183f44fd-0ab1-4302-9f13-ed5242c43636)

error log:

```logger
2024-05-28 15:39:07,367 INFO org.apache.flink.runtime.taskmanager.Task [] - 
Freeing task resources for jjdb: Writer -> jjdb: Committer (1/1)#0 
(dbec67a546f03f19bae3a56726a02174_788573959fc6fd87fb6bfd0ffc27d896_0_0).
2024-05-28 15:39:07,370 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor 
[] - Un-registering task and sending final execution state CANCELED to 
JobManager for task fkdb: Writer -> fkdb: Committer (1/1)#0 
dbec67a546f03f19bae3a56726a02174_1992c3287130e2f49268dbcc909a7d1d_0_0.
2024-05-28 15:39:07,370 INFO org.apache.http.impl.execchain.RetryExec [] - I/O 
exception (java.net.SocketException) caught when processing request to 
{}->http://192.168.0.168:8040: Socket closed
2024-05-28 15:39:07,378 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor 
[] - Un-registering task and sending final execution state CANCELED to 
JobManager for task jjdb: Writer -> jjdb: Committer (1/1)#0 
dbec67a546f03f19bae3a56726a02174_788573959fc6fd87fb6bfd0ffc27d896_0_0.
2024-05-28 15:39:37,363 WARN org.apache.flink.runtime.taskmanager.Task [] - 
Task 'Source: Postgres Source -> Process (1/1)#0' did not react to cancelling 
signal - interrupting; it is stuck for 30 seconds in method:
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398)
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:367)
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:352)
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:858)
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:807)
org.apache.flink.runtime.taskmanager.Task$$Lambda$813/1723769838.run(Unknown 
Source)
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932)
org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
java.lang.Thread.run(Thread.java:750)

2024-05-28 15:39:37,372 INFO io.debezium.embedded.EmbeddedEngine [] - Stopping 
the embedded engine
2024-05-28 15:39:37,373 ERROR com.ververica.cdc.debezium.internal.Handover [] - 
Reporting error:
java.lang.InterruptedException: null
at java.lang.Object.wait(Native Method) ~[?:1.8.0_381]
at java.lang.Object.wait(Object.java:502) ~[?:1.8.0_381]
at com.ververica.cdc.debezium.internal.Handover.produce(Handover.java:115) 
~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at 
com.ververica.cdc.debezium.internal.DebeziumChangeConsumer.handleBatch(DebeziumChangeConsumer.java:54)
 ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at 
io.debezium.embedded.ConvertingEngineBuilder.lambda$notifying$2(ConvertingEngineBuilder.java:83)
 ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at io.debezium.embedded.EmbeddedEngine.run(EmbeddedEngine.java:822) 
[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at 
io.debezium.embedded.ConvertingEngineBuilder$2.run(ConvertingEngineBuilder.java:192)
 [flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
[?:1.8.0_381]
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
[?:1.8.0_381]
at java.lang.Thread.run(Thread.java:750) [?:1.8.0_381]
2024-05-28 15:39:37,375 INFO io.debezium.embedded.EmbeddedEngine [] - Stopping 
the task and engine
2024-05-28 15:39:37,375 INFO io.debezium.connector.common.BaseSourceTask [] - 
Stopping down connector
2024-05-28 15:41:07,376 WARN io.debezium.pipeline.ChangeEventSourceCoordinator 
[] - Coordinator didn't stop in the expected time, shutting down executor now
2024-05-28 15:41:07,377 WARN 
io.debezium.pipeline.source.AbstractSnapshotChangeEventSource [] - Snapshot was 
interrupted before completion
2024-05-28 15:41:07,377 INFO 
io.debezium.pipeline.source.AbstractSnapshotChangeEventSource [] - Snapshot - 
Final stage
2024-05-28 15:41:07,377 WARN io.debezium.pipeline.ChangeEventSourceCoordinator 
[] - Change event source executor was interrupted
java.lang.InterruptedException: null
at java.lang.Object.wait(Native Method) ~[?:1.8.0_381]
at 
io.debezium.connector.base.ChangeEventQueue.doEnqueue(ChangeEventQueue.java:204)
 ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at 
io.debezium.connector.base.ChangeEventQueue.enqueue(ChangeEventQueue.java:169) 
~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at 
io.debezium.pipeline.EventDispatcher$BufferingSnapshotChangeRecordReceiver.changeRecord(EventDispatcher.java:440)
 ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at 
io.debezium.pipeline.EventDispatcher$1.changeRecord(EventDispatcher.java:166) 
~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at 
io.debezium.relational.RelationalChangeRecordEmitter.emitReadRecord(RelationalChangeRecordEmitter.java:120)
 ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at 
io.debezium.relational.RelationalChangeRecordEmitter.emitChangeRecords(RelationalChangeRecordEmitter.java:57)
 ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at 
io.debezium.pipeline.EventDispatcher.dispatchSnapshotEvent(EventDispatcher.java:155)
 ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at 
io.debezium.relational.RelationalSnapshotChangeEventSource.createDataEventsForTable(RelationalSnapshotChangeEventSource.java:407)
 ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at 
io.debezium.relational.RelationalSnapshotChangeEventSource.createDataEvents(RelationalSnapshotChangeEventSource.java:316)
 ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at 
io.debezium.relational.RelationalSnapshotChangeEventSource.doExecute(RelationalSnapshotChangeEventSource.java:132)
 ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at 
io.debezium.pipeline.source.AbstractSnapshotChangeEventSource.execute(AbstractSnapshotChangeEventSource.java:76)
 ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at 
io.debezium.pipeline.ChangeEventSourceCoordinator.doSnapshot(ChangeEventSourceCoordinator.java:155)
 ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at 
io.debezium.pipeline.ChangeEventSourceCoordinator.executeChangeEventSources(ChangeEventSourceCoordinator.java:137)
 ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at 
io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:109)
 ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
[?:1.8.0_381]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_381]
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
[?:1.8.0_381]
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
[?:1.8.0_381]
at java.lang.Thread.run(Thread.java:750) [?:1.8.0_381]
2024-05-28 15:41:07,378 INFO io.debezium.pipeline.ChangeEventSourceCoordinator 
[] - Connected metrics set to 'false'
2024-05-28 15:41:07,380 INFO io.debezium.jdbc.JdbcConnection [] - Connection 
gracefully closed
```

![image](https://github.com/user-attachments/assets/5c5aabd8-89b1-4dd8-8872-e576a875569f)

### What You Expected?

When synchronizing a PostgreSQL table using a connector, if the table contains 
a large number of partitions, the checkpoint always fails. By tracing the 
source code, it was found that the `PostgresDialect`'s `queryTableSchema` 
queries the schema of each table. This schema querying during each checkpoint 
causes connection timeouts.

**Solution:** Use caching. Each partitioned table should match and fetch 
the schema only once using the following parameters:
  - `--multi-to-one-origin "jjdb_.*|fkdb_.*|pjdb_.*"`
  - `--multi-to-one-target "dwd_jjdb|dwdfkdb|dwd_pjdb"`

### How to Reproduce?

**Steps to Reproduce:**
1. Add a partitioned table in PostgreSQL.
2. Create partitions for nearly ten years.
3. Synchronize this table.

### Anything Else?

**Method Modification:**
I modified the method as follows and it works well:

```java
@Override
public TableChanges.TableChange queryTableSchema(JdbcConnection jdbc, TableId tableId) {
    long startTime = System.nanoTime(); // Record start time
    String name = this.tableNameConverter.convert(tableId.table());
    TableId parentTableId = new TableId(null, tableId.schema(), name);
    TableChanges.TableChange tableChange = cache.get(parentTableId);
    if (tableChange == null) {
        LOG.info("[queryTableSchema begin] {}", tableId.identifier());
        if (schema == null) {
            schema = new CustomPostgresSchema((PostgresConnection) jdbc, sourceConfig);
        }
        tableChange = schema.getTableSchema(tableId);
        LOG.info("[queryTableSchema end] {}", tableId.identifier());
        cache.put(parentTableId, tableChange);
    }
    long endTime = System.nanoTime();
    long duration = endTime - startTime;
    // Convert nanoseconds to milliseconds
    LOG.info("[queryTableSchema duration] {} {} ms", tableId.identifier(), duration / 1_000_000);
    return tableChange;
}
```

I am willing to submit a PR!


> JdbcIncrementalSource CheckPoint Timeout Due to Retrieving Schemas for All 
> Subpartitions When Synchronizing a Partitioned PostgreSQL Table
> ------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-36164
>                 URL: https://issues.apache.org/jira/browse/FLINK-36164
>             Project: Flink
>          Issue Type: Improvement
>          Components: Flink CDC
>    Affects Versions: cdc-3.1.1
>            Reporter: tchivs
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)