This is an automated email from the ASF dual-hosted git repository.

mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/eventmesh.git


The following commit(s) were added to refs/heads/master by this push:
     new f88a19453 [ISSUE #5040] Support gtid mode for sync data with mysql (#5041)
f88a19453 is described below

commit f88a19453c0b8c91f331dddbc7ef8ac13c3a9a8d
Author: mike_xwm <[email protected]>
AuthorDate: Wed Jul 24 14:34:18 2024 +0800

    [ISSUE #5040] Support gtid mode for sync data with mysql (#5041)
    
    * [ISSUE #5040] Support gtid mode for sync data with mysql
    
    * fix conflicts with master
    
    * fix checkstyle error
---
 eventmesh-admin-server/conf/eventmesh.sql          |   3 +
 .../conf/mapper/EventMeshMysqlPositionMapper.xml   |  28 +-
 .../web/db/entity/EventMeshMysqlPosition.java      |   6 +
 .../position/impl/MysqlPositionHandler.java        |   8 +-
 .../connector/rdb/canal/CanalSinkConfig.java       |   2 +
 .../connector/rdb/canal/CanalSourceConfig.java     |   4 +
 .../remote/offset/canal/CanalRecordOffset.java     |   5 +
 .../remote/offset/canal/CanalRecordPartition.java  |   2 +
 .../connector/canal/CanalConnectRecord.java        |   6 +
 .../connector/canal/dialect/AbstractDbDialect.java |   4 -
 .../connector/canal/dialect/DbDialect.java         |   2 -
 .../connector/canal/dialect/MysqlDialect.java      |   4 -
 .../interceptor/SqlBuilderLoadInterceptor.java     |  24 +-
 .../connector/canal/sink/DbLoadContext.java        |   2 +
 .../sink/{DbLoadContext.java => GtidBatch.java}    |  37 +-
 .../{DbLoadContext.java => GtidBatchManager.java}  |  32 +-
 .../canal/sink/connector/CanalSinkConnector.java   | 404 +++++++++++++++------
 .../connector/canal/source/EntryParser.java        |  33 +-
 .../source/connector/CanalSourceConnector.java     |  51 ++-
 19 files changed, 452 insertions(+), 205 deletions(-)

diff --git a/eventmesh-admin-server/conf/eventmesh.sql 
b/eventmesh-admin-server/conf/eventmesh.sql
index 3b6fc9b77..226101661 100644
--- a/eventmesh-admin-server/conf/eventmesh.sql
+++ b/eventmesh-admin-server/conf/eventmesh.sql
@@ -71,8 +71,11 @@ CREATE TABLE IF NOT EXISTS `event_mesh_job_info` (
 CREATE TABLE IF NOT EXISTS `event_mesh_mysql_position` (
   `id` int unsigned NOT NULL AUTO_INCREMENT,
   `jobID` int unsigned NOT NULL,
+  `serverUUID` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci 
DEFAULT NULL,
   `address` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT 
NULL,
   `position` bigint DEFAULT NULL,
+  `gtid` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT 
NULL,
+  `currentGtid` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci 
DEFAULT NULL,
   `timestamp` bigint DEFAULT NULL,
   `journalName` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL,
   `createTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
diff --git 
a/eventmesh-admin-server/conf/mapper/EventMeshMysqlPositionMapper.xml 
b/eventmesh-admin-server/conf/mapper/EventMeshMysqlPositionMapper.xml
index bc3a3292a..cbb7c094d 100644
--- a/eventmesh-admin-server/conf/mapper/EventMeshMysqlPositionMapper.xml
+++ b/eventmesh-admin-server/conf/mapper/EventMeshMysqlPositionMapper.xml
@@ -16,24 +16,28 @@
   limitations under the License.
   -->
 <!DOCTYPE mapper
-        PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
-        "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
+    PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
+    "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
 <mapper 
namespace="org.apache.eventmesh.admin.server.web.db.mapper.EventMeshMysqlPositionMapper">
 
     <resultMap id="BaseResultMap" 
type="org.apache.eventmesh.admin.server.web.db.entity.EventMeshMysqlPosition">
-            <id property="id" column="id" jdbcType="INTEGER"/>
-            <result property="jobID" column="jobID" jdbcType="INTEGER"/>
-            <result property="address" column="address" jdbcType="VARCHAR"/>
-            <result property="position" column="position" jdbcType="BIGINT"/>
-            <result property="timestamp" column="timestamp" jdbcType="BIGINT"/>
-            <result property="journalName" column="journalName" 
jdbcType="VARCHAR"/>
-            <result property="createTime" column="createTime" 
jdbcType="TIMESTAMP"/>
-            <result property="updateTime" column="updateTime" 
jdbcType="TIMESTAMP"/>
+        <id property="id" column="id" jdbcType="INTEGER"/>
+        <result property="jobID" column="jobID" jdbcType="INTEGER"/>
+        <result property="serverUUID" column="serverUUID" jdbcType="VARCHAR"/>
+        <result property="address" column="address" jdbcType="VARCHAR"/>
+        <result property="position" column="position" jdbcType="BIGINT"/>
+        <result property="gtid" column="gtid" jdbcType="VARCHAR"/>
+        <result property="currentGtid" column="currentGtid" 
jdbcType="VARCHAR"/>
+        <result property="timestamp" column="timestamp" jdbcType="BIGINT"/>
+        <result property="journalName" column="journalName" 
jdbcType="VARCHAR"/>
+        <result property="createTime" column="createTime" 
jdbcType="TIMESTAMP"/>
+        <result property="updateTime" column="updateTime" 
jdbcType="TIMESTAMP"/>
     </resultMap>
 
     <sql id="Base_Column_List">
-        id,jobID,address,
-        position,timestamp,journalName,
+        id
+        ,jobID,serverUUID,address,
+        position,gtid,currentGtid,timestamp,journalName,
         createTime,updateTime
     </sql>
 </mapper>
diff --git 
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshMysqlPosition.java
 
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshMysqlPosition.java
index ffe3e446d..65a38b54b 100644
--- 
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshMysqlPosition.java
+++ 
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshMysqlPosition.java
@@ -38,10 +38,16 @@ public class EventMeshMysqlPosition implements Serializable 
{
 
     private Integer jobID;
 
+    private String serverUUID;
+
     private String address;
 
     private Long position;
 
+    private String gtid;
+
+    private String currentGtid;
+
     private Long timestamp;
 
     private String journalName;
diff --git 
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/impl/MysqlPositionHandler.java
 
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/impl/MysqlPositionHandler.java
index 525fe02c0..f2c174c3b 100644
--- 
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/impl/MysqlPositionHandler.java
+++ 
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/impl/MysqlPositionHandler.java
@@ -115,9 +115,12 @@ public class MysqlPositionHandler extends PositionHandler {
                 CanalRecordOffset offset = (CanalRecordOffset) 
recordPosition.getRecordOffset();
                 if (offset != null) {
                     position.setPosition(offset.getOffset());
+                    position.setGtid(offset.getGtid());
+                    position.setCurrentGtid(offset.getCurrentGtid());
                 }
                 CanalRecordPartition partition = (CanalRecordPartition) 
recordPosition.getRecordPartition();
                 if (partition != null) {
+                    position.setServerUUID(partition.getServerUUID());
                     position.setTimestamp(partition.getTimeStamp());
                     position.setJournalName(partition.getJournalName());
                 }
@@ -148,13 +151,16 @@ public class MysqlPositionHandler extends PositionHandler 
{
             request.getJobID()));
         List<RecordPosition> recordPositionList = new ArrayList<>();
         for (EventMeshMysqlPosition position : positionList) {
-            RecordPosition recordPosition = new RecordPosition();
             CanalRecordPartition partition = new CanalRecordPartition();
             partition.setTimeStamp(position.getTimestamp());
             partition.setJournalName(position.getJournalName());
+            partition.setServerUUID(position.getServerUUID());
+            RecordPosition recordPosition = new RecordPosition();
             recordPosition.setRecordPartition(partition);
             CanalRecordOffset offset = new CanalRecordOffset();
             offset.setOffset(position.getPosition());
+            offset.setGtid(position.getGtid());
+            offset.setCurrentGtid(position.getCurrentGtid());
             recordPosition.setRecordOffset(offset);
             recordPositionList.add(recordPosition);
         }
diff --git 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSinkConfig.java
 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSinkConfig.java
index 85484b2ce..80aec7bfe 100644
--- 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSinkConfig.java
+++ 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSinkConfig.java
@@ -39,6 +39,8 @@ public class CanalSinkConfig extends SinkConfig {
     // sync mode: field/row
     private SyncMode syncMode;
 
+    private boolean isGTIDMode = true;
+
     // skip sink process exception
     private Boolean skipException = false;
 
diff --git 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceConfig.java
 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceConfig.java
index d75ceb6b5..707f10290 100644
--- 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceConfig.java
+++ 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceConfig.java
@@ -45,6 +45,10 @@ public class CanalSourceConfig extends SourceConfig {
 
     private Short clientId;
 
+    private String serverUUID;
+
+    private boolean isGTIDMode = true;
+
     private Integer batchSize = 10000;
 
     private Long batchTimeout = -1L;
diff --git 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/canal/CanalRecordOffset.java
 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/canal/CanalRecordOffset.java
index 90c94c99b..d0f2053f4 100644
--- 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/canal/CanalRecordOffset.java
+++ 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/canal/CanalRecordOffset.java
@@ -30,6 +30,11 @@ public class CanalRecordOffset extends RecordOffset {
 
     private Long offset;
 
+    // mysql instance gtid range
+    private String gtid;
+
+    private String currentGtid;
+
     @Override
     public Class<? extends RecordOffset> getRecordOffsetClass() {
         return CanalRecordOffset.class;
diff --git 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/canal/CanalRecordPartition.java
 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/canal/CanalRecordPartition.java
index 72d404bab..ded82306e 100644
--- 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/canal/CanalRecordPartition.java
+++ 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/canal/CanalRecordPartition.java
@@ -29,6 +29,8 @@ import lombok.ToString;
 @ToString
 public class CanalRecordPartition extends RecordPartition {
 
+    private String serverUUID;
+
     private String journalName;
 
     private Long timeStamp;
diff --git 
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/CanalConnectRecord.java
 
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/CanalConnectRecord.java
index a723b24dc..36ecd158f 100644
--- 
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/CanalConnectRecord.java
+++ 
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/CanalConnectRecord.java
@@ -31,8 +31,14 @@ import lombok.Data;
 public class CanalConnectRecord {
 
     private String schemaName;
+
     private String tableName;
 
+    // mysql instance gtid range
+    private String gtid;
+
+    private String currentGtid;
+
     /**
      * The business type of the changed data (I/U/D/C/A/E), consistent with 
the EventType defined in EntryProtocol in canal.
      */
diff --git 
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/dialect/AbstractDbDialect.java
 
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/dialect/AbstractDbDialect.java
index f5c2245b9..4cf0f82ec 100644
--- 
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/dialect/AbstractDbDialect.java
+++ 
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/dialect/AbstractDbDialect.java
@@ -97,10 +97,6 @@ public abstract class AbstractDbDialect implements DbDialect 
{
         return sqlTemplate;
     }
 
-    public boolean isDRDS() {
-        return false;
-    }
-
     public String getShardColumns(String schema, String table) {
         return null;
     }
diff --git 
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/dialect/DbDialect.java
 
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/dialect/DbDialect.java
index a18edfd5b..781c2fe95 100644
--- 
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/dialect/DbDialect.java
+++ 
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/dialect/DbDialect.java
@@ -48,8 +48,6 @@ public interface DbDialect {
 
     public boolean isSupportMergeSql();
 
-    public boolean isDRDS();
-
     public LobHandler getLobHandler();
 
     public JdbcTemplate getJdbcTemplate();
diff --git 
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/dialect/MysqlDialect.java
 
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/dialect/MysqlDialect.java
index acd491ba6..bfe562871 100644
--- 
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/dialect/MysqlDialect.java
+++ 
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/dialect/MysqlDialect.java
@@ -50,10 +50,6 @@ public class MysqlDialect extends AbstractDbDialect {
         return null;
     }
 
-    public boolean isDRDS() {
-        return false;
-    }
-
     public String getDefaultCatalog() {
         return jdbcTemplate.queryForObject("select database()", String.class);
     }
diff --git 
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/interceptor/SqlBuilderLoadInterceptor.java
 
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/interceptor/SqlBuilderLoadInterceptor.java
index 24d6b42f8..0ad07577f 100644
--- 
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/interceptor/SqlBuilderLoadInterceptor.java
+++ 
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/interceptor/SqlBuilderLoadInterceptor.java
@@ -51,35 +51,21 @@ public class SqlBuilderLoadInterceptor {
         String shardColumns = null;
 
         if (type.isInsert()) {
-            if (CollectionUtils.isEmpty(record.getColumns())
-                && (dbDialect.isDRDS())) {
-                // sql
-                sql = sqlTemplate.getInsertSql(schemaName,
-                    record.getTableName(),
-                    buildColumnNames(record.getKeys()),
-                    buildColumnNames(record.getColumns()));
-            } else {
-                sql = sqlTemplate.getMergeSql(schemaName,
+            sql = sqlTemplate.getMergeSql(schemaName,
                     record.getTableName(),
                     buildColumnNames(record.getKeys()),
                     buildColumnNames(record.getColumns()),
                     new String[] {},
-                    !dbDialect.isDRDS(),
+                    true,
                     shardColumns);
-            }
         } else if (type.isUpdate()) {
-
             boolean existOldKeys = 
!CollectionUtils.isEmpty(record.getOldKeys());
             boolean rowMode = sinkConfig.getSyncMode().isRow();
             String[] keyColumns = null;
             String[] otherColumns = null;
             if (existOldKeys) {
                 keyColumns = buildColumnNames(record.getOldKeys());
-                if (dbDialect.isDRDS()) {
-                    otherColumns = 
buildColumnNames(record.getUpdatedColumns(), record.getUpdatedKeys());
-                } else {
-                    otherColumns = 
buildColumnNames(record.getUpdatedColumns(), record.getKeys());
-                }
+                otherColumns = buildColumnNames(record.getUpdatedColumns(), 
record.getKeys());
             } else {
                 keyColumns = buildColumnNames(record.getKeys());
                 otherColumns = buildColumnNames(record.getUpdatedColumns());
@@ -91,10 +77,10 @@ public class SqlBuilderLoadInterceptor {
                     keyColumns,
                     otherColumns,
                     new String[] {},
-                    !dbDialect.isDRDS(),
+                    true,
                     shardColumns);
             } else {
-                sql = sqlTemplate.getUpdateSql(schemaName, 
record.getTableName(), keyColumns, otherColumns, !dbDialect.isDRDS(), 
shardColumns);
+                sql = sqlTemplate.getUpdateSql(schemaName, 
record.getTableName(), keyColumns, otherColumns, true, shardColumns);
             }
         } else if (type.isDelete()) {
             sql = sqlTemplate.getDeleteSql(schemaName,
diff --git 
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/DbLoadContext.java
 
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/DbLoadContext.java
index 561d89487..3498e87e7 100644
--- 
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/DbLoadContext.java
+++ 
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/DbLoadContext.java
@@ -28,6 +28,8 @@ import lombok.Data;
 @Data
 public class DbLoadContext {
 
+    private String gtid;
+
     private List<CanalConnectRecord> lastProcessedRecords;
 
     private List<CanalConnectRecord> prepareRecords;
diff --git 
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/DbLoadContext.java
 
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/GtidBatch.java
similarity index 56%
copy from 
eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/DbLoadContext.java
copy to 
eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/GtidBatch.java
index 561d89487..dd6559b83 100644
--- 
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/DbLoadContext.java
+++ 
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/GtidBatch.java
@@ -19,29 +19,30 @@ package org.apache.eventmesh.connector.canal.sink;
 
 import org.apache.eventmesh.connector.canal.CanalConnectRecord;
 
-import java.util.Collections;
-import java.util.LinkedList;
 import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
 
-import lombok.Data;
+public class GtidBatch {
+    private int totalBatches;
+    private List<List<CanalConnectRecord>> batches;
+    private int receivedBatchCount;
 
-@Data
-public class DbLoadContext {
-
-    private List<CanalConnectRecord> lastProcessedRecords;
-
-    private List<CanalConnectRecord> prepareRecords;
-
-    private List<CanalConnectRecord> processedRecords;
-
-    private List<CanalConnectRecord> failedRecords;
+    public GtidBatch(int totalBatches) {
+        this.totalBatches = totalBatches;
+        this.batches = new CopyOnWriteArrayList<>(new List[totalBatches]);
+        this.receivedBatchCount = 0;
+    }
 
-    public DbLoadContext() {
-        lastProcessedRecords = Collections.synchronizedList(new 
LinkedList<>());
-        prepareRecords = Collections.synchronizedList(new LinkedList<>());
-        processedRecords = Collections.synchronizedList(new LinkedList<>());
-        failedRecords = Collections.synchronizedList(new LinkedList<>());
+    public void addBatch(int batchIndex, List<CanalConnectRecord> 
batchRecords) {
+        batches.set(batchIndex, batchRecords);
+        receivedBatchCount++;
     }
 
+    public List<List<CanalConnectRecord>> getBatches() {
+        return batches;
+    }
 
+    public boolean isComplete() {
+        return receivedBatchCount == totalBatches;
+    }
 }
diff --git 
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/DbLoadContext.java
 
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/GtidBatchManager.java
similarity index 55%
copy from 
eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/DbLoadContext.java
copy to 
eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/GtidBatchManager.java
index 561d89487..30060aa8f 100644
--- 
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/DbLoadContext.java
+++ 
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/GtidBatchManager.java
@@ -19,29 +19,27 @@ package org.apache.eventmesh.connector.canal.sink;
 
 import org.apache.eventmesh.connector.canal.CanalConnectRecord;
 
-import java.util.Collections;
-import java.util.LinkedList;
 import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
 
-import lombok.Data;
+public class GtidBatchManager {
 
-@Data
-public class DbLoadContext {
+    private static ConcurrentHashMap<String, GtidBatch> gtidBatchMap = new 
ConcurrentHashMap<>();
 
-    private List<CanalConnectRecord> lastProcessedRecords;
-
-    private List<CanalConnectRecord> prepareRecords;
-
-    private List<CanalConnectRecord> processedRecords;
-
-    private List<CanalConnectRecord> failedRecords;
+    public static void addBatch(String gtid, int batchIndex, int totalBatches, 
List<CanalConnectRecord> batchRecords) {
+        gtidBatchMap.computeIfAbsent(gtid, k -> new 
GtidBatch(totalBatches)).addBatch(batchIndex, batchRecords);
+    }
 
-    public DbLoadContext() {
-        lastProcessedRecords = Collections.synchronizedList(new 
LinkedList<>());
-        prepareRecords = Collections.synchronizedList(new LinkedList<>());
-        processedRecords = Collections.synchronizedList(new LinkedList<>());
-        failedRecords = Collections.synchronizedList(new LinkedList<>());
+    public static GtidBatch getGtidBatch(String gtid) {
+        return gtidBatchMap.get(gtid);
     }
 
+    public static boolean isComplete(String gtid) {
+        GtidBatch batch = gtidBatchMap.get(gtid);
+        return batch != null && batch.isComplete();
+    }
 
+    public static void removeGtidBatch(String gtid) {
+        gtidBatchMap.remove(gtid);
+    }
 }
diff --git 
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkConnector.java
 
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkConnector.java
index 8f9df7595..5f3c0a2bc 100644
--- 
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkConnector.java
+++ 
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkConnector.java
@@ -31,6 +31,8 @@ import 
org.apache.eventmesh.connector.canal.sink.DbLoadContext;
 import org.apache.eventmesh.connector.canal.sink.DbLoadData;
 import org.apache.eventmesh.connector.canal.sink.DbLoadData.TableLoadData;
 import org.apache.eventmesh.connector.canal.sink.DbLoadMerger;
+import org.apache.eventmesh.connector.canal.sink.GtidBatch;
+import org.apache.eventmesh.connector.canal.sink.GtidBatchManager;
 import org.apache.eventmesh.connector.canal.source.table.RdbTableMgr;
 import org.apache.eventmesh.openconnect.api.ConnectorCreateService;
 import org.apache.eventmesh.openconnect.api.connector.ConnectorContext;
@@ -38,7 +40,6 @@ import 
org.apache.eventmesh.openconnect.api.connector.SinkConnectorContext;
 import org.apache.eventmesh.openconnect.api.sink.Sink;
 import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
 
-
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.exception.ExceptionUtils;
 
@@ -52,6 +53,7 @@ import java.util.List;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -86,9 +88,12 @@ public class CanalSinkConnector implements Sink, 
ConnectorCreateService<Sink> {
 
     private ExecutorService executor;
 
+    private ExecutorService gtidSingleExecutor;
+
     private int batchSize = 50;
 
     private boolean useBatch = true;
+
     private RdbTableMgr tableMgr;
 
     @Override
@@ -123,6 +128,7 @@ public class CanalSinkConnector implements Sink, 
ConnectorCreateService<Sink> {
             new ArrayBlockingQueue<>(sinkConfig.getPoolSize() * 4),
             new NamedThreadFactory("canalSink"),
             new ThreadPoolExecutor.CallerRunsPolicy());
+        gtidSingleExecutor = Executors.newSingleThreadExecutor(r -> new 
Thread(r, "gtidSingleExecutor"));
     }
 
     @Override
@@ -143,6 +149,7 @@ public class CanalSinkConnector implements Sink, 
ConnectorCreateService<Sink> {
     @Override
     public void stop() {
         executor.shutdown();
+        gtidSingleExecutor.shutdown();
     }
 
     @Override
@@ -153,6 +160,8 @@ public class CanalSinkConnector implements Sink, 
ConnectorCreateService<Sink> {
             canalConnectRecordList = filterRecord(canalConnectRecordList);
             if (isDdlDatas(canalConnectRecordList)) {
                 doDdl(context, canalConnectRecordList);
+            } else if (sinkConfig.isGTIDMode()) {
+                doLoadWithGtid(context, sinkConfig, connectRecord);
             } else {
                 canalConnectRecordList = 
DbLoadMerger.merge(canalConnectRecordList);
 
@@ -257,6 +266,57 @@ public class CanalSinkConnector implements Sink, 
ConnectorCreateService<Sink> {
         batchDatas.clear();
     }
 
+    private void doLoadWithGtid(DbLoadContext context, CanalSinkConfig 
sinkConfig, ConnectRecord connectRecord) {
+        int batchIndex = connectRecord.getExtension("batchIndex", 
Integer.class);
+        int totalBatches = connectRecord.getExtension("totalBatches", 
Integer.class);
+        List<CanalConnectRecord> canalConnectRecordList = 
(List<CanalConnectRecord>) connectRecord.getData();
+        String gtid = canalConnectRecordList.get(0).getCurrentGtid();
+        GtidBatchManager.addBatch(gtid, batchIndex, totalBatches, 
canalConnectRecordList);
+        // check whether the batch is complete
+        if (GtidBatchManager.isComplete(gtid)) {
+            GtidBatch batch = GtidBatchManager.getGtidBatch(gtid);
+            List<List<CanalConnectRecord>> totalRows = batch.getBatches();
+            List<CanalConnectRecord> filteredRows = new ArrayList<>();
+            for (List<CanalConnectRecord> canalConnectRecords : totalRows) {
+                canalConnectRecords = filterRecord(canalConnectRecords);
+                if (!CollectionUtils.isEmpty(canalConnectRecords)) {
+                    for (final CanalConnectRecord record : 
canalConnectRecords) {
+                        boolean filter = interceptor.before(sinkConfig, 
record);
+                        filteredRows.add(record);
+                    }
+                }
+            }
+            context.setGtid(gtid);
+            Future<Exception> result = gtidSingleExecutor.submit(new 
DbLoadWorker(context, filteredRows, dbDialect, false, sinkConfig));
+            Exception ex = null;
+            try {
+                ex = result.get();
+            } catch (Exception e) {
+                ex = e;
+            }
+            Boolean skipException = sinkConfig.getSkipException();
+            if (skipException != null && skipException) {
+                if (ex != null) {
+                    // do skip
+                    log.warn("skip exception for data : {} , caused by {}",
+                        filteredRows,
+                        ExceptionUtils.getFullStackTrace(ex));
+                    GtidBatchManager.removeGtidBatch(gtid);
+                }
+            } else {
+                if (ex != null) {
+                    log.error("sink connector will shutdown by " + 
ex.getMessage(), ExceptionUtils.getFullStackTrace(ex));
+                    gtidSingleExecutor.shutdown();
+                    System.exit(1);
+                } else {
+                    GtidBatchManager.removeGtidBatch(gtid);
+                }
+            }
+        } else {
+            log.info("Batch received, waiting for other batches.");
+        }
+    }
+
     private List<List<CanalConnectRecord>> split(List<CanalConnectRecord> 
records) {
         List<List<CanalConnectRecord>> result = new ArrayList<>();
         if (records == null || records.isEmpty()) {
@@ -296,12 +356,12 @@ public class CanalSinkConnector implements Sink, 
ConnectorCreateService<Sink> {
     }
 
     private void doTwoPhase(DbLoadContext context, CanalSinkConfig sinkConfig, 
List<List<CanalConnectRecord>> totalRows, boolean canBatch) {
-        List<Future<Exception>> results = new ArrayList<Future<Exception>>();
+        List<Future<Exception>> results = new ArrayList<>();
         for (List<CanalConnectRecord> rows : totalRows) {
             if (CollectionUtils.isEmpty(rows)) {
                 continue;
             }
-            results.add(executor.submit(new DbLoadWorker(context, rows, 
dbDialect, canBatch)));
+            results.add(executor.submit(new DbLoadWorker(context, rows, 
dbDialect, canBatch, sinkConfig)));
         }
 
         boolean partFailed = false;
@@ -330,7 +390,7 @@ public class CanalSinkConnector implements Sink, 
ConnectorCreateService<Sink> {
             Boolean skipException = sinkConfig.getSkipException();
             if (skipException != null && skipException) {
                 for (CanalConnectRecord retryRecord : retryRecords) {
-                    DbLoadWorker worker = new DbLoadWorker(context, 
Arrays.asList(retryRecord), dbDialect, false);
+                    DbLoadWorker worker = new DbLoadWorker(context, 
Arrays.asList(retryRecord), dbDialect, false, sinkConfig);
                     try {
                         Exception ex = worker.call();
                         if (ex != null) {
@@ -347,7 +407,7 @@ public class CanalSinkConnector implements Sink, 
ConnectorCreateService<Sink> {
                     }
                 }
             } else {
-                DbLoadWorker worker = new DbLoadWorker(context, retryRecords, 
dbDialect, false);
+                DbLoadWorker worker = new DbLoadWorker(context, retryRecords, 
dbDialect, false, sinkConfig);
                 try {
                     Exception ex = worker.call();
                     if (ex != null) {
@@ -355,7 +415,9 @@ public class CanalSinkConnector implements Sink, 
ConnectorCreateService<Sink> {
                     }
                 } catch (Exception ex) {
                     log.error("##load phase two failed!", ex);
-                    throw new RuntimeException(ex);
+                    log.error("sink connector will shutdown by " + 
ex.getMessage(), ex);
+                    executor.shutdown();
+                    System.exit(1);
                 }
             }
         }
@@ -371,16 +433,21 @@ public class CanalSinkConnector implements Sink, 
ConnectorCreateService<Sink> {
         private final DbDialect dbDialect;
         private final List<CanalConnectRecord> records;
         private final boolean canBatch;
+
+        private final CanalSinkConfig sinkConfig;
+
         private final List<CanalConnectRecord> allFailedRecords = new 
ArrayList<>();
         private final List<CanalConnectRecord> allProcessedRecords = new 
ArrayList<>();
         private final List<CanalConnectRecord> processedRecords = new 
ArrayList<>();
         private final List<CanalConnectRecord> failedRecords = new 
ArrayList<>();
 
+        /**
+         * Creates a worker that loads the given records into the target database.
+         *
+         * @param context    load context shared with the caller; results are written back to it
+         * @param records    records this worker is responsible for executing
+         * @param dbDialect  dialect that supplies the JDBC, transaction and LOB helpers
+         * @param canBatch   whether the records may be executed as a JDBC batch
+         * @param sinkConfig sink configuration; consulted for the GTID mode switch
+         */
-        public DbLoadWorker(DbLoadContext context, List<CanalConnectRecord> 
records, DbDialect dbDialect, boolean canBatch) {
+        public DbLoadWorker(DbLoadContext context, List<CanalConnectRecord> 
records, DbDialect dbDialect, boolean canBatch,
+            CanalSinkConfig sinkConfig) {
             this.context = context;
             this.records = records;
             this.canBatch = canBatch;
             this.dbDialect = dbDialect;
+            this.sinkConfig = sinkConfig;
         }
 
         public Exception call() throws Exception {
@@ -394,132 +461,239 @@ public class CanalSinkConnector implements Sink, 
ConnectorCreateService<Sink> {
         private Exception doCall() {
             RuntimeException error = null;
             ExecuteResult exeResult = null;
-            int index = 0;
-            while (index < records.size()) {
-                final List<CanalConnectRecord> splitDatas = new ArrayList<>();
-                if (useBatch && canBatch) {
-                    int end = Math.min(index + batchSize, records.size());
-                    splitDatas.addAll(records.subList(index, end));
-                    index = end;
-                } else {
-                    splitDatas.add(records.get(index));
-                    index = index + 1;
-                }
 
+            if (sinkConfig.isGTIDMode()) {
                 int retryCount = 0;
-                while (true) {
-                    try {
-                        if (!CollectionUtils.isEmpty(failedRecords)) {
-                            splitDatas.clear();
-                            splitDatas.addAll(failedRecords);
-                        } else {
-                            failedRecords.addAll(splitDatas);
+                final List<CanalConnectRecord> toExecuteRecords = new 
ArrayList<>();
+                try {
+                    if (!CollectionUtils.isEmpty(failedRecords)) {
+                        // if failedRecords not empty, make it retry
+                        toExecuteRecords.addAll(failedRecords);
+                    } else {
+                        toExecuteRecords.addAll(records);
+                        // add to failed record first, maybe get lob or 
datasource error
+                        failedRecords.addAll(toExecuteRecords);
+                    }
+                    JdbcTemplate template = dbDialect.getJdbcTemplate();
+                    String sourceGtid = context.getGtid();
+                    if (StringUtils.isNotEmpty(sourceGtid)) {
+                        String setGtid = "SET @@session.gtid_next = '" + 
sourceGtid + "';";
+                        template.execute(setGtid);
+                    } else {
+                        log.error("gtid is empty in gtid mode");
+                        throw new RuntimeException("gtid is empty in gtid 
mode");
+                    }
+
+                    final LobCreator lobCreator = 
dbDialect.getLobHandler().getLobCreator();
+                    int affect = (Integer) 
dbDialect.getTransactionTemplate().execute((TransactionCallback) status -> {
+                        try {
+                            failedRecords.clear();
+                            processedRecords.clear();
+                            int affect1 = 0;
+                            for (CanalConnectRecord record : toExecuteRecords) 
{
+                                int affects = template.update(record.getSql(), 
new PreparedStatementSetter() {
+                                    public void setValues(PreparedStatement 
ps) throws SQLException {
+                                        doPreparedStatement(ps, dbDialect, 
lobCreator, record);
+                                    }
+                                });
+                                affect1 = affect1 + affects;
+                                processStat(record, affects, false);
+                            }
+                            return affect1;
+                        } catch (Exception e) {
+                            // rollback
+                            status.setRollbackOnly();
+                            throw new RuntimeException("Failed to executed", 
e);
+                        } finally {
+                            lobCreator.close();
                         }
+                    });
+
+                    // reset gtid
+                    String resetGtid = "SET @@session.gtid_next = AUTOMATIC;";
+                    dbDialect.getJdbcTemplate().execute(resetGtid);
+                    error = null;
+                    exeResult = ExecuteResult.SUCCESS;
+                } catch (DeadlockLoserDataAccessException ex) {
+                    error = new 
RuntimeException(ExceptionUtils.getFullStackTrace(ex));
+                    exeResult = ExecuteResult.RETRY;
+                } catch (Throwable ex) {
+                    error = new 
RuntimeException(ExceptionUtils.getFullStackTrace(ex));
+                    exeResult = ExecuteResult.ERROR;
+                }
 
-                        final LobCreator lobCreator = 
dbDialect.getLobHandler().getLobCreator();
-                        if (useBatch && canBatch) {
-                            final String sql = splitDatas.get(0).getSql();
-                            int[] affects = new int[splitDatas.size()];
-                            affects = (int[]) 
dbDialect.getTransactionTemplate().execute((TransactionCallback) status -> {
-                                try {
-                                    failedRecords.clear();
-                                    processedRecords.clear();
-                                    JdbcTemplate template = 
dbDialect.getJdbcTemplate();
-                                    int[] affects1 = template.batchUpdate(sql, 
new BatchPreparedStatementSetter() {
-
-                                        public void 
setValues(PreparedStatement ps, int idx) throws SQLException {
-                                            doPreparedStatement(ps, dbDialect, 
lobCreator, splitDatas.get(idx));
-                                        }
-
-                                        public int getBatchSize() {
-                                            return splitDatas.size();
-                                        }
-                                    });
-                                    return affects1;
-                                } finally {
-                                    lobCreator.close();
-                                }
-                            });
+                if (ExecuteResult.SUCCESS == exeResult) {
+                    allFailedRecords.addAll(failedRecords);
+                    allProcessedRecords.addAll(processedRecords);
+                    failedRecords.clear();
+                    processedRecords.clear();
+                } else if (ExecuteResult.RETRY == exeResult) {
+                    retryCount = retryCount + 1;
+                    processedRecords.clear();
+                    failedRecords.clear();
+                    failedRecords.addAll(toExecuteRecords);
+                    int retry = 3;
+                    if (retryCount >= retry) {
+                        processFailedDatas(toExecuteRecords.size());
+                        throw new RuntimeException(String.format("execute 
retry %s times failed", retryCount), error);
+                    } else {
+                        try {
+                            int retryWait = 3000;
+                            int wait = retryCount * retryWait;
+                            wait = Math.max(wait, retryWait);
+                            Thread.sleep(wait);
+                        } catch (InterruptedException ex) {
+                            Thread.interrupted();
+                            processFailedDatas(toExecuteRecords.size());
+                            throw new RuntimeException(ex);
+                        }
+                    }
+                } else {
+                    processedRecords.clear();
+                    failedRecords.clear();
+                    failedRecords.addAll(toExecuteRecords);
+                    processFailedDatas(toExecuteRecords.size());
+                    throw error;
+                }
+            } else {
+                int index = 0;
+                while (index < records.size()) {
+                    final List<CanalConnectRecord> toExecuteRecords = new 
ArrayList<>();
+                    if (useBatch && canBatch) {
+                        int end = Math.min(index + batchSize, records.size());
+                        toExecuteRecords.addAll(records.subList(index, end));
+                        index = end;
+                    } else {
+                        toExecuteRecords.add(records.get(index));
+                        index = index + 1;
+                    }
 
-                            for (int i = 0; i < splitDatas.size(); i++) {
-                                assert affects != null;
-                                processStat(splitDatas.get(i), affects[i], 
true);
+                    int retryCount = 0;
+                    while (true) {
+                        try {
+                            if (!CollectionUtils.isEmpty(failedRecords)) {
+                                toExecuteRecords.clear();
+                                toExecuteRecords.addAll(failedRecords);
+                            } else {
+                                failedRecords.addAll(toExecuteRecords);
                             }
-                        } else {
-                            final CanalConnectRecord record = 
splitDatas.get(0);
-                            int affect = 0;
-                            affect = (Integer) 
dbDialect.getTransactionTemplate().execute((TransactionCallback) status -> {
-                                try {
-                                    failedRecords.clear();
-                                    processedRecords.clear();
-                                    JdbcTemplate template = 
dbDialect.getJdbcTemplate();
-                                    int affect1 = 
template.update(record.getSql(), new PreparedStatementSetter() {
-
-                                        public void 
setValues(PreparedStatement ps) throws SQLException {
-                                            doPreparedStatement(ps, dbDialect, 
lobCreator, record);
-                                        }
-                                    });
-                                    return affect1;
-                                } finally {
-                                    lobCreator.close();
+
+                            final LobCreator lobCreator = 
dbDialect.getLobHandler().getLobCreator();
+                            if (useBatch && canBatch) {
+                                JdbcTemplate template = 
dbDialect.getJdbcTemplate();
+                                final String sql = 
toExecuteRecords.get(0).getSql();
+
+                                int[] affects = new 
int[toExecuteRecords.size()];
+
+                                affects = (int[]) 
dbDialect.getTransactionTemplate().execute((TransactionCallback) status -> {
+                                    try {
+                                        failedRecords.clear();
+                                        processedRecords.clear();
+                                        int[] affects1 = 
template.batchUpdate(sql, new BatchPreparedStatementSetter() {
+
+                                            public void 
setValues(PreparedStatement ps, int idx) throws SQLException {
+                                                doPreparedStatement(ps, 
dbDialect, lobCreator, toExecuteRecords.get(idx));
+                                            }
+
+                                            public int getBatchSize() {
+                                                return toExecuteRecords.size();
+                                            }
+                                        });
+                                        return affects1;
+                                    } catch (Exception e) {
+                                        // rollback
+                                        status.setRollbackOnly();
+                                        throw new RuntimeException("Failed to 
execute batch with GTID", e);
+                                    } finally {
+                                        lobCreator.close();
+                                    }
+                                });
+
+                                for (int i = 0; i < toExecuteRecords.size(); 
i++) {
+                                    assert affects != null;
+                                    processStat(toExecuteRecords.get(i), 
affects[i], true);
                                 }
-                            });
-                            processStat(record, affect, false);
-                        }
+                            } else {
+                                final CanalConnectRecord record = 
toExecuteRecords.get(0);
+                                JdbcTemplate template = 
dbDialect.getJdbcTemplate();
+                                int affect = 0;
+                                affect = (Integer) 
dbDialect.getTransactionTemplate().execute((TransactionCallback) status -> {
+                                    try {
+                                        failedRecords.clear();
+                                        processedRecords.clear();
+                                        int affect1 = 
template.update(record.getSql(), new PreparedStatementSetter() {
+
+                                            public void 
setValues(PreparedStatement ps) throws SQLException {
+                                                doPreparedStatement(ps, 
dbDialect, lobCreator, record);
+                                            }
+                                        });
+                                        return affect1;
+                                    } catch (Exception e) {
+                                        // rollback
+                                        status.setRollbackOnly();
+                                        throw new RuntimeException("Failed to 
executed", e);
+                                    } finally {
+                                        lobCreator.close();
+                                    }
+                                });
+                                processStat(record, affect, false);
+                            }
 
-                        error = null;
-                        exeResult = ExecuteResult.SUCCESS;
-                    } catch (DeadlockLoserDataAccessException ex) {
-                        error = new 
RuntimeException(ExceptionUtils.getFullStackTrace(ex));
-                        exeResult = ExecuteResult.RETRY;
-                    } catch (Throwable ex) {
-                        error = new 
RuntimeException(ExceptionUtils.getFullStackTrace(ex));
-                        exeResult = ExecuteResult.ERROR;
-                    }
+                            error = null;
+                            exeResult = ExecuteResult.SUCCESS;
+                        } catch (DeadlockLoserDataAccessException ex) {
+                            error = new 
RuntimeException(ExceptionUtils.getFullStackTrace(ex));
+                            exeResult = ExecuteResult.RETRY;
+                        } catch (Throwable ex) {
+                            error = new 
RuntimeException(ExceptionUtils.getFullStackTrace(ex));
+                            exeResult = ExecuteResult.ERROR;
+                        }
 
-                    if (ExecuteResult.SUCCESS == exeResult) {
-                        allFailedRecords.addAll(failedRecords);
-                        allProcessedRecords.addAll(processedRecords);
-                        failedRecords.clear();
-                        processedRecords.clear();
-                        break; // do next eventData
-                    } else if (ExecuteResult.RETRY == exeResult) {
-                        retryCount = retryCount + 1;
-                        processedRecords.clear();
-                        failedRecords.clear();
-                        failedRecords.addAll(splitDatas);
-                        int retry = 3;
-                        if (retryCount >= retry) {
-                            processFailedDatas(index);
-                            throw new RuntimeException(String.format("execute 
retry %s times failed", retryCount), error);
-                        } else {
-                            try {
-                                int retryWait = 3000;
-                                int wait = retryCount * retryWait;
-                                wait = Math.max(wait, retryWait);
-                                Thread.sleep(wait);
-                            } catch (InterruptedException ex) {
-                                Thread.interrupted();
+                        if (ExecuteResult.SUCCESS == exeResult) {
+                            allFailedRecords.addAll(failedRecords);
+                            allProcessedRecords.addAll(processedRecords);
+                            failedRecords.clear();
+                            processedRecords.clear();
+                            break; // do next eventData
+                        } else if (ExecuteResult.RETRY == exeResult) {
+                            retryCount = retryCount + 1;
+                            processedRecords.clear();
+                            failedRecords.clear();
+                            failedRecords.addAll(toExecuteRecords);
+                            int retry = 3;
+                            if (retryCount >= retry) {
                                 processFailedDatas(index);
-                                throw new RuntimeException(ex);
+                                throw new 
RuntimeException(String.format("execute retry %s times failed", retryCount), 
error);
+                            } else {
+                                try {
+                                    int retryWait = 3000;
+                                    int wait = retryCount * retryWait;
+                                    wait = Math.max(wait, retryWait);
+                                    Thread.sleep(wait);
+                                } catch (InterruptedException ex) {
+                                    Thread.interrupted();
+                                    processFailedDatas(index);
+                                    throw new RuntimeException(ex);
+                                }
                             }
+                        } else {
+                            processedRecords.clear();
+                            failedRecords.clear();
+                            failedRecords.addAll(toExecuteRecords);
+                            processFailedDatas(index);
+                            throw error;
                         }
-                    } else {
-                        processedRecords.clear();
-                        failedRecords.clear();
-                        failedRecords.addAll(splitDatas);
-                        processFailedDatas(index);
-                        throw error;
                     }
                 }
             }
+
             context.getFailedRecords().addAll(allFailedRecords);
             context.getProcessedRecords().addAll(allProcessedRecords);
             return null;
         }
 
         private void doPreparedStatement(PreparedStatement ps, DbDialect 
dbDialect, LobCreator lobCreator,
-                                         CanalConnectRecord record) throws 
SQLException {
+            CanalConnectRecord record) throws SQLException {
             EventType type = record.getEventType();
             List<EventColumn> columns = new ArrayList<EventColumn>();
             if (type.isInsert()) {
@@ -530,11 +704,7 @@ public class CanalSinkConnector implements Sink, 
ConnectorCreateService<Sink> {
             } else if (type.isUpdate()) {
                 boolean existOldKeys = 
!CollectionUtils.isEmpty(record.getOldKeys());
                 columns.addAll(record.getUpdatedColumns());
-                if (existOldKeys && dbDialect.isDRDS()) {
-                    columns.addAll(record.getUpdatedKeys());
-                } else {
-                    columns.addAll(record.getKeys());
-                }
+                columns.addAll(record.getKeys());
                 if (existOldKeys) {
                     columns.addAll(record.getOldKeys());
                 }
diff --git 
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/EntryParser.java
 
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/EntryParser.java
index 8ef60ff04..708d5d120 100644
--- 
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/EntryParser.java
+++ 
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/EntryParser.java
@@ -60,9 +60,17 @@ public class EntryParser {
                 switch (entry.getEntryType()) {
                     case ROWDATA:
                         RowChange rowChange = 
RowChange.parseFrom(entry.getStoreValue());
-                        needSync = checkNeedSync(sourceConfig, 
rowChange.getRowDatas(0));
-                        if (needSync) {
-                            transactionDataBuffer.add(entry);
+                        if (sourceConfig.getServerUUID() != null && 
sourceConfig.isGTIDMode()) {
+                            String currentGtid = 
entry.getHeader().getPropsList().get(0).getValue();
+                            if 
(currentGtid.contains(sourceConfig.getServerUUID())) {
+                                transactionDataBuffer.add(entry);
+                            }
+                        } else {
+                            // if not gtid mode, check whether the entry is a loopback by the specified column value
+                            needSync = checkNeedSync(sourceConfig, 
rowChange.getRowDatas(0));
+                            if (needSync) {
+                                transactionDataBuffer.add(entry);
+                            }
                         }
                         break;
                     case TRANSACTIONEND:
@@ -169,6 +177,14 @@ public class EntryParser {
         canalConnectRecord.setExecuteTime(entry.getHeader().getExecuteTime());
         canalConnectRecord.setJournalName(entry.getHeader().getLogfileName());
         
canalConnectRecord.setBinLogOffset(entry.getHeader().getLogfileOffset());
+        // if enabled gtid mode, gtid not null
+        if (canalSourceConfig.isGTIDMode()) {
+            String currentGtid = 
entry.getHeader().getPropsList().get(0).getValue();
+            String gtidRange = replaceGtidRange(entry.getHeader().getGtid(), 
currentGtid, canalSourceConfig.getServerUUID());
+            canalConnectRecord.setGtid(gtidRange);
+            canalConnectRecord.setCurrentGtid(currentGtid);
+        }
+
         EventType eventType = canalConnectRecord.getEventType();
 
         List<Column> beforeColumns = rowData.getBeforeColumnsList();
@@ -248,6 +264,17 @@ public class EntryParser {
         return canalConnectRecord;
     }
 
+    /**
+     * Rewrites the upper bound of the GTID range that belongs to {@code serverUUID} so that it
+     * ends at the transaction number of {@code currentGtid}.
+     *
+     * <p>Ranges of other servers are left untouched. The input is returned unchanged when any
+     * argument is {@code null} or when {@code currentGtid} has no {@code :}-separated
+     * transaction part, because there is nothing that can be replaced safely.
+     *
+     * @param gtid        comma-separated GTID set, e.g. {@code uuid-a:1-10,uuid-b:1-5}
+     * @param currentGtid GTID of the current transaction, e.g. {@code uuid-a:7}
+     * @param serverUUID  UUID prefix selecting the range to rewrite
+     * @return the GTID set with the matching range ending at the current transaction number
+     */
+    public static String replaceGtidRange(String gtid, String currentGtid, String serverUUID) {
+        if (gtid == null || currentGtid == null || serverUUID == null) {
+            return gtid;
+        }
+        // loop-invariant: the transaction number is the part right after the first ':'
+        String[] currentParts = currentGtid.split(":");
+        if (currentParts.length < 2) {
+            return gtid;
+        }
+        String transactionNo = currentParts[1];
+        String[] gtidRangeArray = gtid.split(",");
+        for (int i = 0; i < gtidRangeArray.length; i++) {
+            String gtidRange = gtidRangeArray[i];
+            if (gtidRange.startsWith(serverUUID)) {
+                // only the trailing digits (the range's upper bound) are replaced
+                gtidRangeArray[i] = gtidRange.replaceFirst("\\d+$", transactionNo);
+            }
+        }
+        return String.join(",", gtidRangeArray);
+    }
+
     private static void checkUpdateKeyColumns(Map<String, EventColumn> 
oldKeyColumns,
                                               Map<String, EventColumn> 
keyColumns) {
         if (oldKeyColumns.isEmpty()) {
diff --git 
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java
 
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java
index 4b9617731..6cd575cb7 100644
--- 
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java
+++ 
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java
@@ -150,6 +150,8 @@ public class CanalSourceConnector implements Source, 
ConnectorCreateService<Sour
                 return instance;
             }
         });
+        DatabaseConnection.sourceConfig = 
sourceConfig.getSourceConnectorConfig();
+        DatabaseConnection.initSourceConnection();
         tableMgr = new RdbTableMgr(sourceConfig.getSourceConnectorConfig(), 
DatabaseConnection.sourceDataSource);
     }
 
@@ -180,6 +182,9 @@ public class CanalSourceConnector implements Source, 
ConnectorCreateService<Sour
         
parameter.setDbUsername(sourceConfig.getSourceConnectorConfig().getUserName());
         
parameter.setDbPassword(sourceConfig.getSourceConnectorConfig().getPassWord());
 
+        // set if enabled gtid mode
+        parameter.setGtidEnable(sourceConfig.isGTIDMode());
+
         // check positions
         // example: 
Arrays.asList("{\"journalName\":\"mysql-bin.000001\",\"position\":6163L,\"timestamp\":1322803601000L}",
         //         
"{\"journalName\":\"mysql-bin.000001\",\"position\":6163L,\"timestamp\":1322803601000L}")
@@ -193,6 +198,14 @@ public class CanalSourceConnector implements Source, 
ConnectorCreateService<Sour
                 recordPositionMap.put("journalName", 
canalRecordPartition.getJournalName());
                 recordPositionMap.put("timestamp", 
canalRecordPartition.getTimeStamp());
                 recordPositionMap.put("position", 
canalRecordOffset.getOffset());
+                String gtidRange = canalRecordOffset.getGtid();
+                if (gtidRange != null) {
+                    if (canalRecordOffset.getCurrentGtid() != null) {
+                        gtidRange = 
EntryParser.replaceGtidRange(canalRecordOffset.getGtid(), 
canalRecordOffset.getCurrentGtid(),
+                            sourceConfig.getServerUUID());
+                    }
+                    recordPositionMap.put("gtid", gtidRange);
+                }
                 positions.add(JsonUtils.toJSONString(recordPositionMap));
             });
             parameter.setPositions(positions);
@@ -237,7 +250,13 @@ public class CanalSourceConnector implements Source, 
ConnectorCreateService<Sour
     @Override
     public void commit(ConnectRecord record) {
         long batchId = Long.parseLong(record.getExtension("messageId"));
-        canalServer.ack(clientIdentity, batchId);
+        int batchIndex = record.getExtension("batchIndex", Integer.class);
+        int totalBatches = record.getExtension("totalBatches", Integer.class);
+        if (batchIndex == totalBatches - 1) {
+            log.debug("ack records batchIndex:{}, totalBatches:{}, batchId:{}",
+                batchIndex, totalBatches, batchId);
+            canalServer.ack(clientIdentity, batchId);
+        }
     }
 
     @Override
@@ -301,21 +320,37 @@ public class CanalSourceConnector implements Source, 
ConnectorCreateService<Sour
         if (!connectorRecordMap.isEmpty()) {
             Set<Map.Entry<Long, List<CanalConnectRecord>>> entrySet = 
connectorRecordMap.entrySet();
             for (Map.Entry<Long, List<CanalConnectRecord>> entry : entrySet) {
-                // Xid offset
-                Long binLogOffset = entry.getKey();
                 List<CanalConnectRecord> connectRecordList = entry.getValue();
                 CanalConnectRecord lastRecord = 
entry.getValue().get(connectRecordList.size() - 1);
                 CanalRecordPartition canalRecordPartition = new 
CanalRecordPartition();
+                
canalRecordPartition.setServerUUID(sourceConfig.getServerUUID());
                 
canalRecordPartition.setJournalName(lastRecord.getJournalName());
                 canalRecordPartition.setTimeStamp(lastRecord.getExecuteTime());
-
+                // Xid offset with gtid
+                Long binLogOffset = entry.getKey();
                 CanalRecordOffset canalRecordOffset = new CanalRecordOffset();
                 canalRecordOffset.setOffset(binLogOffset);
+                if (StringUtils.isNotEmpty(lastRecord.getGtid()) && 
StringUtils.isNotEmpty(lastRecord.getCurrentGtid())) {
+                    canalRecordOffset.setGtid(lastRecord.getGtid());
+                    
canalRecordOffset.setCurrentGtid(lastRecord.getCurrentGtid());
+                }
 
-                ConnectRecord connectRecord = new 
ConnectRecord(canalRecordPartition, canalRecordOffset, 
System.currentTimeMillis());
-                connectRecord.addExtension("messageId", 
String.valueOf(message.getId()));
-                connectRecord.setData(connectRecordList);
-                result.add(connectRecord);
+                // split record list
+                List<List<CanalConnectRecord>> splitLists = new ArrayList<>();
+                for (int i = 0; i < connectRecordList.size(); i += 
sourceConfig.getBatchSize()) {
+                    int end = Math.min(i + sourceConfig.getBatchSize(), 
connectRecordList.size());
+                    List<CanalConnectRecord> subList = 
connectRecordList.subList(i, end);
+                    splitLists.add(subList);
+                }
+
+                for (int i = 0; i < splitLists.size(); i++) {
+                    ConnectRecord connectRecord = new 
ConnectRecord(canalRecordPartition, canalRecordOffset, 
System.currentTimeMillis());
+                    connectRecord.addExtension("messageId", 
String.valueOf(message.getId()));
+                    connectRecord.addExtension("batchIndex", i);
+                    connectRecord.addExtension("totalBatches", 
splitLists.size());
+                    connectRecord.setData(splitLists.get(i));
+                    result.add(connectRecord);
+                }
             }
         } else {
             // for the message has been filtered need ack message


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to