CloverDew opened a new issue, #10647: URL: https://github.com/apache/seatunnel/issues/10647
### Search before asking

- [x] I had searched in the [issues](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22bug%22) and found no similar issues.

### What happened

**What happened:**

When running a Flink job with MySQL CDC source and JDBC Exactly-Once sink connector with schema evolution enabled, the job hangs indefinitely at certain checkpoints during DDL operations. The issue occurs specifically when DDL events (ALTER TABLE) are processed concurrently with XA transactions used by the exactly-once sink.

```
2026-03-24 11:43:03,144 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 7 (type=CheckpointType{name='Checkpoint', sharingFilesStrategy=FORWARD_BACKWARD}) @ 1774345383143 for job 4dabfd84185b4f842ef1738f89d84465.
# Job hangs here indefinitely, no further checkpoint progress
```

**Root Cause:**

The hang is caused by a self-deadlock in the JdbcExactlyOnceSinkWriter.reOpenOutputFormat() method:

1. When a DDL event occurs, reOpenOutputFormat() calls this.prepareCommit() first, which puts an XA transaction into PREPARED state.
2. The PREPARED XA transaction holds MySQL Metadata Locks (MDL) on the table.
3. Then reOpenOutputFormat() tries to execute ALTER TABLE on the same connection.
4. The ALTER TABLE gets blocked by the MDL locks held by the prepared XA transaction on the same connection.
5. This creates a self-deadlock situation where the connection waits for itself.

- The issue occurs in the translation layer when SchemaOperator forwards DDL events to JdbcExactlyOnceSinkWriter.
- MySQL's XA transactions acquire MDL locks that conflict with DDL operations.
- The prepared XA transaction from the previous checkpoint blocks the current DDL execution.

### SeaTunnel Version

All of them have this problem.
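To make the root cause described under "What happened" easier to picture, here is a minimal, self-contained Java sketch of the lock pattern. It is only an analogy. It does not use SeaTunnel, JDBC, or MySQL, and the class and method names (`MdlSelfDeadlockSketch`, `ddlAcquiresLock`) are hypothetical. A `ReentrantReadWriteLock` stands in for the table's metadata lock: the read lock plays the lock kept by the PREPARED XA transaction, and the write lock plays the exclusive lock that ALTER TABLE needs. A timeout is used so the sketch terminates instead of hanging.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class MdlSelfDeadlockSketch {

    /**
     * Simulates "prepare, then DDL" on a single thread (standing in for a single connection).
     * Returns true if the simulated ALTER TABLE obtained the exclusive lock within the timeout.
     */
    public static boolean ddlAcquiresLock(boolean finishXaFirst, long timeoutMillis)
            throws InterruptedException {
        ReentrantReadWriteLock tableLock = new ReentrantReadWriteLock();

        // Stand-in for prepareCommit(): the prepared transaction keeps a shared lock.
        tableLock.readLock().lock();
        boolean sharedLockHeld = true;

        if (finishXaFirst) {
            // Stand-in for the prepared transaction being committed or rolled back.
            tableLock.readLock().unlock();
            sharedLockHeld = false;
        }

        // Stand-in for ALTER TABLE: needs the exclusive lock.
        // ReentrantReadWriteLock does not allow upgrading read -> write, so this
        // waits on a lock that the same thread is holding.
        boolean acquired = tableLock.writeLock().tryLock(timeoutMillis, TimeUnit.MILLISECONDS);

        // Clean up whatever is still held.
        if (acquired) {
            tableLock.writeLock().unlock();
        }
        if (sharedLockHeld) {
            tableLock.readLock().unlock();
        }
        return acquired;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("DDL while prepared transaction still holds the lock: "
                + ddlAcquiresLock(false, 200));
        System.out.println("DDL after the lock holder has finished: "
                + ddlAcquiresLock(true, 200));
    }
}
```

In the first call the simulated DDL times out because the only lock holder is the caller itself, which mirrors the self-wait described above (the real job has no timeout, so it hangs). The second call only shows that the exclusive lock becomes available once the holder releases it; it is not a proposed fix for the sink.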
### SeaTunnel Config

```conf
source {
  MySQL-CDC {
    server-id = 5652-5657
    username = "mysqluser"
    password = "mysqlpw"
    table-names = ["shop.products"]
    base-url = "jdbc:mysql://mysql_cdc_e2e:3306/shop"
  }
}

sink {
  Jdbc {
    url = "jdbc:mysql://mysql_cdc_e2e:3306/shop"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "mysqluser"
    password = "mysqlpw"
    generate_sink_sql = true
    database = "shop"
    table = "mysql_cdc_e2e_sink_table_with_schema_change_exactly_once"
    support_upsert_by_query_primary_key_exist = true
    is_exactly_once = true
  }
}
```

### Running Command

```shell
-------
```

### Error Exception

```log
**Error Logs:**
2026-03-23 21:06:13,458 INFO io.debezium.connector.mysql.MySqlStreamingChangeEventSource - Connected to MySQL binlog at localhost:3306, starting at MySqlOffsetContext [sourceInfoSchema=Schema{io.debezium.connector.mysql.Source:STRUCT}, sourceInfo=SourceInfo [currentGtid=null, currentBinlogFilename=DESKTOP-HNU0D1U-bin.000008, currentBinlogPosition=6577, currentRowNumber=0, serverId=0, sourceTime=null, threadId=-1, currentQuery=null, tableIds=[], databaseName=null], snapshotCompleted=false, transactionContext=TransactionContext [currentTransactionId=null, perTableEventCount={}, totalEventCount=0], restartGtidSet=null, currentGtidSet=null, restartBinlogFilename=DESKTOP-HNU0D1U-bin.000008, restartBinlogPosition=6577, restartRowsToSkip=0, restartEventsToSkip=0, currentEventLengthInBytes=0, inTransaction=false, transactionId=null, incrementalSnapshotContext =IncrementalSnapshotContext [windowOpened=false, chunkEndPosition=null, dataCollectionsToSnapshot=[], lastEventKeySent=null, maximumKey=null]]
2026-03-23 21:06:13,458 INFO io.debezium.connector.mysql.MySqlStreamingChangeEventSource - Waiting for keepalive thread to start
2026-03-23 21:06:13,459 INFO io.debezium.util.Threads - Creating thread debezium-mysqlconnector-mysql_binlog_source-binlog-client
2026-03-23 21:06:13,499 INFO org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.FieldNamedPreparedStatement - PrepareStatement sql is: INSERT INTO `shop`.`mysql_cdc_e2e_sink_table_with_schema_change_exactly_once` (`id`, `name`, `description`, `weight`) VALUES (?, ?, ?, ?) ON DUPLICATE KEY UPDATE `id`=VALUES(`id`), `name`=VALUES(`name`), `description`=VALUES(`description`), `weight`=VALUES(`weight`)
2026-03-23 21:06:13,503 INFO org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.FieldNamedPreparedStatement - PrepareStatement sql is: DELETE FROM `shop`.`mysql_cdc_e2e_sink_table_with_schema_change_exactly_once` WHERE `id` = ?
2026-03-23 21:06:13,562 INFO io.debezium.connector.mysql.MySqlStreamingChangeEventSource - Keepalive thread is running
2026-03-23 21:06:13,650 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1 (type=CheckpointType{name='Checkpoint', sharingFilesStrategy=FORWARD_BACKWARD}) @ 1774271173638 for job b1fbcd22a40d4a881708ad26b7a86a1b.
2026-03-23 21:06:13,710 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 1 for job b1fbcd22a40d4a881708ad26b7a86a1b (16073 bytes, checkpointDuration=68 ms, finalizationTime=4 ms).
2026-03-23 21:06:13,712 INFO org.apache.flink.runtime.source.coordinator.SourceCoordinator - Marking checkpoint 1 as completed for source Source: MySQL-CDC-Source.
2026-03-23 21:06:18,638 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 2 (type=CheckpointType{name='Checkpoint', sharingFilesStrategy=FORWARD_BACKWARD}) @ 1774271178637 for job b1fbcd22a40d4a881708ad26b7a86a1b.
2026-03-23 21:06:18,648 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 2 for job b1fbcd22a40d4a881708ad26b7a86a1b (17820 bytes, checkpointDuration=1 ms, finalizationTime=10 ms).
2026-03-23 21:06:18,648 INFO org.apache.flink.runtime.source.coordinator.SourceCoordinator - Marking checkpoint 2 as completed for source Source: MySQL-CDC-Source.
2026-03-23 21:06:18,670 INFO org.apache.seatunnel.connectors.seatunnel.jdbc.sink.JdbcSinkAggregatedCommitter - commit xid: [XidInfo(xid=201:3361643061393462656564343466333639393934613065323361323737333635000000007f93cd1a9d010000:74eef017, attempts=0)]
2026-03-23 21:06:18,670 INFO org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa.XaGroupOpsImpl - commit 1 transactions
2026-03-23 21:06:18,670 INFO org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa.XaGroupOpsImpl - committing 201:3361643061393462656564343466333639393934613065323361323737333635000000007f93cd1a9d010000:74eef017 transaction
2026-03-23 21:06:23,644 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 3 (type=CheckpointType{name='Checkpoint', sharingFilesStrategy=FORWARD_BACKWARD}) @ 1774271183643 for job b1fbcd22a40d4a881708ad26b7a86a1b.
2026-03-23 21:06:23,649 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 3 for job b1fbcd22a40d4a881708ad26b7a86a1b (17852 bytes, checkpointDuration=6 ms, finalizationTime=0 ms).
2026-03-23 21:06:23,650 INFO org.apache.flink.runtime.source.coordinator.SourceCoordinator - Marking checkpoint 3 as completed for source Source: MySQL-CDC-Source.
2026-03-23 21:06:23,650 INFO org.apache.seatunnel.connectors.seatunnel.jdbc.sink.JdbcSinkAggregatedCommitter - commit xid: [XidInfo(xid=201:3361643061393462656564343466333639393934613065323361323737333635000000008093cd1a9d010000:74eef017, attempts=0)]
2026-03-23 21:06:23,650 INFO org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa.XaGroupOpsImpl - commit 1 transactions
2026-03-23 21:06:23,650 INFO org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa.XaGroupOpsImpl - committing 201:3361643061393462656564343466333639393934613065323361323737333635000000008093cd1a9d010000:74eef017 transaction
2026-03-23 21:06:28,636 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 4 (type=CheckpointType{name='Checkpoint', sharingFilesStrategy=FORWARD_BACKWARD}) @ 1774271188636 for job b1fbcd22a40d4a881708ad26b7a86a1b.
2026-03-23 21:06:28,643 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 4 for job b1fbcd22a40d4a881708ad26b7a86a1b (17852 bytes, checkpointDuration=7 ms, finalizationTime=0 ms).
2026-03-23 21:06:28,644 INFO org.apache.flink.runtime.source.coordinator.SourceCoordinator - Marking checkpoint 4 as completed for source Source: MySQL-CDC-Source.
2026-03-23 21:06:28,645 INFO org.apache.seatunnel.connectors.seatunnel.jdbc.sink.JdbcSinkAggregatedCommitter - commit xid: [XidInfo(xid=201:3361643061393462656564343466333639393934613065323361323737333635000000008193cd1a9d010000:74eef017, attempts=0)]
2026-03-23 21:06:28,645 INFO org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa.XaGroupOpsImpl - commit 1 transactions
2026-03-23 21:06:28,645 INFO org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa.XaGroupOpsImpl - committing 201:3361643061393462656564343466333639393934613065323361323737333635000000008193cd1a9d010000:74eef017 transaction
2026-03-23 21:06:28,921 WARN org.apache.seatunnel.connectors.cdc.base.schema.AbstractSchemaChangeResolver - Ignoring statement for non-captured table ALTER TABLE products_on_hand ADD COLUMN add_column5 varchar(64) not null default 'yy'
2026-03-23 21:06:30,779 INFO org.apache.seatunnel.api.event.LoggingEventHandler - log event: MessageDelayedEvent(createdTime=1774271190779, jobId=b1fbcd22a40d4a881708ad26b7a86a1b, eventType=READER_MESSAGE_DELAYED, delayTime=2372, record=SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={transaction_id=null, ts_sec=1774271188, file=DESKTOP-HNU0D1U-bin.000008, pos=9657, server_id=1}} ConnectRecord{topic='mysql_binlog_source', kafkaPartition=0, key=Struct{databaseName=shop}, keySchema=Schema{io.debezium.connector.mysql.SchemaChangeKey:STRUCT}, value=Struct{source=Struct{version=1.9.8.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1774271188407,db=shop,table=products,server_id=1,file=DESKTOP-HNU0D1U-bin.000008,pos=9446,row=0},databaseName=shop,ddl=alter table products ADD COLUMN add_column1 varchar(64) not null default 'yy',ADD COLUMN add_column2 int not null default 1,tableChanges=[Struct{type=ALTER,id="shop"."products",table=Struct{defaultCharsetName=utf8mb4,primaryKeyColumnNames=[id],columns=[Struct{name=id,jdbcType=4,typeName=INT,typeExpression=INT,position=1,optional=false,autoIncremented=true,generated=true}, Struct{name=name,jdbcType=12,typeName=VARCHAR,typeExpression=VARCHAR,charsetName=utf8mb4,length=255,position=2,optional=false,autoIncremented=false,generated=false}, Struct{name=description,jdbcType=12,typeName=VARCHAR,typeExpression=VARCHAR,charsetName=utf8mb4,length=512,position=3,optional=true,autoIncremented=false,generated=false}, Struct{name=weight,jdbcType=6,typeName=FLOAT,typeExpression=FLOAT,position=4,optional=true,autoIncremented=false,generated=false}, Struct{name=add_column1,jdbcType=12,typeName=VARCHAR,typeExpression=VARCHAR,charsetName=utf8mb4,length=64,position=5,optional=false,autoIncremented=false,generated=false}, Struct{name=add_column2,jdbcType=4,typeName=INT,typeExpression=INT,position=6,optional=false,autoIncremented=false,generated=false}]}}]}, valueSchema=Schema{io.debezium.connector.mysql.SchemaChangeValue:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)})
2026-03-23 21:06:30,779 INFO org.apache.kafka.connect.json.JsonConverterConfig - JsonConverterConfig values:
    converter.type = value
    decimal.format = BASE64
    schemas.cache.size = 1000
    schemas.enable = true
2026-03-23 21:06:30,797 WARN io.debezium.connector.mysql.MySqlValueConverters - Column is missing a character set: add_column1 VARCHAR(64) NOT NULL DEFAULT VALUE yy
2026-03-23 21:06:30,797 WARN io.debezium.connector.mysql.MySqlValueConverters - Using UTF-8 charset by default for column without charset: add_column1 VARCHAR(64) NOT NULL DEFAULT VALUE yy
2026-03-23 21:06:30,801 INFO org.apache.seatunnel.api.event.LoggingEventHandler - log event: AlterTableColumnsEvent(super=AlterTableEvent(super=TableEvent(createdTime=1774271190801, tableIdentifier=.shop.products, jobId=b1fbcd22a40d4a881708ad26b7a86a1b, statement=alter table products ADD COLUMN add_column1 varchar(64) not null default 'yy',ADD COLUMN add_column2 int not null default 1, sourceDialectName=MySQL, changeAfter=CatalogTable{tableId=MySQL.shop.products, tableSchema=TableSchema(primaryKey=PrimaryKey(primaryKey=PRIMARY, columnNames=[id], enableAutoId=null), constraintKeys=[]), options={table-name=shop.products, connector=jdbc, url=jdbc:mysql://localhost:3306/shop}, partitionKeys=[], comment='', catalogName='MySQL', metadata=MetadataSchema()})), events=[AlterTableAddColumnEvent(super=AlterTableColumnEvent(super=AlterTableEvent(super=TableEvent(createdTime=1774271190798, tableIdentifier=.shop.products, jobId=null, statement=null, sourceDialectName=MySQL, changeAfter=CatalogTable{tableId=MySQL.shop.products, tableSchema=TableSchema(primaryKey=PrimaryKey(primaryKey=PRIMARY, columnNames=[id], enableAutoId=null), constraintKeys=[]), options={table-name=shop.products, connector=jdbc, url=jdbc:mysql://localhost:3306/shop}, partitionKeys=[], comment='', catalogName='MySQL', metadata=MetadataSchema()}))), column=PhysicalColumn(super=Column(name=add_column1, dataType=STRING, columnLength=256, scale=null, nullable=false, defaultValue=yy, comment=null, sourceType=VARCHAR(64), sinkType=null, options=null, isUnsigned=false, isZeroFill=false, bitLen=2048, longColumnLength=256)), first=false, afterColumn=null), AlterTableAddColumnEvent(super=AlterTableColumnEvent(super=AlterTableEvent(super=TableEvent(createdTime=1774271190798, tableIdentifier=.shop.products, jobId=null, statement=null, sourceDialectName=MySQL, changeAfter=CatalogTable{tableId=MySQL.shop.products, tableSchema=TableSchema(primaryKey=PrimaryKey(primaryKey=PRIMARY, columnNames=[id], enableAutoId=null), constraintKeys=[]), options={table-name=shop.products, connector=jdbc, url=jdbc:mysql://localhost:3306/shop}, partitionKeys=[], comment='', catalogName='MySQL', metadata=MetadataSchema()}))), column=PhysicalColumn(super=Column(name=add_column2, dataType=INT, columnLength=null, scale=null, nullable=false, defaultValue=1, comment=null, sourceType=INT, sinkType=null, options=null, isUnsigned=false, isZeroFill=false, bitLen=0, longColumnLength=null)), first=false, afterColumn=null)])
2026-03-23 21:06:30,919 INFO org.apache.seatunnel.translation.flink.schema.SchemaOperator - Starting schema change processing for table: .shop.products, job: b1fbcd22a40d4a881708ad26b7a86a1b, event time: 1774271190801
2026-03-23 21:06:30,920 INFO org.apache.seatunnel.translation.flink.schema.SchemaOperator - Broadcasting SchemaChangeEvent to all downstream sink subtasks for table: .shop.products
2026-03-23 21:06:30,921 INFO org.apache.seatunnel.translation.flink.schema.SchemaOperator - SchemaChangeEvent broadcast sent for table: .shop.products
2026-03-23 21:06:30,921 INFO org.apache.seatunnel.translation.flink.schema.SchemaOperator - Synchronously processing schema change for table .shop.products (epoch 1774271190801). Business data buffered.
2026-03-23 21:06:30,921 INFO org.apache.seatunnel.translation.flink.schema.coordinator.LocalSchemaCoordinator - Requesting schema change for table .shop.products (epoch 1774271190801). Waiting for all 1 sink subtasks to apply after checkpoint completion.
2026-03-23 21:06:31,026 INFO org.apache.seatunnel.translation.flink.schema.BroadcastSchemaSinkOperator - Subtask 0 applying schema change immediately for table .shop.products (epoch 1774271190801, change: AlterTableColumnsEvent). This prevents deadlock by allowing checkpoint barriers to propagate.
2026-03-23 21:06:31,035 INFO org.apache.seatunnel.translation.flink.sink.FlinkSinkWriter - FlinkSinkWriter applying SchemaChangeEvent for table: .shop.products
2026-03-23 21:06:31,035 INFO org.apache.seatunnel.api.sink.multitablesink.MultiTableSinkWriter - Start apply schema change for table shop.products sub-writer 0
2026-03-23 21:06:31,055 INFO org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect - Executing add column SQL: ALTER TABLE `shop`.`mysql_cdc_e2e_sink_table_with_schema_change_exactly_once` ADD COLUMN `add_column1` VARCHAR(64) NOT NULL DEFAULT 'yy'
2026-03-23 21:06:31,808 INFO org.apache.seatunnel.api.event.LoggingEventHandler - log event: MessageDelayedEvent(createdTime=1774271191808, jobId=b1fbcd22a40d4a881708ad26b7a86a1b, eventType=READER_MESSAGE_DELAYED, delayTime=3808, record=SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={transaction_id=null, ts_sec=1774271188, file=DESKTOP-HNU0D1U-bin.000008, pos=9736, row=1, server_id=1, event=2}} ConnectRecord{topic='mysql_binlog_source.shop.products', kafkaPartition=null, key=Struct{id=110}, keySchema=Schema{mysql_binlog_source.shop.products.Key:STRUCT}, value=Struct{before=Struct{id=110,name=scooter,description=Small 2-wheel scooter,weight=3.140000104904175,add_column1=yy,add_column2=1},after=Struct{id=110,name=dailai,description=Small 2-wheel scooter,weight=3.140000104904175,add_column1=yy,add_column2=1},source=Struct{version=1.9.8.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1774271188000,db=shop,table=products,server_id=1,file=DESKTOP-HNU0D1U-bin.000008,pos=9891,row=0,thread=29},op=u,ts_ms=1774271188444}, valueSchema=Schema{mysql_binlog_source.shop.products.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)})
2026-03-23 21:06:31,811 INFO org.apache.seatunnel.api.event.LoggingEventHandler - log event: AlterTableColumnsEvent(super=AlterTableEvent(super=TableEvent(createdTime=1774271191811, tableIdentifier=.shop.products, jobId=b1fbcd22a40d4a881708ad26b7a86a1b, statement=alter table products ADD COLUMN add_column3 float not null default 1.1, sourceDialectName=MySQL, changeAfter=CatalogTable{tableId=MySQL.shop.products, tableSchema=TableSchema(primaryKey=PrimaryKey(primaryKey=PRIMARY, columnNames=[id], enableAutoId=null), constraintKeys=[]), options={table-name=shop.products, connector=jdbc, url=jdbc:mysql://localhost:3306/shop}, partitionKeys=[], comment='', catalogName='MySQL', metadata=MetadataSchema()})), events=[AlterTableAddColumnEvent(super=AlterTableColumnEvent(super=AlterTableEvent(super=TableEvent(createdTime=1774271191811, tableIdentifier=.shop.products, jobId=null, statement=null, sourceDialectName=MySQL, changeAfter=CatalogTable{tableId=MySQL.shop.products, tableSchema=TableSchema(primaryKey=PrimaryKey(primaryKey=PRIMARY, columnNames=[id], enableAutoId=null), constraintKeys=[]), options={table-name=shop.products, connector=jdbc, url=jdbc:mysql://localhost:3306/shop}, partitionKeys=[], comment='', catalogName='MySQL', metadata=MetadataSchema()}))), column=PhysicalColumn(super=Column(name=add_column3, dataType=FLOAT, columnLength=null, scale=null, nullable=false, defaultValue=1.1, comment=null, sourceType=FLOAT, sinkType=null, options=null, isUnsigned=false, isZeroFill=false, bitLen=0, longColumnLength=null)), first=false, afterColumn=null)])
2026-03-23 21:06:31,824 INFO org.apache.seatunnel.api.event.LoggingEventHandler - log event: AlterTableColumnsEvent(super=AlterTableEvent(super=TableEvent(createdTime=1774271191813, tableIdentifier=.shop.products, jobId=b1fbcd22a40d4a881708ad26b7a86a1b, statement=alter table products ADD COLUMN add_column4 timestamp not null default current_timestamp(), sourceDialectName=MySQL, changeAfter=CatalogTable{tableId=MySQL.shop.products, tableSchema=TableSchema(primaryKey=PrimaryKey(primaryKey=PRIMARY, columnNames=[id], enableAutoId=null), constraintKeys=[]), options={table-name=shop.products, connector=jdbc, url=jdbc:mysql://localhost:3306/shop}, partitionKeys=[], comment='', catalogName='MySQL', metadata=MetadataSchema()})), events=[AlterTableAddColumnEvent(super=AlterTableColumnEvent(super=AlterTableEvent(super=TableEvent(createdTime=1774271191813, tableIdentifier=.shop.products, jobId=null, statement=null, sourceDialectName=MySQL, changeAfter=CatalogTable{tableId=MySQL.shop.products, tableSchema=TableSchema(primaryKey=PrimaryKey(primaryKey=PRIMARY, columnNames=[id], enableAutoId=null), constraintKeys=[]), options={table-name=shop.products, connector=jdbc, url=jdbc:mysql://localhost:3306/shop}, partitionKeys=[], comment='', catalogName='MySQL', metadata=MetadataSchema()}))), column=PhysicalColumn(super=Column(name=add_column4, dataType=TIMESTAMP, columnLength=null, scale=0, nullable=false, defaultValue=current_timestamp(), comment=null, sourceType=TIMESTAMP, sinkType=null, options=null, isUnsigned=false, isZeroFill=false, bitLen=0, longColumnLength=null)), first=false, afterColumn=null)])
2026-03-23 21:06:32,839 WARN io.debezium.connector.mysql.MySqlValueConverters - Column is missing a character set: add_column6 VARCHAR(64) NOT NULL DEFAULT VALUE ff
2026-03-23 21:06:32,839 WARN io.debezium.connector.mysql.MySqlValueConverters - Using UTF-8 charset by default for column without charset: add_column6 VARCHAR(64) NOT NULL DEFAULT VALUE ff
2026-03-23 21:06:32,839 INFO org.apache.seatunnel.api.event.LoggingEventHandler - log event: AlterTableColumnsEvent(super=AlterTableEvent(super=TableEvent(createdTime=1774271192839, tableIdentifier=.shop.products, jobId=b1fbcd22a40d4a881708ad26b7a86a1b, statement=alter table products ADD COLUMN add_column6 varchar(64) not null default 'ff' after id, sourceDialectName=MySQL, changeAfter=CatalogTable{tableId=MySQL.shop.products, tableSchema=TableSchema(primaryKey=PrimaryKey(primaryKey=PRIMARY, columnNames=[id], enableAutoId=null), constraintKeys=[]), options={table-name=shop.products, connector=jdbc, url=jdbc:mysql://localhost:3306/shop}, partitionKeys=[], comment='', catalogName='MySQL', metadata=MetadataSchema()})), events=[AlterTableAddColumnEvent(super=AlterTableColumnEvent(super=AlterTableEvent(super=TableEvent(createdTime=1774271192839, tableIdentifier=.shop.products, jobId=null, statement=null, sourceDialectName=MySQL, changeAfter=CatalogTable{tableId=MySQL.shop.products, tableSchema=TableSchema(primaryKey=PrimaryKey(primaryKey=PRIMARY, columnNames=[id], enableAutoId=null), constraintKeys=[]), options={table-name=shop.products, connector=jdbc, url=jdbc:mysql://localhost:3306/shop}, partitionKeys=[], comment='', catalogName='MySQL', metadata=MetadataSchema()}))), column=PhysicalColumn(super=Column(name=add_column6, dataType=STRING, columnLength=256, scale=null, nullable=false, defaultValue=ff, comment=null, sourceType=VARCHAR(64), sinkType=null, options=null, isUnsigned=false, isZeroFill=false, bitLen=2048, longColumnLength=256)), first=false, afterColumn=id)])
2026-03-23 21:06:33,645 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 5 (type=CheckpointType{name='Checkpoint', sharingFilesStrategy=FORWARD_BACKWARD}) @ 1774271193644 for job b1fbcd22a40d4a881708ad26b7a86a1b.
2026-03-23 21:06:33,853 INFO org.apache.seatunnel.api.event.LoggingEventHandler - log event: MessageDelayedEvent(createdTime=1774271193853, jobId=b1fbcd22a40d4a881708ad26b7a86a1b, eventType=READER_MESSAGE_DELAYED, delayTime=5853, record=SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={transaction_id=null, ts_sec=1774271188, file=DESKTOP-HNU0D1U-bin.000008, pos=13677, row=1, server_id=1, event=2}} ConnectRecord{topic='mysql_binlog_source.shop.products', kafkaPartition=null, key=Struct{id=115}, keySchema=Schema{mysql_binlog_source.shop.products.Key:STRUCT}, value=Struct{before=Struct{id=115,add_column6=ff,name=hammer,description=16oz carpenter's hammer,weight=1.0,add_column1=yy,add_column2=1,add_column3=1.100000023841858,add_column4=2026-03-23T13:06:28Z},source=Struct{version=1.9.8.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1774271188000,db=shop,table=products,server_id=1,file=DESKTOP-HNU0D1U-bin.000008,pos=13831,row=0,thread=29},op=d,ts_ms=1774271188491}, valueSchema=Schema{mysql_binlog_source.shop.products.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)})
# Job hangs here indefinitely, no further checkpoint progress
```

### Zeta or Flink or Spark Version

_No response_

### Java or Scala Version

_No response_

### Screenshots

_No response_

### Are you willing to submit PR?

- [x] Yes I am willing to submit a PR!

### Code of Conduct

- [x] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at: [email protected]
