[
https://issues.apache.org/jira/browse/FLINK-36164?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
tchivs updated FLINK-36164:
---------------------------
Description:
When a PostgreSQL table with a large number of partitions is synchronized
through the connector, the checkpoint always fails. Tracing the source code
shows that the PostgresDialect's queryTableSchema queries the schema of every
table, one partition at a time. Running these schema queries during each
checkpoint causes connection timeouts.
tableName value is
"(public)\.(aia_t_icc_jjdb.*|aia_t_vcs_fkdb.*|aia_t_vcs_pjdb.*|aia_t_vcs_dsrdb|aia_t_vcs_zjdb|case_log_test)"
JdbcIncrementalSource<String> incrSource =
        PostgresSourceBuilder.PostgresIncrementalSource.<String>builder()
                .hostname(hostname)
                .port(port)
                .database(databaseName)
                .schemaList(schemaName)
                .tableList(tableName)
                .username(username)
                .password(password)
                .deserializer(schema)
                .slotName(slotName)
                .decodingPluginName(config.get(DECODING_PLUGIN_NAME))
                .includeSchemaChanges(true)
                .debeziumProperties(debeziumProperties)
                .startupOptions(startupOptions)
                .splitSize(config.get(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE))
                .splitMetaGroupSize(config.get(CHUNK_META_GROUP_SIZE))
                .fetchSize(config.get(SCAN_SNAPSHOT_FETCH_SIZE))
                .connectTimeout(config.get(CONNECT_TIMEOUT))
                .connectionPoolSize(config.get(CONNECTION_POOL_SIZE))
                .connectMaxRetries(config.get(CONNECT_MAX_RETRIES))
                .distributionFactorUpper(
                        config.get(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND))
                .distributionFactorLower(
                        config.get(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND))
                .heartbeatInterval(config.get(HEARTBEAT_INTERVAL))
                .build();
return env.fromSource(
        incrSource, WatermarkStrategy.noWatermarks(), "Postgres IncrSource");
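To see which tables the tableName pattern above can select, it can be tried against candidate names with `java.util.regex`. This is only a sketch under two assumptions: that the connector matches the pattern against the full `schema.table` identifier (the connector documentation has the exact rule), and that partitions are named by suffixing the parent name. The class name and the candidate table names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

final class TableListMatchSketch {
    // Pattern from the report, with ".{*}" read as ".*".
    static final Pattern TABLE_LIST = Pattern.compile(
            "(public)\\.(aia_t_icc_jjdb.*|aia_t_vcs_fkdb.*|aia_t_vcs_pjdb.*"
                    + "|aia_t_vcs_dsrdb|aia_t_vcs_zjdb|case_log_test)");

    /** Returns the candidates whose full identifier matches the pattern. */
    static List<String> captured(List<String> candidates) {
        List<String> result = new ArrayList<>();
        for (String id : candidates) {
            if (TABLE_LIST.matcher(id).matches()) {
                result.add(id);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Hypothetical names: one parent, two partitions, one unrelated table.
        List<String> candidates = Arrays.asList(
                "public.aia_t_icc_jjdb",
                "public.aia_t_icc_jjdb_2015_01",
                "public.aia_t_icc_jjdb_2024_12",
                "public.orders");
        System.out.println(captured(candidates));
    }
}
```

Under these assumptions, every partition matched by a `.*` alternative counts as its own captured table, so the number of schema lookups grows with the number of partitions.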
Checkpoint exception:
```
org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable
failure threshold. The latest checkpoint failed due to Checkpoint expired
before completing., view the Checkpoint History tab or the Job Manager log to
find out why continuous checkpoints failed.
```

error log:
```logger
2024-05-28 15:39:07,367 INFO org.apache.flink.runtime.taskmanager.Task [] -
Freeing task resources for jjdb: Writer -> jjdb: Committer (1/1)#0
(dbec67a546f03f19bae3a56726a02174_788573959fc6fd87fb6bfd0ffc27d896_0_0).
2024-05-28 15:39:07,370 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor
[] - Un-registering task and sending final execution state CANCELED to
JobManager for task fkdb: Writer -> fkdb: Committer (1/1)#0
dbec67a546f03f19bae3a56726a02174_1992c3287130e2f49268dbcc909a7d1d_0_0.
2024-05-28 15:39:07,370 INFO org.apache.http.impl.execchain.RetryExec [] - I/O
exception (java.net.SocketException) caught when processing request to
{}->http://192.168.0.168:8040: Socket closed
2024-05-28 15:39:07,378 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor
[] - Un-registering task and sending final execution state CANCELED to
JobManager for task jjdb: Writer -> jjdb: Committer (1/1)#0
dbec67a546f03f19bae3a56726a02174_788573959fc6fd87fb6bfd0ffc27d896_0_0.
2024-05-28 15:39:37,363 WARN org.apache.flink.runtime.taskmanager.Task [] -
Task 'Source: Postgres Source -> Process (1/1)#0' did not react to cancelling
signal - interrupting; it is stuck for 30 seconds in method:
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398)
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:367)
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:352)
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:858)
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:807)
org.apache.flink.runtime.taskmanager.Task$$Lambda$813/1723769838.run(Unknown
Source)
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932)
org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
java.lang.Thread.run(Thread.java:750)
2024-05-28 15:39:37,372 INFO io.debezium.embedded.EmbeddedEngine [] - Stopping
the embedded engine
2024-05-28 15:39:37,373 ERROR com.ververica.cdc.debezium.internal.Handover [] -
Reporting error:
java.lang.InterruptedException: null
at java.lang.Object.wait(Native Method) ~[?:1.8.0_381]
at java.lang.Object.wait(Object.java:502) ~[?:1.8.0_381]
at com.ververica.cdc.debezium.internal.Handover.produce(Handover.java:115)
~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at
com.ververica.cdc.debezium.internal.DebeziumChangeConsumer.handleBatch(DebeziumChangeConsumer.java:54)
~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at
io.debezium.embedded.ConvertingEngineBuilder.lambda$notifying$2(ConvertingEngineBuilder.java:83)
~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at io.debezium.embedded.EmbeddedEngine.run(EmbeddedEngine.java:822)
[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at
io.debezium.embedded.ConvertingEngineBuilder$2.run(ConvertingEngineBuilder.java:192)
[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[?:1.8.0_381]
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[?:1.8.0_381]
at java.lang.Thread.run(Thread.java:750) [?:1.8.0_381]
2024-05-28 15:39:37,375 INFO io.debezium.embedded.EmbeddedEngine [] - Stopping
the task and engine
2024-05-28 15:39:37,375 INFO io.debezium.connector.common.BaseSourceTask [] -
Stopping down connector
2024-05-28 15:41:07,376 WARN io.debezium.pipeline.ChangeEventSourceCoordinator
[] - Coordinator didn't stop in the expected time, shutting down executor now
2024-05-28 15:41:07,377 WARN
io.debezium.pipeline.source.AbstractSnapshotChangeEventSource [] - Snapshot was
interrupted before completion
2024-05-28 15:41:07,377 INFO
io.debezium.pipeline.source.AbstractSnapshotChangeEventSource [] - Snapshot -
Final stage
2024-05-28 15:41:07,377 WARN io.debezium.pipeline.ChangeEventSourceCoordinator
[] - Change event source executor was interrupted
java.lang.InterruptedException: null
at java.lang.Object.wait(Native Method) ~[?:1.8.0_381]
at
io.debezium.connector.base.ChangeEventQueue.doEnqueue(ChangeEventQueue.java:204)
~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at
io.debezium.connector.base.ChangeEventQueue.enqueue(ChangeEventQueue.java:169)
~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at
io.debezium.pipeline.EventDispatcher$BufferingSnapshotChangeRecordReceiver.changeRecord(EventDispatcher.java:440)
~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at
io.debezium.pipeline.EventDispatcher$1.changeRecord(EventDispatcher.java:166)
~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at
io.debezium.relational.RelationalChangeRecordEmitter.emitReadRecord(RelationalChangeRecordEmitter.java:120)
~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at
io.debezium.relational.RelationalChangeRecordEmitter.emitChangeRecords(RelationalChangeRecordEmitter.java:57)
~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at
io.debezium.pipeline.EventDispatcher.dispatchSnapshotEvent(EventDispatcher.java:155)
~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at
io.debezium.relational.RelationalSnapshotChangeEventSource.createDataEventsForTable(RelationalSnapshotChangeEventSource.java:407)
~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at
io.debezium.relational.RelationalSnapshotChangeEventSource.createDataEvents(RelationalSnapshotChangeEventSource.java:316)
~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at
io.debezium.relational.RelationalSnapshotChangeEventSource.doExecute(RelationalSnapshotChangeEventSource.java:132)
~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at
io.debezium.pipeline.source.AbstractSnapshotChangeEventSource.execute(AbstractSnapshotChangeEventSource.java:76)
~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at
io.debezium.pipeline.ChangeEventSourceCoordinator.doSnapshot(ChangeEventSourceCoordinator.java:155)
~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at
io.debezium.pipeline.ChangeEventSourceCoordinator.executeChangeEventSources(ChangeEventSourceCoordinator.java:137)
~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at
io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:109)
~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
[?:1.8.0_381]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_381]
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[?:1.8.0_381]
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[?:1.8.0_381]
at java.lang.Thread.run(Thread.java:750) [?:1.8.0_381]
2024-05-28 15:41:07,378 INFO io.debezium.pipeline.ChangeEventSourceCoordinator
[] - Connected metrics set to 'false'
2024-05-28 15:41:07,380 INFO io.debezium.jdbc.JdbcConnection [] - Connection
gracefully closed
```

### What You Expected?
When a PostgreSQL table with a large number of partitions is synchronized
through the connector, the checkpoint always fails. Tracing the source code
shows that the `PostgresDialect`'s `queryTableSchema` queries the schema of
every table, one partition at a time. Running these schema queries during each
checkpoint causes connection timeouts.
**Solution:** Use caching. Each partition should be matched to its parent
table so that the schema is fetched only once per parent, using the following
parameters:
- `--multi-to-one-origin "jjdb_.*|fkdb_.*|pjdb_.*"`
- `--multi-to-one-target "dwd_jjdb|dwdfkdb|dwd_pjdb"`
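The caching idea can be sketched outside the connector roughly as follows. This is not the connector's code: `ParentSchemaCacheSketch` is a hypothetical class, the schema type is a generic stand-in, and it assumes the two parameters are `|`-separated lists paired by position (the first origin pattern maps to the first target, and so on).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.regex.Pattern;

final class ParentSchemaCacheSketch<S> {
    private final List<Pattern> origins = new ArrayList<>();
    private final List<String> targets = new ArrayList<>();
    private final Map<String, S> cache = new ConcurrentHashMap<>();

    // Assumption: both specs are split on '|' and paired by position, so a
    // single origin pattern must not itself contain '|'.
    ParentSchemaCacheSketch(String originSpec, String targetSpec) {
        String[] o = originSpec.split("\\|");
        String[] t = targetSpec.split("\\|");
        if (o.length != t.length) {
            throw new IllegalArgumentException("origin/target counts differ");
        }
        for (int i = 0; i < o.length; i++) {
            origins.add(Pattern.compile(o[i]));
            targets.add(t[i]);
        }
    }

    /** Maps a table name to its target name, or returns it unchanged. */
    String convert(String tableName) {
        for (int i = 0; i < origins.size(); i++) {
            if (origins.get(i).matcher(tableName).matches()) {
                return targets.get(i);
            }
        }
        return tableName;
    }

    /** Runs the loader only on the first lookup for each converted name. */
    S schemaFor(String tableName, Function<String, S> loader) {
        return cache.computeIfAbsent(convert(tableName), key -> loader.apply(tableName));
    }

    public static void main(String[] args) {
        AtomicInteger lookups = new AtomicInteger();
        ParentSchemaCacheSketch<String> cache = new ParentSchemaCacheSketch<>(
                "jjdb_.*|fkdb_.*|pjdb_.*", "dwd_jjdb|dwdfkdb|dwd_pjdb");
        // Hypothetical partition names.
        for (String table : new String[] {"jjdb_2023_01", "jjdb_2023_02", "fkdb_2023_01"}) {
            cache.schemaFor(table, name -> {
                lookups.incrementAndGet(); // stands in for the real schema query
                return "schema loaded via " + name;
            });
        }
        System.out.println(lookups.get()); // prints 2
    }
}
```

Sharing one entry per parent is only safe if every partition has the same columns as its parent. The cached value also describes the first partition that was looked up, so callers that need the partition's own identifier would have to account for that.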
### How to Reproduce?
**Steps to Reproduce:**
1. Add a partitioned table in PostgreSQL.
2. Create partitions covering nearly ten years.
3. Synchronize this table.
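For steps 1 and 2, a small generator can produce the DDL. This is a sketch under assumptions not stated in the report: monthly range partitions on a date column, PostgreSQL declarative partitioning syntax, and a hypothetical table `jjdb_demo` with hypothetical columns.

```java
import java.time.YearMonth;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

final class PartitionDdlSketch {
    /** Builds DDL for a range-partitioned parent plus one partition per month. */
    static List<String> monthlyPartitionDdl(String parent, YearMonth first, int months) {
        List<String> ddl = new ArrayList<>();
        // Hypothetical columns; the primary key includes the partition key.
        ddl.add("CREATE TABLE " + parent
                + " (id BIGINT NOT NULL, created_at DATE NOT NULL,"
                + " PRIMARY KEY (id, created_at)) PARTITION BY RANGE (created_at);");
        for (int i = 0; i < months; i++) {
            YearMonth from = first.plusMonths(i);
            YearMonth to = from.plusMonths(1);
            // Lower bound inclusive, upper bound exclusive.
            ddl.add(String.format(Locale.ROOT,
                    "CREATE TABLE %s_%d_%02d PARTITION OF %s FOR VALUES FROM ('%s') TO ('%s');",
                    parent, from.getYear(), from.getMonthValue(), parent,
                    from.atDay(1), to.atDay(1)));
        }
        return ddl;
    }

    public static void main(String[] args) {
        // Ten years of monthly partitions: 120 child tables.
        for (String statement : monthlyPartitionDdl("jjdb_demo", YearMonth.of(2015, 1), 120)) {
            System.out.println(statement);
        }
    }
}
```

The printed statements can be run with `psql` against a test database before starting the synchronization job.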
### Anything Else?
**Method Modification:**
I modified the method as follows and it works well:
```java
@Override
public TableChanges.TableChange queryTableSchema(JdbcConnection jdbc, TableId tableId) {
    long startTime = System.nanoTime(); // Record start time
    // Convert the table name so that all partitions share one cache key
    String name = this.tableNameConverter.convert(tableId.table());
    TableId parentTableId = new TableId(null, tableId.schema(), name);
    TableChanges.TableChange tableChange = cache.get(parentTableId);
    if (tableChange == null) {
        // Cache miss: query the schema once and store it under the parent key
        LOG.info("[queryTableSchema begin] {}", tableId.identifier());
        if (schema == null) {
            schema = new CustomPostgresSchema((PostgresConnection) jdbc, sourceConfig);
        }
        tableChange = schema.getTableSchema(tableId);
        LOG.info("[queryTableSchema end] {}", tableId.identifier());
        cache.put(parentTableId, tableChange);
    }
    long endTime = System.nanoTime();
    long duration = endTime - startTime;
    LOG.info(
            "[queryTableSchema duration] {} {} ms",
            tableId.identifier(),
            duration / 1_000_000); // Convert nanoseconds to milliseconds
    return tableChange;
}
```
I am willing to submit a PR!
> JdbcIncrementalSource CheckPoint Timeout Due to Retrieving Schemas for All
> Subpartitions When Synchronizing a Partitioned PostgreSQL Table
> ------------------------------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-36164
> URL: https://issues.apache.org/jira/browse/FLINK-36164
> Project: Flink
> Issue Type: Improvement
> Components: Flink CDC
> Affects Versions: cdc-3.1.1
> Reporter: tchivs
> Priority: Major
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)