[ 
https://issues.apache.org/jira/browse/FLINK-36164?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

tchivs updated FLINK-36164:
---------------------------
    Attachment: image-2024-08-28-11-28-33-016.png

> JdbcIncrementalSource CheckPoint Timeout Due to Retrieving Schemas for All 
> Subpartitions When Synchronizing a Partitioned PostgreSQL Table
> ------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-36164
>                 URL: https://issues.apache.org/jira/browse/FLINK-36164
>             Project: Flink
>          Issue Type: Improvement
>          Components: Flink CDC
>    Affects Versions: cdc-3.1.1
>            Reporter: tchivs
>            Priority: Major
>         Attachments: image-2024-08-28-11-28-21-496.png, 
> image-2024-08-28-11-28-33-016.png
>
>
> What's Wrong?
> When synchronizing a PostgreSQL table with the connector, checkpoints always 
> fail if the table has a large number of partitions. Tracing the source code 
> shows that PostgresDialect's queryTableSchema queries the schema of each 
> table separately. Repeating these schema queries during each checkpoint 
> causes connection timeouts.
> {code:java}
> JdbcIncrementalSource<String> incrSource =
>         PostgresSourceBuilder.PostgresIncrementalSource.<String>builder()
>                 .hostname(hostname)
>                 .port(port)
>                 .database(databaseName)
>                 .schemaList(schemaName)
>                 .tableList(tableName)
>                 .username(username)
>                 .password(password)
>                 .deserializer(schema)
>                 .slotName(slotName)
>                 .decodingPluginName(config.get(DECODING_PLUGIN_NAME))
>                 .includeSchemaChanges(true)
>                 .debeziumProperties(debeziumProperties)
>                 .startupOptions(startupOptions)
>                 .splitSize(config.get(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE))
>                 .splitMetaGroupSize(config.get(CHUNK_META_GROUP_SIZE))
>                 .fetchSize(config.get(SCAN_SNAPSHOT_FETCH_SIZE))
>                 .connectTimeout(config.get(CONNECT_TIMEOUT))
>                 .connectionPoolSize(config.get(CONNECTION_POOL_SIZE))
>                 .connectMaxRetries(config.get(CONNECT_MAX_RETRIES))
>                 .distributionFactorUpper(
>                         config.get(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND))
>                 .distributionFactorLower(
>                         config.get(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND))
>                 .heartbeatInterval(config.get(HEARTBEAT_INTERVAL))
>                 .build(); {code}
>  
> Checkpoint exception:
> {quote}org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint 
> tolerable failure threshold. The latest checkpoint failed due to Checkpoint 
> expired before completing., view the Checkpoint History tab or the Job 
> Manager log to find out why continuous checkpoints failed.
> {quote}
> error log:
>  
> {code:java}
> 2024-05-28 15:39:07,367 INFO org.apache.flink.runtime.taskmanager.Task [] - 
> Freeing task resources for jjdb: Writer -> jjdb: Committer (1/1)#0 
> (dbec67a546f03f19bae3a56726a02174_788573959fc6fd87fb6bfd0ffc27d896_0_0).
> 2024-05-28 15:39:07,370 INFO 
> org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Un-registering task 
> and sending final execution state CANCELED to JobManager for task fkdb: 
> Writer -> fkdb: Committer (1/1)#0 
> dbec67a546f03f19bae3a56726a02174_1992c3287130e2f49268dbcc909a7d1d_0_0.
> 2024-05-28 15:39:07,370 INFO org.apache.http.impl.execchain.RetryExec [] - 
> I/O exception (java.net.SocketException) caught when processing request to 
> {}->http://192.168.0.168:8040: Socket closed
> 2024-05-28 15:39:07,378 INFO 
> org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Un-registering task 
> and sending final execution state CANCELED to JobManager for task jjdb: 
> Writer -> jjdb: Committer (1/1)#0 
> dbec67a546f03f19bae3a56726a02174_788573959fc6fd87fb6bfd0ffc27d896_0_0.
> 2024-05-28 15:39:37,363 WARN org.apache.flink.runtime.taskmanager.Task [] - 
> Task 'Source: Postgres Source -> Process (1/1)#0' did not react to cancelling 
> signal - interrupting; it is stuck for 30 seconds in method:
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
> org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398)
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:367)
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:352)
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:858)
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:807)
> org.apache.flink.runtime.taskmanager.Task$$Lambda$813/1723769838.run(Unknown 
> Source)
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932)
> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
> java.lang.Thread.run(Thread.java:750)
> 2024-05-28 15:39:37,372 INFO io.debezium.embedded.EmbeddedEngine [] - 
> Stopping the embedded engine
> 2024-05-28 15:39:37,373 ERROR com.ververica.cdc.debezium.internal.Handover [] 
> - Reporting error:
> java.lang.InterruptedException: null
> at java.lang.Object.wait(Native Method) ~[?:1.8.0_381]
> at java.lang.Object.wait(Object.java:502) ~[?:1.8.0_381]
> at com.ververica.cdc.debezium.internal.Handover.produce(Handover.java:115) 
> ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
> at 
> com.ververica.cdc.debezium.internal.DebeziumChangeConsumer.handleBatch(DebeziumChangeConsumer.java:54)
>  ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
> at 
> io.debezium.embedded.ConvertingEngineBuilder.lambda$notifying$2(ConvertingEngineBuilder.java:83)
>  ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
> at io.debezium.embedded.EmbeddedEngine.run(EmbeddedEngine.java:822) 
> [flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
> at 
> io.debezium.embedded.ConvertingEngineBuilder$2.run(ConvertingEngineBuilder.java:192)
>  [flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  [?:1.8.0_381]
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  [?:1.8.0_381]
> at java.lang.Thread.run(Thread.java:750) [?:1.8.0_381]
> 2024-05-28 15:39:37,375 INFO io.debezium.embedded.EmbeddedEngine [] - 
> Stopping the task and engine
> 2024-05-28 15:39:37,375 INFO io.debezium.connector.common.BaseSourceTask [] - 
> Stopping down connector
> 2024-05-28 15:41:07,376 WARN 
> io.debezium.pipeline.ChangeEventSourceCoordinator [] - Coordinator didn't 
> stop in the expected time, shutting down executor now
> 2024-05-28 15:41:07,377 WARN 
> io.debezium.pipeline.source.AbstractSnapshotChangeEventSource [] - Snapshot 
> was interrupted before completion
> 2024-05-28 15:41:07,377 INFO 
> io.debezium.pipeline.source.AbstractSnapshotChangeEventSource [] - Snapshot - 
> Final stage
> 2024-05-28 15:41:07,377 WARN 
> io.debezium.pipeline.ChangeEventSourceCoordinator [] - Change event source 
> executor was interrupted
> java.lang.InterruptedException: null
> at java.lang.Object.wait(Native Method) ~[?:1.8.0_381]
> at 
> io.debezium.connector.base.ChangeEventQueue.doEnqueue(ChangeEventQueue.java:204)
>  ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
> at 
> io.debezium.connector.base.ChangeEventQueue.enqueue(ChangeEventQueue.java:169)
>  ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
> at 
> io.debezium.pipeline.EventDispatcher$BufferingSnapshotChangeRecordReceiver.changeRecord(EventDispatcher.java:440)
>  ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
> at 
> io.debezium.pipeline.EventDispatcher$1.changeRecord(EventDispatcher.java:166) 
> ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
> at 
> io.debezium.relational.RelationalChangeRecordEmitter.emitReadRecord(RelationalChangeRecordEmitter.java:120)
>  ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
> at 
> io.debezium.relational.RelationalChangeRecordEmitter.emitChangeRecords(RelationalChangeRecordEmitter.java:57)
>  ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
> at 
> io.debezium.pipeline.EventDispatcher.dispatchSnapshotEvent(EventDispatcher.java:155)
>  ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
> at 
> io.debezium.relational.RelationalSnapshotChangeEventSource.createDataEventsForTable(RelationalSnapshotChangeEventSource.java:407)
>  ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
> at 
> io.debezium.relational.RelationalSnapshotChangeEventSource.createDataEvents(RelationalSnapshotChangeEventSource.java:316)
>  ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
> at 
> io.debezium.relational.RelationalSnapshotChangeEventSource.doExecute(RelationalSnapshotChangeEventSource.java:132)
>  ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
> at 
> io.debezium.pipeline.source.AbstractSnapshotChangeEventSource.execute(AbstractSnapshotChangeEventSource.java:76)
>  ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
> at 
> io.debezium.pipeline.ChangeEventSourceCoordinator.doSnapshot(ChangeEventSourceCoordinator.java:155)
>  ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
> at 
> io.debezium.pipeline.ChangeEventSourceCoordinator.executeChangeEventSources(ChangeEventSourceCoordinator.java:137)
>  ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
> at 
> io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:109)
>  ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
> [?:1.8.0_381]
> at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_381]
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  [?:1.8.0_381]
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  [?:1.8.0_381]
> at java.lang.Thread.run(Thread.java:750) [?:1.8.0_381]
> 2024-05-28 15:41:07,378 INFO 
> io.debezium.pipeline.ChangeEventSourceCoordinator [] - Connected metrics set 
> to 'false'
> 2024-05-28 15:41:07,380 INFO io.debezium.jdbc.JdbcConnection [] - Connection 
> gracefully closed
> {code}
> What You Expected?
> When synchronizing a PostgreSQL table using a connector, if the table 
> contains a large number of partitions, the checkpoint always fails. By 
> tracing the source code, it was found that the PostgresDialect's 
> queryTableSchema queries the schema of each table. This schema querying 
> during each checkpoint causes connection timeouts.
> Solution: Use caching. Each partitioned table should be matched once and its 
> schema fetched only once, using the following parameters:
>  
> tableList=(public)\.(aia_t_icc_jjdb.*|aia_t_vcs_fkdb.*|aia_t_vcs_pjdb.*|aia_t_vcs_dsrdb|aia_t_vcs_zjdb|case_log_test)
> How to Reproduce?
> Steps to Reproduce:
> 1. Add a partitioned table in PostgreSQL.
> 2. Create partitions covering nearly ten years.
> 3. Synchronize this table.
> Anything Else?
> Method Modification:
> I modified the method as follows and it works well:
>  
> {code:java}
> @Override
> public TableChanges.TableChange queryTableSchema(JdbcConnection jdbc, TableId tableId) {
>     long startTime = System.nanoTime(); // Record start time
>     // Build the parent table id that serves as the cache key
>     String name = this.tableNameConverter.convert(tableId.table());
>     TableId parentTableId = new TableId(null, tableId.schema(), name);
>     TableChanges.TableChange tableChange = cache.get(parentTableId);
>     if (tableChange == null) {
>         // Cache miss: query the schema and store it under the parent table id
>         LOG.info("[queryTableSchema begin] {}", tableId.identifier());
>         if (schema == null) {
>             schema = new CustomPostgresSchema((PostgresConnection) jdbc, sourceConfig);
>         }
>         tableChange = schema.getTableSchema(tableId);
>         LOG.info("[queryTableSchema end] {}", tableId.identifier());
>         cache.put(parentTableId, tableChange);
>     }
>     long endTime = System.nanoTime();
>     long duration = endTime - startTime;
>     // Convert nanoseconds to milliseconds
>     LOG.info("[queryTableSchema duration] {} {} ms", tableId.identifier(), duration / 1_000_000);
>     return tableChange;
> }
> {code}
>  I am willing to submit a PR!
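
The caching idea in the issue can be illustrated outside Flink CDC with a
minimal, self-contained sketch. Everything in it is hypothetical and chosen
for illustration only: the class name ParentSchemaCache, the assumed
"<parent>_<yyyymm>" partition naming convention, and the plain string that
stands in for a table schema. It is not the connector's API. As in the
modified method above, the cache key is the parent table and the lookup runs
against whichever partition is requested first.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.UnaryOperator;

/** Hypothetical sketch: keep one schema per parent table instead of one per partition. */
public class ParentSchemaCache<S> {
    private final Map<String, S> cache = new ConcurrentHashMap<>();
    private final UnaryOperator<String> toParentName; // maps a partition name to its parent
    private final Function<String, S> schemaLoader;   // the expensive lookup, e.g. a JDBC query

    public ParentSchemaCache(UnaryOperator<String> toParentName,
                             Function<String, S> schemaLoader) {
        this.toParentName = toParentName;
        this.schemaLoader = schemaLoader;
    }

    /** Returns the cached schema for the table's parent, loading it on the first request. */
    public S get(String tableName) {
        String parent = toParentName.apply(tableName);
        // The first partition seen for a parent pays for the lookup; siblings reuse it.
        return cache.computeIfAbsent(parent, key -> schemaLoader.apply(tableName));
    }

    public static void main(String[] args) {
        AtomicInteger lookups = new AtomicInteger();
        // Assumption: monthly partitions are named "<parent>_<yyyymm>".
        UnaryOperator<String> stripSuffix = name -> name.replaceAll("_\\d{6}$", "");
        ParentSchemaCache<String> schemas = new ParentSchemaCache<>(stripSuffix, table -> {
            lookups.incrementAndGet(); // count how often the expensive path runs
            return "schema loaded via " + table;
        });

        // Ten years of monthly partitions, as in the reproduction steps.
        int partitions = 0;
        for (int year = 2015; year < 2025; year++) {
            for (int month = 1; month <= 12; month++) {
                schemas.get(String.format("aia_t_icc_jjdb_%d%02d", year, month));
                partitions++;
            }
        }
        System.out.println("partitions=" + partitions + " lookups=" + lookups.get());
        // prints "partitions=120 lookups=1"
    }
}
```

With this shape, the number of schema queries grows with the number of parent
tables rather than the number of partitions. A real fix would also need to
decide when cached entries are invalidated, for example after a schema change
event, which this sketch does not cover.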



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
