This is an automated email from the ASF dual-hosted git repository.
mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/eventmesh.git
The following commit(s) were added to refs/heads/master by this push:
new 294e4c223 [ISSUE #5044] Data synchronization strong verification in
mariadb gtid mode (#5045)
294e4c223 is described below
commit 294e4c2230a91dde0846f8a67752c0bf7f51cc7e
Author: mike_xwm <[email protected]>
AuthorDate: Fri Jul 26 16:49:57 2024 +0800
[ISSUE #5044] Data synchronization strong verification in mariadb gtid mode
(#5045)
* [ISSUE #5040] Support gtid mode for sync data with mysql
* fix conflicts with master
* fix checkstyle error
* [ISSUE #5044] Data synchronization strong verification in mariadb gtid
mode
* fix checkstyle error
---
.../connector/rdb/canal/CanalSinkConfig.java | 2 ++
.../connector/rdb/canal/CanalSourceConfig.java | 2 ++
.../canal/sink/connector/CanalSinkConnector.java | 17 +++++++---
.../connector/canal/source/EntryParser.java | 37 ++++++++++++++--------
.../source/connector/CanalSourceConnector.java | 15 +++++----
5 files changed, 49 insertions(+), 24 deletions(-)
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSinkConfig.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSinkConfig.java
index 80aec7bfe..026f33f4f 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSinkConfig.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSinkConfig.java
@@ -41,6 +41,8 @@ public class CanalSinkConfig extends SinkConfig {
private boolean isGTIDMode = true;
+ private boolean isMariaDB = true;
+
// skip sink process exception
private Boolean skipException = false;
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceConfig.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceConfig.java
index 707f10290..8331d32cb 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceConfig.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceConfig.java
@@ -47,6 +47,8 @@ public class CanalSourceConfig extends SourceConfig {
private String serverUUID;
+ private boolean isMariaDB = true;
+
private boolean isGTIDMode = true;
private Integer batchSize = 10000;
diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkConnector.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkConnector.java
index 5f3c0a2bc..8ecda8e12 100644
--- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkConnector.java
+++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkConnector.java
@@ -476,9 +476,11 @@ public class CanalSinkConnector implements Sink,
ConnectorCreateService<Sink> {
}
JdbcTemplate template = dbDialect.getJdbcTemplate();
String sourceGtid = context.getGtid();
- if (StringUtils.isNotEmpty(sourceGtid)) {
- String setGtid = "SET @@session.gtid_next = '" +
sourceGtid + "';";
- template.execute(setGtid);
+ if (StringUtils.isNotEmpty(sourceGtid) &&
!sinkConfig.isMariaDB()) {
+ String setMySQLGtid = "SET @@session.gtid_next = '" +
sourceGtid + "';";
+ template.execute(setMySQLGtid);
+ } else if (StringUtils.isNotEmpty(sourceGtid) &&
sinkConfig.isMariaDB()) {
+ throw new RuntimeException("unsupport gtid mode for
mariaDB");
} else {
log.error("gtid is empty in gtid mode");
throw new RuntimeException("gtid is empty in gtid
mode");
@@ -510,8 +512,13 @@ public class CanalSinkConnector implements Sink,
ConnectorCreateService<Sink> {
});
// reset gtid
- String resetGtid = "SET @@session.gtid_next = AUTOMATIC;";
- dbDialect.getJdbcTemplate().execute(resetGtid);
+ if (sinkConfig.isMariaDB()) {
+ throw new RuntimeException("unsupport gtid mode for
mariaDB");
+ } else {
+ String resetMySQLGtid = "SET @@session.gtid_next =
'AUTOMATIC';";
+ dbDialect.getJdbcTemplate().execute(resetMySQLGtid);
+ }
+
error = null;
exeResult = ExecuteResult.SUCCESS;
} catch (DeadlockLoserDataAccessException ex) {
diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/EntryParser.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/EntryParser.java
index 708d5d120..5c4303588 100644
--- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/EntryParser.java
+++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/EntryParser.java
@@ -49,7 +49,7 @@ import lombok.extern.slf4j.Slf4j;
public class EntryParser {
public static Map<Long, List<CanalConnectRecord>> parse(CanalSourceConfig
sourceConfig, List<Entry> datas,
- RdbTableMgr
tables) {
+ RdbTableMgr tables) {
List<CanalConnectRecord> recordList = new ArrayList<>();
List<Entry> transactionDataBuffer = new ArrayList<>();
// need check weather the entry is loopback
@@ -60,9 +60,9 @@ public class EntryParser {
switch (entry.getEntryType()) {
case ROWDATA:
RowChange rowChange =
RowChange.parseFrom(entry.getStoreValue());
- if (sourceConfig.getServerUUID() != null &&
sourceConfig.isGTIDMode()) {
- String currentGtid =
entry.getHeader().getPropsList().get(0).getValue();
- if
(currentGtid.contains(sourceConfig.getServerUUID())) {
+ // don't support gtid for mariadb
+ if (sourceConfig.getServerUUID() != null &&
sourceConfig.isGTIDMode() && !sourceConfig.isMariaDB()) {
+ if (checkGtidForEntry(entry, sourceConfig)) {
transactionDataBuffer.add(entry);
}
} else {
@@ -90,9 +90,14 @@ public class EntryParser {
return recordMap;
}
+ private static boolean checkGtidForEntry(Entry entry, CanalSourceConfig
sourceConfig) {
+ String currentGtid =
entry.getHeader().getPropsList().get(0).getValue();
+ return currentGtid.contains(sourceConfig.getServerUUID());
+ }
+
private static void parseRecordListWithEntryBuffer(CanalSourceConfig
sourceConfig,
-
List<CanalConnectRecord> recordList,
- List<Entry>
transactionDataBuffer, RdbTableMgr tables) {
+ List<CanalConnectRecord> recordList,
+ List<Entry> transactionDataBuffer, RdbTableMgr tables) {
for (Entry bufferEntry : transactionDataBuffer) {
List<CanalConnectRecord> recordParsedList =
internParse(sourceConfig, bufferEntry, tables);
if (CollectionUtils.isEmpty(recordParsedList)) {
@@ -130,7 +135,7 @@ public class EntryParser {
}
private static List<CanalConnectRecord> internParse(CanalSourceConfig
sourceConfig, Entry entry,
- RdbTableMgr tableMgr) {
+ RdbTableMgr tableMgr) {
String schemaName = entry.getHeader().getSchemaName();
String tableName = entry.getHeader().getTableName();
if (tableMgr.getTable(schemaName, tableName) == null) {
@@ -169,7 +174,7 @@ public class EntryParser {
}
private static CanalConnectRecord internParse(CanalSourceConfig
canalSourceConfig, Entry entry,
- RowChange rowChange, RowData
rowData) {
+ RowChange rowChange, RowData rowData) {
CanalConnectRecord canalConnectRecord = new CanalConnectRecord();
canalConnectRecord.setTableName(entry.getHeader().getTableName());
canalConnectRecord.setSchemaName(entry.getHeader().getSchemaName());
@@ -179,10 +184,16 @@ public class EntryParser {
canalConnectRecord.setBinLogOffset(entry.getHeader().getLogfileOffset());
// if enabled gtid mode, gtid not null
if (canalSourceConfig.isGTIDMode()) {
- String currentGtid =
entry.getHeader().getPropsList().get(0).getValue();
- String gtidRange = replaceGtidRange(entry.getHeader().getGtid(),
currentGtid, canalSourceConfig.getServerUUID());
- canalConnectRecord.setGtid(gtidRange);
- canalConnectRecord.setCurrentGtid(currentGtid);
+ if (canalSourceConfig.isMariaDB()) {
+ String currentGtid = entry.getHeader().getGtid();
+ canalConnectRecord.setGtid(currentGtid);
+ canalConnectRecord.setCurrentGtid(currentGtid);
+ } else {
+ String currentGtid =
entry.getHeader().getPropsList().get(0).getValue();
+ String gtidRange =
replaceGtidRange(entry.getHeader().getGtid(), currentGtid,
canalSourceConfig.getServerUUID());
+ canalConnectRecord.setGtid(gtidRange);
+ canalConnectRecord.setCurrentGtid(currentGtid);
+ }
}
EventType eventType = canalConnectRecord.getEventType();
@@ -276,7 +287,7 @@ public class EntryParser {
}
private static void checkUpdateKeyColumns(Map<String, EventColumn>
oldKeyColumns,
- Map<String, EventColumn>
keyColumns) {
+ Map<String, EventColumn> keyColumns) {
if (oldKeyColumns.isEmpty()) {
return;
}
diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java
index 6cd575cb7..f3f8b2e16 100644
--- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java
+++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java
@@ -198,13 +198,16 @@ public class CanalSourceConnector implements Source,
ConnectorCreateService<Sour
recordPositionMap.put("journalName",
canalRecordPartition.getJournalName());
recordPositionMap.put("timestamp",
canalRecordPartition.getTimeStamp());
recordPositionMap.put("position",
canalRecordOffset.getOffset());
- String gtidRange = canalRecordOffset.getGtid();
- if (gtidRange != null) {
- if (canalRecordOffset.getCurrentGtid() != null) {
- gtidRange =
EntryParser.replaceGtidRange(canalRecordOffset.getGtid(),
canalRecordOffset.getCurrentGtid(),
- sourceConfig.getServerUUID());
+ // for mariaDB not support gtid mode
+ if (sourceConfig.isGTIDMode() && !sourceConfig.isMariaDB()) {
+ String gtidRange = canalRecordOffset.getGtid();
+ if (gtidRange != null) {
+ if (canalRecordOffset.getCurrentGtid() != null) {
+ gtidRange =
EntryParser.replaceGtidRange(canalRecordOffset.getGtid(),
canalRecordOffset.getCurrentGtid(),
+ sourceConfig.getServerUUID());
+ }
+ recordPositionMap.put("gtid", gtidRange);
}
- recordPositionMap.put("gtid", gtidRange);
}
positions.add(JsonUtils.toJSONString(recordPositionMap));
});
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]