This is an automated email from the ASF dual-hosted git repository.

mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/eventmesh.git


The following commit(s) were added to refs/heads/master by this push:
     new 294e4c223 [ISSUE #5044] Data synchronization strong verification in mariadb gtid mode (#5045)
294e4c223 is described below

commit 294e4c2230a91dde0846f8a67752c0bf7f51cc7e
Author: mike_xwm <[email protected]>
AuthorDate: Fri Jul 26 16:49:57 2024 +0800

    [ISSUE #5044] Data synchronization strong verification in mariadb gtid mode (#5045)
    
    * [ISSUE #5040] Support gtid mode for sync data with mysql
    
    * fix conflicts with master
    
    * fix checkstyle error
    
    * [ISSUE #5044] Data synchronization strong verification in mariadb gtid mode
    
    * fix checkstyle error
---
 .../connector/rdb/canal/CanalSinkConfig.java       |  2 ++
 .../connector/rdb/canal/CanalSourceConfig.java     |  2 ++
 .../canal/sink/connector/CanalSinkConnector.java   | 17 +++++++---
 .../connector/canal/source/EntryParser.java        | 37 ++++++++++++++--------
 .../source/connector/CanalSourceConnector.java     | 15 +++++----
 5 files changed, 49 insertions(+), 24 deletions(-)

diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSinkConfig.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSinkConfig.java
index 80aec7bfe..026f33f4f 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSinkConfig.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSinkConfig.java
@@ -41,6 +41,8 @@ public class CanalSinkConfig extends SinkConfig {
 
     private boolean isGTIDMode = true;
 
+    private boolean isMariaDB = true;
+
     // skip sink process exception
     private Boolean skipException = false;
 
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceConfig.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceConfig.java
index 707f10290..8331d32cb 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceConfig.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceConfig.java
@@ -47,6 +47,8 @@ public class CanalSourceConfig extends SourceConfig {
 
     private String serverUUID;
 
+    private boolean isMariaDB = true;
+
     private boolean isGTIDMode = true;
 
     private Integer batchSize = 10000;
diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkConnector.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkConnector.java
index 5f3c0a2bc..8ecda8e12 100644
--- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkConnector.java
+++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkConnector.java
@@ -476,9 +476,11 @@ public class CanalSinkConnector implements Sink, 
ConnectorCreateService<Sink> {
                     }
                     JdbcTemplate template = dbDialect.getJdbcTemplate();
                     String sourceGtid = context.getGtid();
-                    if (StringUtils.isNotEmpty(sourceGtid)) {
-                        String setGtid = "SET @@session.gtid_next = '" + 
sourceGtid + "';";
-                        template.execute(setGtid);
+                    if (StringUtils.isNotEmpty(sourceGtid) && 
!sinkConfig.isMariaDB()) {
+                        String setMySQLGtid = "SET @@session.gtid_next = '" + 
sourceGtid + "';";
+                        template.execute(setMySQLGtid);
+                    } else if (StringUtils.isNotEmpty(sourceGtid) && 
sinkConfig.isMariaDB()) {
+                        throw new RuntimeException("unsupport gtid mode for 
mariaDB");
                     } else {
                         log.error("gtid is empty in gtid mode");
                         throw new RuntimeException("gtid is empty in gtid 
mode");
@@ -510,8 +512,13 @@ public class CanalSinkConnector implements Sink, 
ConnectorCreateService<Sink> {
                     });
 
                     // reset gtid
-                    String resetGtid = "SET @@session.gtid_next = AUTOMATIC;";
-                    dbDialect.getJdbcTemplate().execute(resetGtid);
+                    if (sinkConfig.isMariaDB()) {
+                        throw new RuntimeException("unsupport gtid mode for 
mariaDB");
+                    } else {
+                        String resetMySQLGtid = "SET @@session.gtid_next = 
'AUTOMATIC';";
+                        dbDialect.getJdbcTemplate().execute(resetMySQLGtid);
+                    }
+
                     error = null;
                     exeResult = ExecuteResult.SUCCESS;
                 } catch (DeadlockLoserDataAccessException ex) {
diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/EntryParser.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/EntryParser.java
index 708d5d120..5c4303588 100644
--- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/EntryParser.java
+++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/EntryParser.java
@@ -49,7 +49,7 @@ import lombok.extern.slf4j.Slf4j;
 public class EntryParser {
 
     public static Map<Long, List<CanalConnectRecord>> parse(CanalSourceConfig 
sourceConfig, List<Entry> datas,
-                                                            RdbTableMgr 
tables) {
+        RdbTableMgr tables) {
         List<CanalConnectRecord> recordList = new ArrayList<>();
         List<Entry> transactionDataBuffer = new ArrayList<>();
         // need check weather the entry is loopback
@@ -60,9 +60,9 @@ public class EntryParser {
                 switch (entry.getEntryType()) {
                     case ROWDATA:
                         RowChange rowChange = 
RowChange.parseFrom(entry.getStoreValue());
-                        if (sourceConfig.getServerUUID() != null && 
sourceConfig.isGTIDMode()) {
-                            String currentGtid = 
entry.getHeader().getPropsList().get(0).getValue();
-                            if 
(currentGtid.contains(sourceConfig.getServerUUID())) {
+                        // don't support gtid for mariadb
+                        if (sourceConfig.getServerUUID() != null && 
sourceConfig.isGTIDMode() && !sourceConfig.isMariaDB()) {
+                            if (checkGtidForEntry(entry, sourceConfig)) {
                                 transactionDataBuffer.add(entry);
                             }
                         } else {
@@ -90,9 +90,14 @@ public class EntryParser {
         return recordMap;
     }
 
+    private static boolean checkGtidForEntry(Entry entry, CanalSourceConfig 
sourceConfig) {
+        String currentGtid = 
entry.getHeader().getPropsList().get(0).getValue();
+        return currentGtid.contains(sourceConfig.getServerUUID());
+    }
+
     private static void parseRecordListWithEntryBuffer(CanalSourceConfig 
sourceConfig,
-                                                       
List<CanalConnectRecord> recordList,
-                                                       List<Entry> 
transactionDataBuffer, RdbTableMgr tables) {
+        List<CanalConnectRecord> recordList,
+        List<Entry> transactionDataBuffer, RdbTableMgr tables) {
         for (Entry bufferEntry : transactionDataBuffer) {
             List<CanalConnectRecord> recordParsedList = 
internParse(sourceConfig, bufferEntry, tables);
             if (CollectionUtils.isEmpty(recordParsedList)) {
@@ -130,7 +135,7 @@ public class EntryParser {
     }
 
     private static List<CanalConnectRecord> internParse(CanalSourceConfig 
sourceConfig, Entry entry,
-                                                        RdbTableMgr tableMgr) {
+        RdbTableMgr tableMgr) {
         String schemaName = entry.getHeader().getSchemaName();
         String tableName = entry.getHeader().getTableName();
         if (tableMgr.getTable(schemaName, tableName) == null) {
@@ -169,7 +174,7 @@ public class EntryParser {
     }
 
     private static CanalConnectRecord internParse(CanalSourceConfig 
canalSourceConfig, Entry entry,
-                                                  RowChange rowChange, RowData 
rowData) {
+        RowChange rowChange, RowData rowData) {
         CanalConnectRecord canalConnectRecord = new CanalConnectRecord();
         canalConnectRecord.setTableName(entry.getHeader().getTableName());
         canalConnectRecord.setSchemaName(entry.getHeader().getSchemaName());
@@ -179,10 +184,16 @@ public class EntryParser {
         
canalConnectRecord.setBinLogOffset(entry.getHeader().getLogfileOffset());
         // if enabled gtid mode, gtid not null
         if (canalSourceConfig.isGTIDMode()) {
-            String currentGtid = 
entry.getHeader().getPropsList().get(0).getValue();
-            String gtidRange = replaceGtidRange(entry.getHeader().getGtid(), 
currentGtid, canalSourceConfig.getServerUUID());
-            canalConnectRecord.setGtid(gtidRange);
-            canalConnectRecord.setCurrentGtid(currentGtid);
+            if (canalSourceConfig.isMariaDB()) {
+                String currentGtid = entry.getHeader().getGtid();
+                canalConnectRecord.setGtid(currentGtid);
+                canalConnectRecord.setCurrentGtid(currentGtid);
+            } else {
+                String currentGtid = 
entry.getHeader().getPropsList().get(0).getValue();
+                String gtidRange = 
replaceGtidRange(entry.getHeader().getGtid(), currentGtid, 
canalSourceConfig.getServerUUID());
+                canalConnectRecord.setGtid(gtidRange);
+                canalConnectRecord.setCurrentGtid(currentGtid);
+            }
         }
 
         EventType eventType = canalConnectRecord.getEventType();
@@ -276,7 +287,7 @@ public class EntryParser {
     }
 
     private static void checkUpdateKeyColumns(Map<String, EventColumn> 
oldKeyColumns,
-                                              Map<String, EventColumn> 
keyColumns) {
+        Map<String, EventColumn> keyColumns) {
         if (oldKeyColumns.isEmpty()) {
             return;
         }
diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java
index 6cd575cb7..f3f8b2e16 100644
--- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java
+++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java
@@ -198,13 +198,16 @@ public class CanalSourceConnector implements Source, 
ConnectorCreateService<Sour
                 recordPositionMap.put("journalName", 
canalRecordPartition.getJournalName());
                 recordPositionMap.put("timestamp", 
canalRecordPartition.getTimeStamp());
                 recordPositionMap.put("position", 
canalRecordOffset.getOffset());
-                String gtidRange = canalRecordOffset.getGtid();
-                if (gtidRange != null) {
-                    if (canalRecordOffset.getCurrentGtid() != null) {
-                        gtidRange = 
EntryParser.replaceGtidRange(canalRecordOffset.getGtid(), 
canalRecordOffset.getCurrentGtid(),
-                            sourceConfig.getServerUUID());
+                // for mariaDB not support gtid mode
+                if (sourceConfig.isGTIDMode() && !sourceConfig.isMariaDB()) {
+                    String gtidRange = canalRecordOffset.getGtid();
+                    if (gtidRange != null) {
+                        if (canalRecordOffset.getCurrentGtid() != null) {
+                            gtidRange = 
EntryParser.replaceGtidRange(canalRecordOffset.getGtid(), 
canalRecordOffset.getCurrentGtid(),
+                                sourceConfig.getServerUUID());
+                        }
+                        recordPositionMap.put("gtid", gtidRange);
                     }
-                    recordPositionMap.put("gtid", gtidRange);
                 }
                 positions.add(JsonUtils.toJSONString(recordPositionMap));
             });


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to