[
https://issues.apache.org/jira/browse/FLINK-36164?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
tchivs updated FLINK-36164:
---------------------------
Attachment: image-2024-08-28-11-28-21-496.png
> JdbcIncrementalSource CheckPoint Timeout Due to Retrieving Schemas for All
> Subpartitions When Synchronizing a Partitioned PostgreSQL Table
> ------------------------------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-36164
> URL: https://issues.apache.org/jira/browse/FLINK-36164
> Project: Flink
> Issue Type: Improvement
> Components: Flink CDC
> Affects Versions: cdc-3.1.1
> Reporter: tchivs
> Priority: Major
> Attachments: image-2024-08-28-11-28-21-496.png,
> image-2024-08-28-11-28-33-016.png
>
>
> What's Wrong?
> When a partitioned PostgreSQL table with a large number of partitions is
> synchronized through the connector, the checkpoint always fails. Tracing the
> source code shows that PostgresDialect's queryTableSchema queries the schema
> of every subpartition separately. Running these schema queries during each
> checkpoint causes connection timeouts.
> {code:java}
> JdbcIncrementalSource<String> incrSource =
>         PostgresSourceBuilder.PostgresIncrementalSource.<String>builder()
>                 .hostname(hostname)
>                 .port(port)
>                 .database(databaseName)
>                 .schemaList(schemaName)
>                 .tableList(tableName)
>                 .username(username)
>                 .password(password)
>                 .deserializer(schema)
>                 .slotName(slotName)
>                 .decodingPluginName(config.get(DECODING_PLUGIN_NAME))
>                 .includeSchemaChanges(true)
>                 .debeziumProperties(debeziumProperties)
>                 .startupOptions(startupOptions)
>                 .splitSize(config.get(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE))
>                 .splitMetaGroupSize(config.get(CHUNK_META_GROUP_SIZE))
>                 .fetchSize(config.get(SCAN_SNAPSHOT_FETCH_SIZE))
>                 .connectTimeout(config.get(CONNECT_TIMEOUT))
>                 .connectionPoolSize(config.get(CONNECTION_POOL_SIZE))
>                 .connectMaxRetries(config.get(CONNECT_MAX_RETRIES))
>                 .distributionFactorUpper(
>                         config.get(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND))
>                 .distributionFactorLower(
>                         config.get(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND))
>                 .heartbeatInterval(config.get(HEARTBEAT_INTERVAL))
>                 .build();
> {code}
>
> Checkpoint exception:
> {quote}org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint
> tolerable failure threshold. The latest checkpoint failed due to Checkpoint
> expired before completing., view the Checkpoint History tab or the Job
> Manager log to find out why continuous checkpoints failed.
> {quote}
> Error log:
>
> {code:java}
> 2024-05-28 15:39:07,367 INFO org.apache.flink.runtime.taskmanager.Task [] -
> Freeing task resources for jjdb: Writer -> jjdb: Committer (1/1)#0
> (dbec67a546f03f19bae3a56726a02174_788573959fc6fd87fb6bfd0ffc27d896_0_0).
> 2024-05-28 15:39:07,370 INFO
> org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Un-registering task
> and sending final execution state CANCELED to JobManager for task fkdb:
> Writer -> fkdb: Committer (1/1)#0
> dbec67a546f03f19bae3a56726a02174_1992c3287130e2f49268dbcc909a7d1d_0_0.
> 2024-05-28 15:39:07,370 INFO org.apache.http.impl.execchain.RetryExec [] -
> I/O exception (java.net.SocketException) caught when processing request to
> {}->http://192.168.0.168:8040: Socket closed
> 2024-05-28 15:39:07,378 INFO
> org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Un-registering task
> and sending final execution state CANCELED to JobManager for task jjdb:
> Writer -> jjdb: Committer (1/1)#0
> dbec67a546f03f19bae3a56726a02174_788573959fc6fd87fb6bfd0ffc27d896_0_0.
> 2024-05-28 15:39:37,363 WARN org.apache.flink.runtime.taskmanager.Task [] -
> Task 'Source: Postgres Source -> Process (1/1)#0' did not react to cancelling
> signal - interrupting; it is stuck for 30 seconds in method:
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
> org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398)
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:367)
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:352)
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:858)
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:807)
> org.apache.flink.runtime.taskmanager.Task$$Lambda$813/1723769838.run(Unknown
> Source)
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932)
> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
> java.lang.Thread.run(Thread.java:750)
> 2024-05-28 15:39:37,372 INFO io.debezium.embedded.EmbeddedEngine [] -
> Stopping the embedded engine
> 2024-05-28 15:39:37,373 ERROR com.ververica.cdc.debezium.internal.Handover []
> - Reporting error:
> java.lang.InterruptedException: null
> at java.lang.Object.wait(Native Method) ~[?:1.8.0_381]
> at java.lang.Object.wait(Object.java:502) ~[?:1.8.0_381]
> at com.ververica.cdc.debezium.internal.Handover.produce(Handover.java:115)
> ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
> at
> com.ververica.cdc.debezium.internal.DebeziumChangeConsumer.handleBatch(DebeziumChangeConsumer.java:54)
> ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
> at
> io.debezium.embedded.ConvertingEngineBuilder.lambda$notifying$2(ConvertingEngineBuilder.java:83)
> ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
> at io.debezium.embedded.EmbeddedEngine.run(EmbeddedEngine.java:822)
> [flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
> at
> io.debezium.embedded.ConvertingEngineBuilder$2.run(ConvertingEngineBuilder.java:192)
> [flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> [?:1.8.0_381]
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> [?:1.8.0_381]
> at java.lang.Thread.run(Thread.java:750) [?:1.8.0_381]
> 2024-05-28 15:39:37,375 INFO io.debezium.embedded.EmbeddedEngine [] -
> Stopping the task and engine
> 2024-05-28 15:39:37,375 INFO io.debezium.connector.common.BaseSourceTask [] -
> Stopping down connector
> 2024-05-28 15:41:07,376 WARN
> io.debezium.pipeline.ChangeEventSourceCoordinator [] - Coordinator didn't
> stop in the expected time, shutting down executor now
> 2024-05-28 15:41:07,377 WARN
> io.debezium.pipeline.source.AbstractSnapshotChangeEventSource [] - Snapshot
> was interrupted before completion
> 2024-05-28 15:41:07,377 INFO
> io.debezium.pipeline.source.AbstractSnapshotChangeEventSource [] - Snapshot -
> Final stage
> 2024-05-28 15:41:07,377 WARN
> io.debezium.pipeline.ChangeEventSourceCoordinator [] - Change event source
> executor was interrupted
> java.lang.InterruptedException: null
> at java.lang.Object.wait(Native Method) ~[?:1.8.0_381]
> at
> io.debezium.connector.base.ChangeEventQueue.doEnqueue(ChangeEventQueue.java:204)
> ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
> at
> io.debezium.connector.base.ChangeEventQueue.enqueue(ChangeEventQueue.java:169)
> ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
> at
> io.debezium.pipeline.EventDispatcher$BufferingSnapshotChangeRecordReceiver.changeRecord(EventDispatcher.java:440)
> ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
> at
> io.debezium.pipeline.EventDispatcher$1.changeRecord(EventDispatcher.java:166)
> ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
> at
> io.debezium.relational.RelationalChangeRecordEmitter.emitReadRecord(RelationalChangeRecordEmitter.java:120)
> ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
> at
> io.debezium.relational.RelationalChangeRecordEmitter.emitChangeRecords(RelationalChangeRecordEmitter.java:57)
> ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
> at
> io.debezium.pipeline.EventDispatcher.dispatchSnapshotEvent(EventDispatcher.java:155)
> ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
> at
> io.debezium.relational.RelationalSnapshotChangeEventSource.createDataEventsForTable(RelationalSnapshotChangeEventSource.java:407)
> ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
> at
> io.debezium.relational.RelationalSnapshotChangeEventSource.createDataEvents(RelationalSnapshotChangeEventSource.java:316)
> ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
> at
> io.debezium.relational.RelationalSnapshotChangeEventSource.doExecute(RelationalSnapshotChangeEventSource.java:132)
> ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
> at
> io.debezium.pipeline.source.AbstractSnapshotChangeEventSource.execute(AbstractSnapshotChangeEventSource.java:76)
> ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
> at
> io.debezium.pipeline.ChangeEventSourceCoordinator.doSnapshot(ChangeEventSourceCoordinator.java:155)
> ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
> at
> io.debezium.pipeline.ChangeEventSourceCoordinator.executeChangeEventSources(ChangeEventSourceCoordinator.java:137)
> ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
> at
> io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:109)
> ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> [?:1.8.0_381]
> at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_381]
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> [?:1.8.0_381]
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> [?:1.8.0_381]
> at java.lang.Thread.run(Thread.java:750) [?:1.8.0_381]
> 2024-05-28 15:41:07,378 INFO
> io.debezium.pipeline.ChangeEventSourceCoordinator [] - Connected metrics set
> to 'false'
> 2024-05-28 15:41:07,380 INFO io.debezium.jdbc.JdbcConnection [] - Connection
> gracefully closed
> {code}
> What You Expected?
> When synchronizing a PostgreSQL table using a connector, if the table
> contains a large number of partitions, the checkpoint always fails. By
> tracing the source code, it was found that the PostgresDialect's
> queryTableSchema queries the schema of each table. This schema querying
> during each checkpoint causes connection timeouts.
> Solution: Use caching. The partitions of each partitioned table should be
> matched to that table, so that its schema is fetched only once. The
> following parameter was used:
>
> tableList=(public)\.(aia_t_icc_jjdb.*|aia_t_vcs_fkdb.*|aia_t_vcs_pjdb.*|aia_t_vcs_dsrdb|aia_t_vcs_zjdb|case_log_test)
> How to Reproduce?
> Steps to Reproduce:
> 1. Add a partitioned table in PostgreSQL.
> 2. Create partitions for nearly ten years.
> 3. Synchronize this table.
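
The first two reproduction steps can be scripted. The following is a minimal sketch, not taken from the report: it assumes monthly range partitions on a date column, and the table name `demo_events`, its columns, and the class `PartitionDdlGenerator` are hypothetical. It only prints DDL statements, which would then be run against a test database, for example through psql.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

public class PartitionDdlGenerator {

    /**
     * Builds the DDL for a range-partitioned parent table followed by one
     * partition per month. Table layout and naming are assumptions made for
     * illustration only.
     */
    static List<String> buildDdl(String parent, int startYear, int years) {
        List<String> ddl = new ArrayList<>();
        // The primary key includes the partition key, as PostgreSQL requires
        // for partitioned tables.
        ddl.add("CREATE TABLE " + parent
                + " (id bigint NOT NULL, created_at date NOT NULL,"
                + " PRIMARY KEY (id, created_at))"
                + " PARTITION BY RANGE (created_at);");
        for (int m = 0; m < years * 12; m++) {
            int year = startYear + m / 12;
            int month = m % 12 + 1;
            // Upper bound is the first day of the following month (exclusive).
            int nextYear = startYear + (m + 1) / 12;
            int nextMonth = (m + 1) % 12 + 1;
            ddl.add(String.format(Locale.ROOT,
                    "CREATE TABLE %s_%04d%02d PARTITION OF %s"
                            + " FOR VALUES FROM ('%04d-%02d-01') TO ('%04d-%02d-01');",
                    parent, year, month, parent, year, month, nextYear, nextMonth));
        }
        return ddl;
    }

    public static void main(String[] args) {
        // Ten years of monthly partitions: 1 parent statement + 120 partitions.
        buildDdl("demo_events", 2015, 10).forEach(System.out::println);
    }
}
```

Monthly granularity is a guess; the report does not say how the partitions were divided, and daily partitions would produce far more subpartitions over the same period.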
> Anything Else?
> Method Modification:
> I modified the method as follows and it works well:
>
> {code:java}
> @Override
> public TableChanges.TableChange queryTableSchema(JdbcConnection jdbc, TableId tableId) {
>     long startTime = System.nanoTime(); // Record start time
>     // Convert the table name and use the resulting parent TableId as the cache key
>     String name = this.tableNameConverter.convert(tableId.table());
>     TableId parentTableId = new TableId(null, tableId.schema(), name);
>     TableChanges.TableChange tableChange = cache.get(parentTableId);
>     if (tableChange == null) {
>         // Cache miss: query the schema and store it under the parent TableId
>         LOG.info("[queryTableSchema begin] {}", tableId.identifier());
>         if (schema == null) {
>             schema = new CustomPostgresSchema((PostgresConnection) jdbc, sourceConfig);
>         }
>         tableChange = schema.getTableSchema(tableId);
>         LOG.info("[queryTableSchema end] {}", tableId.identifier());
>         cache.put(parentTableId, tableChange);
>     }
>     long endTime = System.nanoTime();
>     long duration = endTime - startTime;
>     LOG.info("[queryTableSchema duration] {} {} ms", tableId.identifier(),
>             duration / 1_000_000); // Convert nanoseconds to milliseconds
>     return tableChange;
> }
> {code}
> I am willing to submit a PR!
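
The report does not show `tableNameConverter` or the `cache` field used in the modified method. The following is a minimal, self-contained sketch of the same idea without any Flink CDC or Debezium types. It assumes partitions are named `<parent>_<digits>`. The class `ParentSchemaCacheSketch`, the method `toParentName`, the table name `demo_events`, and the string-valued "schema" are hypothetical stand-ins, not the reporter's implementation.

```java
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.regex.Pattern;

public class ParentSchemaCacheSketch {

    // Assumption: a partition is named <parent>_<digits>, e.g. demo_events_202401.
    private static final Pattern PARTITION_SUFFIX = Pattern.compile("_\\d+$");

    /** Hypothetical stand-in for tableNameConverter.convert(...). */
    static String toParentName(String tableName) {
        return PARTITION_SUFFIX.matcher(tableName).replaceFirst("");
    }

    private final Map<String, String> cache = new ConcurrentHashMap<>();
    private final Function<String, String> loader;

    /** The loader represents the expensive schema query against the database. */
    ParentSchemaCacheSketch(Function<String, String> loader) {
        this.loader = loader;
    }

    /** Returns the cached schema for the parent table, loading it at most once. */
    String schemaFor(String schemaName, String tableName) {
        String key = schemaName + "." + toParentName(tableName);
        return cache.computeIfAbsent(key, k -> loader.apply(schemaName + "." + tableName));
    }

    public static void main(String[] args) {
        AtomicInteger loads = new AtomicInteger();
        ParentSchemaCacheSketch sketch = new ParentSchemaCacheSketch(id -> {
            loads.incrementAndGet(); // count how often the "database" is queried
            return "schema loaded via " + id;
        });
        // Twelve partitions of the same parent table share one cache entry.
        for (int month = 1; month <= 12; month++) {
            sketch.schemaFor("public",
                    String.format(Locale.ROOT, "demo_events_2024%02d", month));
        }
        System.out.println("schema loads: " + loads.get()); // prints "schema loads: 1"
    }
}
```

As in the modified method, the cached value is whatever was loaded for the first partition seen, so this approach assumes all partitions share the parent's column layout and that callers can accept a schema object that was fetched for a sibling partition.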