tchivs created FLINK-36164:
------------------------------

             Summary: JdbcIncrementalSource CheckPoint Timeout Due to Retrieving Schemas for All Subpartitions When Synchronizing a Partitioned PostgreSQL Table
                 Key: FLINK-36164
                 URL: https://issues.apache.org/jira/browse/FLINK-36164
             Project: Flink
          Issue Type: Improvement
          Components: Flink CDC
    Affects Versions: cdc-3.1.1
            Reporter: tchivs
When synchronizing a PostgreSQL table with the connector, checkpoints always fail if the table contains a large number of partitions. Tracing the source code shows that `PostgresDialect`'s `queryTableSchema` queries the schema of every table, and this schema querying during each checkpoint causes connection timeouts.

The table list is configured as

`tableName = "(public)\.(aia_t_icc_jjdb.*|aia_t_vcs_fkdb.*|aia_t_vcs_pjdb.*|aia_t_vcs_dsrdb|aia_t_vcs_zjdb|case_log_test)"`

and the source is built as follows:

```java
JdbcIncrementalSource<String> incrSource =
        PostgresSourceBuilder.PostgresIncrementalSource.<String>builder()
                .hostname(hostname)
                .port(port)
                .database(databaseName)
                .schemaList(schemaName)
                .tableList(tableName)
                .username(username)
                .password(password)
                .deserializer(schema)
                .slotName(slotName)
                .decodingPluginName(config.get(DECODING_PLUGIN_NAME))
                .includeSchemaChanges(true)
                .debeziumProperties(debeziumProperties)
                .startupOptions(startupOptions)
                .splitSize(config.get(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE))
                .splitMetaGroupSize(config.get(CHUNK_META_GROUP_SIZE))
                .fetchSize(config.get(SCAN_SNAPSHOT_FETCH_SIZE))
                .connectTimeout(config.get(CONNECT_TIMEOUT))
                .connectionPoolSize(config.get(CONNECTION_POOL_SIZE))
                .connectMaxRetries(config.get(CONNECT_MAX_RETRIES))
                .distributionFactorUpper(
                        config.get(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND))
                .distributionFactorLower(
                        config.get(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND))
                .heartbeatInterval(config.get(HEARTBEAT_INTERVAL))
                .build();
return env.fromSource(
        incrSource, WatermarkStrategy.noWatermarks(), "Postgres IncrSource");
```
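Note that the `tableList` regex matches every physical subpartition as a separate table, which is why the schema of each subpartition gets queried. A quick standalone check (the subpartition name below is hypothetical):

```java
import java.util.regex.Pattern;

public class TableListMatch {
    public static void main(String[] args) {
        Pattern tableList = Pattern.compile(
                "(public)\\.(aia_t_icc_jjdb.*|aia_t_vcs_fkdb.*|aia_t_vcs_pjdb.*"
                        + "|aia_t_vcs_dsrdb|aia_t_vcs_zjdb|case_log_test)");
        // The parent table matches ...
        System.out.println(tableList.matcher("public.aia_t_icc_jjdb").matches());         // true
        // ... and so does every one of its subpartitions.
        System.out.println(tableList.matcher("public.aia_t_icc_jjdb_2024_01").matches()); // true
    }
}
```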
Checkpoint exception:

```
org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable failure threshold. The latest checkpoint failed due to Checkpoint expired before completing., view the Checkpoint History tab or the Job Manager log to find out why continuous checkpoints failed.
```

Error log:

```logger
2024-05-28 15:39:07,367 INFO  org.apache.flink.runtime.taskmanager.Task [] - Freeing task resources for jjdb: Writer -> jjdb: Committer (1/1)#0 (dbec67a546f03f19bae3a56726a02174_788573959fc6fd87fb6bfd0ffc27d896_0_0).
2024-05-28 15:39:07,370 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Un-registering task and sending final execution state CANCELED to JobManager for task fkdb: Writer -> fkdb: Committer (1/1)#0 dbec67a546f03f19bae3a56726a02174_1992c3287130e2f49268dbcc909a7d1d_0_0.
2024-05-28 15:39:07,370 INFO  org.apache.http.impl.execchain.RetryExec [] - I/O exception (java.net.SocketException) caught when processing request to {}->http://192.168.0.168:8040: Socket closed
2024-05-28 15:39:07,378 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Un-registering task and sending final execution state CANCELED to JobManager for task jjdb: Writer -> jjdb: Committer (1/1)#0 dbec67a546f03f19bae3a56726a02174_788573959fc6fd87fb6bfd0ffc27d896_0_0.
2024-05-28 15:39:37,363 WARN  org.apache.flink.runtime.taskmanager.Task [] - Task 'Source: Postgres Source -> Process (1/1)#0' did not react to cancelling signal - interrupting; it is stuck for 30 seconds in method:
 org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
 org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
 org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398)
 org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:367)
 org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:352)
 org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
 org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:858)
 org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:807)
 org.apache.flink.runtime.taskmanager.Task$$Lambda$813/1723769838.run(Unknown Source)
 org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
 org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932)
 org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
 org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
 java.lang.Thread.run(Thread.java:750)
2024-05-28 15:39:37,372 INFO  io.debezium.embedded.EmbeddedEngine [] - Stopping the embedded engine
2024-05-28 15:39:37,373 ERROR com.ververica.cdc.debezium.internal.Handover [] - Reporting error:
java.lang.InterruptedException: null
    at java.lang.Object.wait(Native Method) ~[?:1.8.0_381]
    at java.lang.Object.wait(Object.java:502) ~[?:1.8.0_381]
    at com.ververica.cdc.debezium.internal.Handover.produce(Handover.java:115) ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
    at com.ververica.cdc.debezium.internal.DebeziumChangeConsumer.handleBatch(DebeziumChangeConsumer.java:54) ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
    at io.debezium.embedded.ConvertingEngineBuilder.lambda$notifying$2(ConvertingEngineBuilder.java:83) ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
    at io.debezium.embedded.EmbeddedEngine.run(EmbeddedEngine.java:822) [flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
    at io.debezium.embedded.ConvertingEngineBuilder$2.run(ConvertingEngineBuilder.java:192) [flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_381]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_381]
    at java.lang.Thread.run(Thread.java:750) [?:1.8.0_381]
2024-05-28 15:39:37,375 INFO  io.debezium.embedded.EmbeddedEngine [] - Stopping the task and engine
2024-05-28 15:39:37,375 INFO  io.debezium.connector.common.BaseSourceTask [] - Stopping down connector
2024-05-28 15:41:07,376 WARN  io.debezium.pipeline.ChangeEventSourceCoordinator [] - Coordinator didn't stop in the expected time, shutting down executor now
2024-05-28 15:41:07,377 WARN  io.debezium.pipeline.source.AbstractSnapshotChangeEventSource [] - Snapshot was interrupted before completion
2024-05-28 15:41:07,377 INFO  io.debezium.pipeline.source.AbstractSnapshotChangeEventSource [] - Snapshot - Final stage
2024-05-28 15:41:07,377 WARN  io.debezium.pipeline.ChangeEventSourceCoordinator [] - Change event source executor was interrupted
java.lang.InterruptedException: null
    at java.lang.Object.wait(Native Method) ~[?:1.8.0_381]
    at io.debezium.connector.base.ChangeEventQueue.doEnqueue(ChangeEventQueue.java:204) ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
    at io.debezium.connector.base.ChangeEventQueue.enqueue(ChangeEventQueue.java:169) ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
    at io.debezium.pipeline.EventDispatcher$BufferingSnapshotChangeRecordReceiver.changeRecord(EventDispatcher.java:440) ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
    at io.debezium.pipeline.EventDispatcher$1.changeRecord(EventDispatcher.java:166) ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
    at io.debezium.relational.RelationalChangeRecordEmitter.emitReadRecord(RelationalChangeRecordEmitter.java:120) ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
    at io.debezium.relational.RelationalChangeRecordEmitter.emitChangeRecords(RelationalChangeRecordEmitter.java:57) ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
    at io.debezium.pipeline.EventDispatcher.dispatchSnapshotEvent(EventDispatcher.java:155) ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
    at io.debezium.relational.RelationalSnapshotChangeEventSource.createDataEventsForTable(RelationalSnapshotChangeEventSource.java:407) ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
    at io.debezium.relational.RelationalSnapshotChangeEventSource.createDataEvents(RelationalSnapshotChangeEventSource.java:316) ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
    at io.debezium.relational.RelationalSnapshotChangeEventSource.doExecute(RelationalSnapshotChangeEventSource.java:132) ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
    at io.debezium.pipeline.source.AbstractSnapshotChangeEventSource.execute(AbstractSnapshotChangeEventSource.java:76) ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
    at io.debezium.pipeline.ChangeEventSourceCoordinator.doSnapshot(ChangeEventSourceCoordinator.java:155) ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
    at io.debezium.pipeline.ChangeEventSourceCoordinator.executeChangeEventSources(ChangeEventSourceCoordinator.java:137) ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
    at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:109) ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_381]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_381]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_381]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_381]
    at java.lang.Thread.run(Thread.java:750) [?:1.8.0_381]
2024-05-28 15:41:07,378 INFO  io.debezium.pipeline.ChangeEventSourceCoordinator [] - Connected metrics set to 'false'
2024-05-28 15:41:07,380 INFO  io.debezium.jdbc.JdbcConnection [] - Connection gracefully closed
```

### What You Expected?

When synchronizing a PostgreSQL table with the connector, checkpoints always fail if the table contains a large number of partitions. Tracing the source code shows that `PostgresDialect`'s `queryTableSchema` queries the schema of each table, and this schema querying during each checkpoint causes connection timeouts.

**Solution:** Use caching: each partitioned table should be matched and have its schema fetched only once, using the following parameters:

- `--multi-to-one-origin "jjdb_.*|fkdb_.*|pjdb_.*"`
- `--multi-to-one-target "dwd_jjdb|dwdfkdb|dwd_pjdb"`
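The patch under "Anything Else?" below relies on a `cache` and a `tableNameConverter` field that are not shown in the snippet. A minimal sketch of what the converter could look like, assuming the origin/target pairs above; the class name and constructor are illustrative, not part of the connector's API:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

/**
 * Illustrative helper: maps a physical partition name (e.g. "jjdb_2024_01")
 * to its logical parent table name, mirroring the
 * --multi-to-one-origin / --multi-to-one-target pairs.
 */
public class TableNameConverter {

    private final List<Pattern> origins = new ArrayList<>();
    private final List<String> targets = new ArrayList<>();

    public TableNameConverter(String originPatterns, String targetNames) {
        // Assumes no individual pattern contains a top-level '|'.
        String[] patterns = originPatterns.split("\\|");
        String[] names = targetNames.split("\\|");
        for (int i = 0; i < patterns.length; i++) {
            origins.add(Pattern.compile(patterns[i]));
            targets.add(names[i]);
        }
    }

    /** Returns the parent table name, or the input unchanged if no pattern matches. */
    public String convert(String tableName) {
        for (int i = 0; i < origins.size(); i++) {
            if (origins.get(i).matcher(tableName).matches()) {
                return targets.get(i);
            }
        }
        return tableName;
    }
}
```

The cache itself could then be a plain `ConcurrentHashMap<TableId, TableChanges.TableChange>` keyed by the converted parent `TableId`, so that every partition of the same parent table resolves to a single cached schema entry.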
### How to Reproduce?

**Steps to Reproduce:**

1. Add a partitioned table in PostgreSQL.
2. Create partitions for nearly ten years (see the sketch below).
3. Synchronize this table.
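A minimal sketch of steps 1 and 2, assuming a local PostgreSQL instance with the JDBC driver on the classpath and monthly range partitions; the table name, columns, and connection settings are all illustrative:

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;
import java.time.LocalDate;

/** Creates a range-partitioned table with ten years of monthly partitions. */
public class CreatePartitionedTable {
    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection(
                        "jdbc:postgresql://localhost:5432/postgres", "postgres", "postgres");
                Statement stmt = conn.createStatement()) {
            // Parent table; the primary key must include the partition column.
            stmt.execute(
                    "CREATE TABLE public.aia_t_icc_jjdb ("
                            + "id BIGINT, created DATE, PRIMARY KEY (id, created)"
                            + ") PARTITION BY RANGE (created)");
            // 120 monthly partitions, i.e. roughly ten years of subpartitions.
            LocalDate from = LocalDate.of(2015, 1, 1);
            for (int i = 0; i < 120; i++) {
                LocalDate to = from.plusMonths(1);
                stmt.execute(String.format(
                        "CREATE TABLE public.aia_t_icc_jjdb_%d_%02d"
                                + " PARTITION OF public.aia_t_icc_jjdb"
                                + " FOR VALUES FROM ('%s') TO ('%s')",
                        from.getYear(), from.getMonthValue(), from, to));
                from = to;
            }
        }
    }
}
```

Each of these subpartitions matches the `tableList` regex, so every checkpoint ends up issuing one schema query per subpartition.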
### Anything Else?

**Method Modification:** I modified the method as follows, and it works well:

```java
@Override
public TableChanges.TableChange queryTableSchema(JdbcConnection jdbc, TableId tableId) {
    long startTime = System.nanoTime(); // Record start time
    // Resolve the subpartition (e.g. "jjdb_2024_01") to its parent table so that
    // all partitions of the same table share one cache entry.
    String name = this.tableNameConverter.convert(tableId.table());
    TableId parentTableId = new TableId(null, tableId.schema(), name);
    TableChanges.TableChange tableChange = cache.get(parentTableId);
    if (tableChange == null) {
        // Cache miss: query the database once and cache the result under the
        // parent TableId for all subsequent subpartitions.
        LOG.info("[queryTableSchema begin] {}", tableId.identifier());
        if (schema == null) {
            schema = new CustomPostgresSchema((PostgresConnection) jdbc, sourceConfig);
        }
        tableChange = schema.getTableSchema(tableId);
        LOG.info("[queryTableSchema end] {}", tableId.identifier());
        cache.put(parentTableId, tableChange);
    }
    long endTime = System.nanoTime();
    long duration = endTime - startTime;
    // Convert nanoseconds to milliseconds for logging.
    LOG.info("[queryTableSchema duration] {} {} ms", tableId.identifier(), duration / 1_000_000);
    return tableChange;
}
```

I am willing to submit a PR!

--
This message was sent by Atlassian Jira
(v8.20.10#820010)