This is an automated email from the ASF dual-hosted git repository.
mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/eventmesh.git
The following commit(s) were added to refs/heads/master by this push:
new f88a19453 [ISSUE #5040] Support gtid mode for sync data with mysql
(#5041)
f88a19453 is described below
commit f88a19453c0b8c91f331dddbc7ef8ac13c3a9a8d
Author: mike_xwm <[email protected]>
AuthorDate: Wed Jul 24 14:34:18 2024 +0800
[ISSUE #5040] Support gtid mode for sync data with mysql (#5041)
* [ISSUE #5040] Support gtid mode for sync data with mysql
* fix conflicts with master
* fix checkstyle error
---
eventmesh-admin-server/conf/eventmesh.sql | 3 +
.../conf/mapper/EventMeshMysqlPositionMapper.xml | 28 +-
.../web/db/entity/EventMeshMysqlPosition.java | 6 +
.../position/impl/MysqlPositionHandler.java | 8 +-
.../connector/rdb/canal/CanalSinkConfig.java | 2 +
.../connector/rdb/canal/CanalSourceConfig.java | 4 +
.../remote/offset/canal/CanalRecordOffset.java | 5 +
.../remote/offset/canal/CanalRecordPartition.java | 2 +
.../connector/canal/CanalConnectRecord.java | 6 +
.../connector/canal/dialect/AbstractDbDialect.java | 4 -
.../connector/canal/dialect/DbDialect.java | 2 -
.../connector/canal/dialect/MysqlDialect.java | 4 -
.../interceptor/SqlBuilderLoadInterceptor.java | 24 +-
.../connector/canal/sink/DbLoadContext.java | 2 +
.../sink/{DbLoadContext.java => GtidBatch.java} | 37 +-
.../{DbLoadContext.java => GtidBatchManager.java} | 32 +-
.../canal/sink/connector/CanalSinkConnector.java | 404 +++++++++++++++------
.../connector/canal/source/EntryParser.java | 33 +-
.../source/connector/CanalSourceConnector.java | 51 ++-
19 files changed, 452 insertions(+), 205 deletions(-)
diff --git a/eventmesh-admin-server/conf/eventmesh.sql
b/eventmesh-admin-server/conf/eventmesh.sql
index 3b6fc9b77..226101661 100644
--- a/eventmesh-admin-server/conf/eventmesh.sql
+++ b/eventmesh-admin-server/conf/eventmesh.sql
@@ -71,8 +71,11 @@ CREATE TABLE IF NOT EXISTS `event_mesh_job_info` (
CREATE TABLE IF NOT EXISTS `event_mesh_mysql_position` (
`id` int unsigned NOT NULL AUTO_INCREMENT,
`jobID` int unsigned NOT NULL,
+ `serverUUID` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci
DEFAULT NULL,
`address` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT
NULL,
`position` bigint DEFAULT NULL,
+ `gtid` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT
NULL,
+ `currentGtid` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci
DEFAULT NULL,
`timestamp` bigint DEFAULT NULL,
`journalName` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL,
`createTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
diff --git
a/eventmesh-admin-server/conf/mapper/EventMeshMysqlPositionMapper.xml
b/eventmesh-admin-server/conf/mapper/EventMeshMysqlPositionMapper.xml
index bc3a3292a..cbb7c094d 100644
--- a/eventmesh-admin-server/conf/mapper/EventMeshMysqlPositionMapper.xml
+++ b/eventmesh-admin-server/conf/mapper/EventMeshMysqlPositionMapper.xml
@@ -16,24 +16,28 @@
limitations under the License.
-->
<!DOCTYPE mapper
- PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
- "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
+ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
+ "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper
namespace="org.apache.eventmesh.admin.server.web.db.mapper.EventMeshMysqlPositionMapper">
<resultMap id="BaseResultMap"
type="org.apache.eventmesh.admin.server.web.db.entity.EventMeshMysqlPosition">
- <id property="id" column="id" jdbcType="INTEGER"/>
- <result property="jobID" column="jobID" jdbcType="INTEGER"/>
- <result property="address" column="address" jdbcType="VARCHAR"/>
- <result property="position" column="position" jdbcType="BIGINT"/>
- <result property="timestamp" column="timestamp" jdbcType="BIGINT"/>
- <result property="journalName" column="journalName"
jdbcType="VARCHAR"/>
- <result property="createTime" column="createTime"
jdbcType="TIMESTAMP"/>
- <result property="updateTime" column="updateTime"
jdbcType="TIMESTAMP"/>
+ <id property="id" column="id" jdbcType="INTEGER"/>
+ <result property="jobID" column="jobID" jdbcType="INTEGER"/>
+ <result property="serverUUID" column="serverUUID" jdbcType="VARCHAR"/>
+ <result property="address" column="address" jdbcType="VARCHAR"/>
+ <result property="position" column="position" jdbcType="BIGINT"/>
+ <result property="gtid" column="gtid" jdbcType="VARCHAR"/>
+ <result property="currentGtid" column="currentGtid"
jdbcType="VARCHAR"/>
+ <result property="timestamp" column="timestamp" jdbcType="BIGINT"/>
+ <result property="journalName" column="journalName"
jdbcType="VARCHAR"/>
+ <result property="createTime" column="createTime"
jdbcType="TIMESTAMP"/>
+ <result property="updateTime" column="updateTime"
jdbcType="TIMESTAMP"/>
</resultMap>
<sql id="Base_Column_List">
- id,jobID,address,
- position,timestamp,journalName,
+ id
+ ,jobID,serverUUID,address,
+ position,gtid,currentGtid,timestamp,journalName,
createTime,updateTime
</sql>
</mapper>
diff --git
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshMysqlPosition.java
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshMysqlPosition.java
index ffe3e446d..65a38b54b 100644
---
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshMysqlPosition.java
+++
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshMysqlPosition.java
@@ -38,10 +38,16 @@ public class EventMeshMysqlPosition implements Serializable
{
private Integer jobID;
+ private String serverUUID;
+
private String address;
private Long position;
+ private String gtid;
+
+ private String currentGtid;
+
private Long timestamp;
private String journalName;
diff --git
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/impl/MysqlPositionHandler.java
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/impl/MysqlPositionHandler.java
index 525fe02c0..f2c174c3b 100644
---
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/impl/MysqlPositionHandler.java
+++
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/impl/MysqlPositionHandler.java
@@ -115,9 +115,12 @@ public class MysqlPositionHandler extends PositionHandler {
CanalRecordOffset offset = (CanalRecordOffset)
recordPosition.getRecordOffset();
if (offset != null) {
position.setPosition(offset.getOffset());
+ position.setGtid(offset.getGtid());
+ position.setCurrentGtid(offset.getCurrentGtid());
}
CanalRecordPartition partition = (CanalRecordPartition)
recordPosition.getRecordPartition();
if (partition != null) {
+ position.setServerUUID(partition.getServerUUID());
position.setTimestamp(partition.getTimeStamp());
position.setJournalName(partition.getJournalName());
}
@@ -148,13 +151,16 @@ public class MysqlPositionHandler extends PositionHandler
{
request.getJobID()));
List<RecordPosition> recordPositionList = new ArrayList<>();
for (EventMeshMysqlPosition position : positionList) {
- RecordPosition recordPosition = new RecordPosition();
CanalRecordPartition partition = new CanalRecordPartition();
partition.setTimeStamp(position.getTimestamp());
partition.setJournalName(position.getJournalName());
+ partition.setServerUUID(position.getServerUUID());
+ RecordPosition recordPosition = new RecordPosition();
recordPosition.setRecordPartition(partition);
CanalRecordOffset offset = new CanalRecordOffset();
offset.setOffset(position.getPosition());
+ offset.setGtid(position.getGtid());
+ offset.setCurrentGtid(position.getCurrentGtid());
recordPosition.setRecordOffset(offset);
recordPositionList.add(recordPosition);
}
diff --git
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSinkConfig.java
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSinkConfig.java
index 85484b2ce..80aec7bfe 100644
---
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSinkConfig.java
+++
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSinkConfig.java
@@ -39,6 +39,8 @@ public class CanalSinkConfig extends SinkConfig {
// sync mode: field/row
private SyncMode syncMode;
+ private boolean isGTIDMode = true;
+
// skip sink process exception
private Boolean skipException = false;
diff --git
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceConfig.java
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceConfig.java
index d75ceb6b5..707f10290 100644
---
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceConfig.java
+++
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceConfig.java
@@ -45,6 +45,10 @@ public class CanalSourceConfig extends SourceConfig {
private Short clientId;
+ private String serverUUID;
+
+ private boolean isGTIDMode = true;
+
private Integer batchSize = 10000;
private Long batchTimeout = -1L;
diff --git
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/canal/CanalRecordOffset.java
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/canal/CanalRecordOffset.java
index 90c94c99b..d0f2053f4 100644
---
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/canal/CanalRecordOffset.java
+++
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/canal/CanalRecordOffset.java
@@ -30,6 +30,11 @@ public class CanalRecordOffset extends RecordOffset {
private Long offset;
+ // mysql instance gtid range
+ private String gtid;
+
+ private String currentGtid;
+
@Override
public Class<? extends RecordOffset> getRecordOffsetClass() {
return CanalRecordOffset.class;
diff --git
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/canal/CanalRecordPartition.java
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/canal/CanalRecordPartition.java
index 72d404bab..ded82306e 100644
---
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/canal/CanalRecordPartition.java
+++
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/canal/CanalRecordPartition.java
@@ -29,6 +29,8 @@ import lombok.ToString;
@ToString
public class CanalRecordPartition extends RecordPartition {
+ private String serverUUID;
+
private String journalName;
private Long timeStamp;
diff --git
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/CanalConnectRecord.java
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/CanalConnectRecord.java
index a723b24dc..36ecd158f 100644
---
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/CanalConnectRecord.java
+++
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/CanalConnectRecord.java
@@ -31,8 +31,14 @@ import lombok.Data;
public class CanalConnectRecord {
private String schemaName;
+
private String tableName;
+ // mysql instance gtid range
+ private String gtid;
+
+ private String currentGtid;
+
/**
* The business type of the changed data (I/U/D/C/A/E), consistent with
the EventType defined in EntryProtocol in canal.
*/
diff --git
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/dialect/AbstractDbDialect.java
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/dialect/AbstractDbDialect.java
index f5c2245b9..4cf0f82ec 100644
---
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/dialect/AbstractDbDialect.java
+++
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/dialect/AbstractDbDialect.java
@@ -97,10 +97,6 @@ public abstract class AbstractDbDialect implements DbDialect
{
return sqlTemplate;
}
- public boolean isDRDS() {
- return false;
- }
-
public String getShardColumns(String schema, String table) {
return null;
}
diff --git
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/dialect/DbDialect.java
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/dialect/DbDialect.java
index a18edfd5b..781c2fe95 100644
---
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/dialect/DbDialect.java
+++
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/dialect/DbDialect.java
@@ -48,8 +48,6 @@ public interface DbDialect {
public boolean isSupportMergeSql();
- public boolean isDRDS();
-
public LobHandler getLobHandler();
public JdbcTemplate getJdbcTemplate();
diff --git
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/dialect/MysqlDialect.java
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/dialect/MysqlDialect.java
index acd491ba6..bfe562871 100644
---
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/dialect/MysqlDialect.java
+++
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/dialect/MysqlDialect.java
@@ -50,10 +50,6 @@ public class MysqlDialect extends AbstractDbDialect {
return null;
}
- public boolean isDRDS() {
- return false;
- }
-
public String getDefaultCatalog() {
return jdbcTemplate.queryForObject("select database()", String.class);
}
diff --git
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/interceptor/SqlBuilderLoadInterceptor.java
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/interceptor/SqlBuilderLoadInterceptor.java
index 24d6b42f8..0ad07577f 100644
---
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/interceptor/SqlBuilderLoadInterceptor.java
+++
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/interceptor/SqlBuilderLoadInterceptor.java
@@ -51,35 +51,21 @@ public class SqlBuilderLoadInterceptor {
String shardColumns = null;
if (type.isInsert()) {
- if (CollectionUtils.isEmpty(record.getColumns())
- && (dbDialect.isDRDS())) {
- // sql
- sql = sqlTemplate.getInsertSql(schemaName,
- record.getTableName(),
- buildColumnNames(record.getKeys()),
- buildColumnNames(record.getColumns()));
- } else {
- sql = sqlTemplate.getMergeSql(schemaName,
+ sql = sqlTemplate.getMergeSql(schemaName,
record.getTableName(),
buildColumnNames(record.getKeys()),
buildColumnNames(record.getColumns()),
new String[] {},
- !dbDialect.isDRDS(),
+ true,
shardColumns);
- }
} else if (type.isUpdate()) {
-
boolean existOldKeys =
!CollectionUtils.isEmpty(record.getOldKeys());
boolean rowMode = sinkConfig.getSyncMode().isRow();
String[] keyColumns = null;
String[] otherColumns = null;
if (existOldKeys) {
keyColumns = buildColumnNames(record.getOldKeys());
- if (dbDialect.isDRDS()) {
- otherColumns =
buildColumnNames(record.getUpdatedColumns(), record.getUpdatedKeys());
- } else {
- otherColumns =
buildColumnNames(record.getUpdatedColumns(), record.getKeys());
- }
+ otherColumns = buildColumnNames(record.getUpdatedColumns(),
record.getKeys());
} else {
keyColumns = buildColumnNames(record.getKeys());
otherColumns = buildColumnNames(record.getUpdatedColumns());
@@ -91,10 +77,10 @@ public class SqlBuilderLoadInterceptor {
keyColumns,
otherColumns,
new String[] {},
- !dbDialect.isDRDS(),
+ true,
shardColumns);
} else {
- sql = sqlTemplate.getUpdateSql(schemaName,
record.getTableName(), keyColumns, otherColumns, !dbDialect.isDRDS(),
shardColumns);
+ sql = sqlTemplate.getUpdateSql(schemaName,
record.getTableName(), keyColumns, otherColumns, true, shardColumns);
}
} else if (type.isDelete()) {
sql = sqlTemplate.getDeleteSql(schemaName,
diff --git
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/DbLoadContext.java
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/DbLoadContext.java
index 561d89487..3498e87e7 100644
---
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/DbLoadContext.java
+++
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/DbLoadContext.java
@@ -28,6 +28,8 @@ import lombok.Data;
@Data
public class DbLoadContext {
+ private String gtid;
+
private List<CanalConnectRecord> lastProcessedRecords;
private List<CanalConnectRecord> prepareRecords;
diff --git
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/DbLoadContext.java
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/GtidBatch.java
similarity index 56%
copy from
eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/DbLoadContext.java
copy to
eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/GtidBatch.java
index 561d89487..dd6559b83 100644
---
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/DbLoadContext.java
+++
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/GtidBatch.java
@@ -19,29 +19,30 @@ package org.apache.eventmesh.connector.canal.sink;
import org.apache.eventmesh.connector.canal.CanalConnectRecord;
-import java.util.Collections;
-import java.util.LinkedList;
import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
-import lombok.Data;
+public class GtidBatch {
+ private int totalBatches;
+ private List<List<CanalConnectRecord>> batches;
+ private int receivedBatchCount;
-@Data
-public class DbLoadContext {
-
- private List<CanalConnectRecord> lastProcessedRecords;
-
- private List<CanalConnectRecord> prepareRecords;
-
- private List<CanalConnectRecord> processedRecords;
-
- private List<CanalConnectRecord> failedRecords;
+ public GtidBatch(int totalBatches) {
+ this.totalBatches = totalBatches;
+ this.batches = new CopyOnWriteArrayList<>(new List[totalBatches]);
+ this.receivedBatchCount = 0;
+ }
- public DbLoadContext() {
- lastProcessedRecords = Collections.synchronizedList(new
LinkedList<>());
- prepareRecords = Collections.synchronizedList(new LinkedList<>());
- processedRecords = Collections.synchronizedList(new LinkedList<>());
- failedRecords = Collections.synchronizedList(new LinkedList<>());
+ public void addBatch(int batchIndex, List<CanalConnectRecord>
batchRecords) {
+ batches.set(batchIndex, batchRecords);
+ receivedBatchCount++;
}
+ public List<List<CanalConnectRecord>> getBatches() {
+ return batches;
+ }
+ public boolean isComplete() {
+ return receivedBatchCount == totalBatches;
+ }
}
diff --git
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/DbLoadContext.java
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/GtidBatchManager.java
similarity index 55%
copy from
eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/DbLoadContext.java
copy to
eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/GtidBatchManager.java
index 561d89487..30060aa8f 100644
---
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/DbLoadContext.java
+++
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/GtidBatchManager.java
@@ -19,29 +19,27 @@ package org.apache.eventmesh.connector.canal.sink;
import org.apache.eventmesh.connector.canal.CanalConnectRecord;
-import java.util.Collections;
-import java.util.LinkedList;
import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
-import lombok.Data;
+public class GtidBatchManager {
-@Data
-public class DbLoadContext {
+ private static ConcurrentHashMap<String, GtidBatch> gtidBatchMap = new
ConcurrentHashMap<>();
- private List<CanalConnectRecord> lastProcessedRecords;
-
- private List<CanalConnectRecord> prepareRecords;
-
- private List<CanalConnectRecord> processedRecords;
-
- private List<CanalConnectRecord> failedRecords;
+ public static void addBatch(String gtid, int batchIndex, int totalBatches,
List<CanalConnectRecord> batchRecords) {
+ gtidBatchMap.computeIfAbsent(gtid, k -> new
GtidBatch(totalBatches)).addBatch(batchIndex, batchRecords);
+ }
- public DbLoadContext() {
- lastProcessedRecords = Collections.synchronizedList(new
LinkedList<>());
- prepareRecords = Collections.synchronizedList(new LinkedList<>());
- processedRecords = Collections.synchronizedList(new LinkedList<>());
- failedRecords = Collections.synchronizedList(new LinkedList<>());
+ public static GtidBatch getGtidBatch(String gtid) {
+ return gtidBatchMap.get(gtid);
}
+ public static boolean isComplete(String gtid) {
+ GtidBatch batch = gtidBatchMap.get(gtid);
+ return batch != null && batch.isComplete();
+ }
+ public static void removeGtidBatch(String gtid) {
+ gtidBatchMap.remove(gtid);
+ }
}
diff --git
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkConnector.java
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkConnector.java
index 8f9df7595..5f3c0a2bc 100644
---
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkConnector.java
+++
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkConnector.java
@@ -31,6 +31,8 @@ import
org.apache.eventmesh.connector.canal.sink.DbLoadContext;
import org.apache.eventmesh.connector.canal.sink.DbLoadData;
import org.apache.eventmesh.connector.canal.sink.DbLoadData.TableLoadData;
import org.apache.eventmesh.connector.canal.sink.DbLoadMerger;
+import org.apache.eventmesh.connector.canal.sink.GtidBatch;
+import org.apache.eventmesh.connector.canal.sink.GtidBatchManager;
import org.apache.eventmesh.connector.canal.source.table.RdbTableMgr;
import org.apache.eventmesh.openconnect.api.ConnectorCreateService;
import org.apache.eventmesh.openconnect.api.connector.ConnectorContext;
@@ -38,7 +40,6 @@ import
org.apache.eventmesh.openconnect.api.connector.SinkConnectorContext;
import org.apache.eventmesh.openconnect.api.sink.Sink;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
-
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.exception.ExceptionUtils;
@@ -52,6 +53,7 @@ import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -86,9 +88,12 @@ public class CanalSinkConnector implements Sink,
ConnectorCreateService<Sink> {
private ExecutorService executor;
+ private ExecutorService gtidSingleExecutor;
+
private int batchSize = 50;
private boolean useBatch = true;
+
private RdbTableMgr tableMgr;
@Override
@@ -123,6 +128,7 @@ public class CanalSinkConnector implements Sink,
ConnectorCreateService<Sink> {
new ArrayBlockingQueue<>(sinkConfig.getPoolSize() * 4),
new NamedThreadFactory("canalSink"),
new ThreadPoolExecutor.CallerRunsPolicy());
+ gtidSingleExecutor = Executors.newSingleThreadExecutor(r -> new
Thread(r, "gtidSingleExecutor"));
}
@Override
@@ -143,6 +149,7 @@ public class CanalSinkConnector implements Sink,
ConnectorCreateService<Sink> {
@Override
public void stop() {
executor.shutdown();
+ gtidSingleExecutor.shutdown();
}
@Override
@@ -153,6 +160,8 @@ public class CanalSinkConnector implements Sink,
ConnectorCreateService<Sink> {
canalConnectRecordList = filterRecord(canalConnectRecordList);
if (isDdlDatas(canalConnectRecordList)) {
doDdl(context, canalConnectRecordList);
+ } else if (sinkConfig.isGTIDMode()) {
+ doLoadWithGtid(context, sinkConfig, connectRecord);
} else {
canalConnectRecordList =
DbLoadMerger.merge(canalConnectRecordList);
@@ -257,6 +266,57 @@ public class CanalSinkConnector implements Sink,
ConnectorCreateService<Sink> {
batchDatas.clear();
}
+ private void doLoadWithGtid(DbLoadContext context, CanalSinkConfig
sinkConfig, ConnectRecord connectRecord) {
+ int batchIndex = connectRecord.getExtension("batchIndex",
Integer.class);
+ int totalBatches = connectRecord.getExtension("totalBatches",
Integer.class);
+ List<CanalConnectRecord> canalConnectRecordList =
(List<CanalConnectRecord>) connectRecord.getData();
+ String gtid = canalConnectRecordList.get(0).getCurrentGtid();
+ GtidBatchManager.addBatch(gtid, batchIndex, totalBatches,
canalConnectRecordList);
+ // check whether the batch is complete
+ if (GtidBatchManager.isComplete(gtid)) {
+ GtidBatch batch = GtidBatchManager.getGtidBatch(gtid);
+ List<List<CanalConnectRecord>> totalRows = batch.getBatches();
+ List<CanalConnectRecord> filteredRows = new ArrayList<>();
+ for (List<CanalConnectRecord> canalConnectRecords : totalRows) {
+ canalConnectRecords = filterRecord(canalConnectRecords);
+ if (!CollectionUtils.isEmpty(canalConnectRecords)) {
+ for (final CanalConnectRecord record :
canalConnectRecords) {
+ boolean filter = interceptor.before(sinkConfig,
record);
+ filteredRows.add(record);
+ }
+ }
+ }
+ context.setGtid(gtid);
+ Future<Exception> result = gtidSingleExecutor.submit(new
DbLoadWorker(context, filteredRows, dbDialect, false, sinkConfig));
+ Exception ex = null;
+ try {
+ ex = result.get();
+ } catch (Exception e) {
+ ex = e;
+ }
+ Boolean skipException = sinkConfig.getSkipException();
+ if (skipException != null && skipException) {
+ if (ex != null) {
+ // do skip
+ log.warn("skip exception for data : {} , caused by {}",
+ filteredRows,
+ ExceptionUtils.getFullStackTrace(ex));
+ GtidBatchManager.removeGtidBatch(gtid);
+ }
+ } else {
+ if (ex != null) {
+ log.error("sink connector will shutdown by " +
ex.getMessage(), ExceptionUtils.getFullStackTrace(ex));
+ gtidSingleExecutor.shutdown();
+ System.exit(1);
+ } else {
+ GtidBatchManager.removeGtidBatch(gtid);
+ }
+ }
+ } else {
+ log.info("Batch received, waiting for other batches.");
+ }
+ }
+
private List<List<CanalConnectRecord>> split(List<CanalConnectRecord>
records) {
List<List<CanalConnectRecord>> result = new ArrayList<>();
if (records == null || records.isEmpty()) {
@@ -296,12 +356,12 @@ public class CanalSinkConnector implements Sink,
ConnectorCreateService<Sink> {
}
private void doTwoPhase(DbLoadContext context, CanalSinkConfig sinkConfig,
List<List<CanalConnectRecord>> totalRows, boolean canBatch) {
- List<Future<Exception>> results = new ArrayList<Future<Exception>>();
+ List<Future<Exception>> results = new ArrayList<>();
for (List<CanalConnectRecord> rows : totalRows) {
if (CollectionUtils.isEmpty(rows)) {
continue;
}
- results.add(executor.submit(new DbLoadWorker(context, rows,
dbDialect, canBatch)));
+ results.add(executor.submit(new DbLoadWorker(context, rows,
dbDialect, canBatch, sinkConfig)));
}
boolean partFailed = false;
@@ -330,7 +390,7 @@ public class CanalSinkConnector implements Sink,
ConnectorCreateService<Sink> {
Boolean skipException = sinkConfig.getSkipException();
if (skipException != null && skipException) {
for (CanalConnectRecord retryRecord : retryRecords) {
- DbLoadWorker worker = new DbLoadWorker(context,
Arrays.asList(retryRecord), dbDialect, false);
+ DbLoadWorker worker = new DbLoadWorker(context,
Arrays.asList(retryRecord), dbDialect, false, sinkConfig);
try {
Exception ex = worker.call();
if (ex != null) {
@@ -347,7 +407,7 @@ public class CanalSinkConnector implements Sink,
ConnectorCreateService<Sink> {
}
}
} else {
- DbLoadWorker worker = new DbLoadWorker(context, retryRecords,
dbDialect, false);
+ DbLoadWorker worker = new DbLoadWorker(context, retryRecords,
dbDialect, false, sinkConfig);
try {
Exception ex = worker.call();
if (ex != null) {
@@ -355,7 +415,9 @@ public class CanalSinkConnector implements Sink,
ConnectorCreateService<Sink> {
}
} catch (Exception ex) {
log.error("##load phase two failed!", ex);
- throw new RuntimeException(ex);
+ log.error("sink connector will shutdown by " +
ex.getMessage(), ex);
+ executor.shutdown();
+ System.exit(1);
}
}
}
@@ -371,16 +433,21 @@ public class CanalSinkConnector implements Sink,
ConnectorCreateService<Sink> {
private final DbDialect dbDialect;
private final List<CanalConnectRecord> records;
private final boolean canBatch;
+
+ private final CanalSinkConfig sinkConfig;
+
private final List<CanalConnectRecord> allFailedRecords = new
ArrayList<>();
private final List<CanalConnectRecord> allProcessedRecords = new
ArrayList<>();
private final List<CanalConnectRecord> processedRecords = new
ArrayList<>();
private final List<CanalConnectRecord> failedRecords = new
ArrayList<>();
- public DbLoadWorker(DbLoadContext context, List<CanalConnectRecord>
records, DbDialect dbDialect, boolean canBatch) {
+ public DbLoadWorker(DbLoadContext context, List<CanalConnectRecord>
records, DbDialect dbDialect, boolean canBatch,
+ CanalSinkConfig sinkConfig) {
this.context = context;
this.records = records;
this.canBatch = canBatch;
this.dbDialect = dbDialect;
+ this.sinkConfig = sinkConfig;
}
public Exception call() throws Exception {
@@ -394,132 +461,239 @@ public class CanalSinkConnector implements Sink,
ConnectorCreateService<Sink> {
private Exception doCall() {
RuntimeException error = null;
ExecuteResult exeResult = null;
- int index = 0;
- while (index < records.size()) {
- final List<CanalConnectRecord> splitDatas = new ArrayList<>();
- if (useBatch && canBatch) {
- int end = Math.min(index + batchSize, records.size());
- splitDatas.addAll(records.subList(index, end));
- index = end;
- } else {
- splitDatas.add(records.get(index));
- index = index + 1;
- }
+ if (sinkConfig.isGTIDMode()) {
int retryCount = 0;
- while (true) {
- try {
- if (!CollectionUtils.isEmpty(failedRecords)) {
- splitDatas.clear();
- splitDatas.addAll(failedRecords);
- } else {
- failedRecords.addAll(splitDatas);
+ final List<CanalConnectRecord> toExecuteRecords = new
ArrayList<>();
+ try {
+ if (!CollectionUtils.isEmpty(failedRecords)) {
+ // if failedRecords is not empty, retry them first
+ toExecuteRecords.addAll(failedRecords);
+ } else {
+ toExecuteRecords.addAll(records);
+ // add to failed record first, maybe get lob or
datasource error
+ failedRecords.addAll(toExecuteRecords);
+ }
+ JdbcTemplate template = dbDialect.getJdbcTemplate();
+ String sourceGtid = context.getGtid();
+ if (StringUtils.isNotEmpty(sourceGtid)) {
+ String setGtid = "SET @@session.gtid_next = '" +
sourceGtid + "';";
+ template.execute(setGtid);
+ } else {
+ log.error("gtid is empty in gtid mode");
+ throw new RuntimeException("gtid is empty in gtid
mode");
+ }
+
+ final LobCreator lobCreator =
dbDialect.getLobHandler().getLobCreator();
+ int affect = (Integer)
dbDialect.getTransactionTemplate().execute((TransactionCallback) status -> {
+ try {
+ failedRecords.clear();
+ processedRecords.clear();
+ int affect1 = 0;
+ for (CanalConnectRecord record : toExecuteRecords)
{
+ int affects = template.update(record.getSql(),
new PreparedStatementSetter() {
+ public void setValues(PreparedStatement
ps) throws SQLException {
+ doPreparedStatement(ps, dbDialect,
lobCreator, record);
+ }
+ });
+ affect1 = affect1 + affects;
+ processStat(record, affects, false);
+ }
+ return affect1;
+ } catch (Exception e) {
+ // rollback
+ status.setRollbackOnly();
+ throw new RuntimeException("Failed to executed",
e);
+ } finally {
+ lobCreator.close();
}
+ });
+
+ // reset gtid
+ String resetGtid = "SET @@session.gtid_next = AUTOMATIC;";
+ dbDialect.getJdbcTemplate().execute(resetGtid);
+ error = null;
+ exeResult = ExecuteResult.SUCCESS;
+ } catch (DeadlockLoserDataAccessException ex) {
+ error = new
RuntimeException(ExceptionUtils.getFullStackTrace(ex));
+ exeResult = ExecuteResult.RETRY;
+ } catch (Throwable ex) {
+ error = new
RuntimeException(ExceptionUtils.getFullStackTrace(ex));
+ exeResult = ExecuteResult.ERROR;
+ }
- final LobCreator lobCreator =
dbDialect.getLobHandler().getLobCreator();
- if (useBatch && canBatch) {
- final String sql = splitDatas.get(0).getSql();
- int[] affects = new int[splitDatas.size()];
- affects = (int[])
dbDialect.getTransactionTemplate().execute((TransactionCallback) status -> {
- try {
- failedRecords.clear();
- processedRecords.clear();
- JdbcTemplate template =
dbDialect.getJdbcTemplate();
- int[] affects1 = template.batchUpdate(sql,
new BatchPreparedStatementSetter() {
-
- public void
setValues(PreparedStatement ps, int idx) throws SQLException {
- doPreparedStatement(ps, dbDialect,
lobCreator, splitDatas.get(idx));
- }
-
- public int getBatchSize() {
- return splitDatas.size();
- }
- });
- return affects1;
- } finally {
- lobCreator.close();
- }
- });
+ if (ExecuteResult.SUCCESS == exeResult) {
+ allFailedRecords.addAll(failedRecords);
+ allProcessedRecords.addAll(processedRecords);
+ failedRecords.clear();
+ processedRecords.clear();
+ } else if (ExecuteResult.RETRY == exeResult) {
+ retryCount = retryCount + 1;
+ processedRecords.clear();
+ failedRecords.clear();
+ failedRecords.addAll(toExecuteRecords);
+ int retry = 3;
+ if (retryCount >= retry) {
+ processFailedDatas(toExecuteRecords.size());
+ throw new RuntimeException(String.format("execute
retry %s times failed", retryCount), error);
+ } else {
+ try {
+ int retryWait = 3000;
+ int wait = retryCount * retryWait;
+ wait = Math.max(wait, retryWait);
+ Thread.sleep(wait);
+ } catch (InterruptedException ex) {
+ Thread.interrupted();
+ processFailedDatas(toExecuteRecords.size());
+ throw new RuntimeException(ex);
+ }
+ }
+ } else {
+ processedRecords.clear();
+ failedRecords.clear();
+ failedRecords.addAll(toExecuteRecords);
+ processFailedDatas(toExecuteRecords.size());
+ throw error;
+ }
+ } else {
+ int index = 0;
+ while (index < records.size()) {
+ final List<CanalConnectRecord> toExecuteRecords = new
ArrayList<>();
+ if (useBatch && canBatch) {
+ int end = Math.min(index + batchSize, records.size());
+ toExecuteRecords.addAll(records.subList(index, end));
+ index = end;
+ } else {
+ toExecuteRecords.add(records.get(index));
+ index = index + 1;
+ }
- for (int i = 0; i < splitDatas.size(); i++) {
- assert affects != null;
- processStat(splitDatas.get(i), affects[i],
true);
+ int retryCount = 0;
+ while (true) {
+ try {
+ if (!CollectionUtils.isEmpty(failedRecords)) {
+ toExecuteRecords.clear();
+ toExecuteRecords.addAll(failedRecords);
+ } else {
+ failedRecords.addAll(toExecuteRecords);
}
- } else {
- final CanalConnectRecord record =
splitDatas.get(0);
- int affect = 0;
- affect = (Integer)
dbDialect.getTransactionTemplate().execute((TransactionCallback) status -> {
- try {
- failedRecords.clear();
- processedRecords.clear();
- JdbcTemplate template =
dbDialect.getJdbcTemplate();
- int affect1 =
template.update(record.getSql(), new PreparedStatementSetter() {
-
- public void
setValues(PreparedStatement ps) throws SQLException {
- doPreparedStatement(ps, dbDialect,
lobCreator, record);
- }
- });
- return affect1;
- } finally {
- lobCreator.close();
+
+ final LobCreator lobCreator =
dbDialect.getLobHandler().getLobCreator();
+ if (useBatch && canBatch) {
+ JdbcTemplate template =
dbDialect.getJdbcTemplate();
+ final String sql =
toExecuteRecords.get(0).getSql();
+
+ int[] affects = new
int[toExecuteRecords.size()];
+
+ affects = (int[])
dbDialect.getTransactionTemplate().execute((TransactionCallback) status -> {
+ try {
+ failedRecords.clear();
+ processedRecords.clear();
+ int[] affects1 =
template.batchUpdate(sql, new BatchPreparedStatementSetter() {
+
+ public void
setValues(PreparedStatement ps, int idx) throws SQLException {
+ doPreparedStatement(ps,
dbDialect, lobCreator, toExecuteRecords.get(idx));
+ }
+
+ public int getBatchSize() {
+ return toExecuteRecords.size();
+ }
+ });
+ return affects1;
+ } catch (Exception e) {
+ // rollback
+ status.setRollbackOnly();
+ throw new RuntimeException("Failed to
execute batch with GTID", e);
+ } finally {
+ lobCreator.close();
+ }
+ });
+
+ for (int i = 0; i < toExecuteRecords.size();
i++) {
+ assert affects != null;
+ processStat(toExecuteRecords.get(i),
affects[i], true);
}
- });
- processStat(record, affect, false);
- }
+ } else {
+ final CanalConnectRecord record =
toExecuteRecords.get(0);
+ JdbcTemplate template =
dbDialect.getJdbcTemplate();
+ int affect = 0;
+ affect = (Integer)
dbDialect.getTransactionTemplate().execute((TransactionCallback) status -> {
+ try {
+ failedRecords.clear();
+ processedRecords.clear();
+ int affect1 =
template.update(record.getSql(), new PreparedStatementSetter() {
+
+ public void
setValues(PreparedStatement ps) throws SQLException {
+ doPreparedStatement(ps,
dbDialect, lobCreator, record);
+ }
+ });
+ return affect1;
+ } catch (Exception e) {
+ // rollback
+ status.setRollbackOnly();
+ throw new RuntimeException("Failed to
executed", e);
+ } finally {
+ lobCreator.close();
+ }
+ });
+ processStat(record, affect, false);
+ }
- error = null;
- exeResult = ExecuteResult.SUCCESS;
- } catch (DeadlockLoserDataAccessException ex) {
- error = new
RuntimeException(ExceptionUtils.getFullStackTrace(ex));
- exeResult = ExecuteResult.RETRY;
- } catch (Throwable ex) {
- error = new
RuntimeException(ExceptionUtils.getFullStackTrace(ex));
- exeResult = ExecuteResult.ERROR;
- }
+ error = null;
+ exeResult = ExecuteResult.SUCCESS;
+ } catch (DeadlockLoserDataAccessException ex) {
+ error = new
RuntimeException(ExceptionUtils.getFullStackTrace(ex));
+ exeResult = ExecuteResult.RETRY;
+ } catch (Throwable ex) {
+ error = new
RuntimeException(ExceptionUtils.getFullStackTrace(ex));
+ exeResult = ExecuteResult.ERROR;
+ }
- if (ExecuteResult.SUCCESS == exeResult) {
- allFailedRecords.addAll(failedRecords);
- allProcessedRecords.addAll(processedRecords);
- failedRecords.clear();
- processedRecords.clear();
- break; // do next eventData
- } else if (ExecuteResult.RETRY == exeResult) {
- retryCount = retryCount + 1;
- processedRecords.clear();
- failedRecords.clear();
- failedRecords.addAll(splitDatas);
- int retry = 3;
- if (retryCount >= retry) {
- processFailedDatas(index);
- throw new RuntimeException(String.format("execute
retry %s times failed", retryCount), error);
- } else {
- try {
- int retryWait = 3000;
- int wait = retryCount * retryWait;
- wait = Math.max(wait, retryWait);
- Thread.sleep(wait);
- } catch (InterruptedException ex) {
- Thread.interrupted();
+ if (ExecuteResult.SUCCESS == exeResult) {
+ allFailedRecords.addAll(failedRecords);
+ allProcessedRecords.addAll(processedRecords);
+ failedRecords.clear();
+ processedRecords.clear();
+ break; // do next eventData
+ } else if (ExecuteResult.RETRY == exeResult) {
+ retryCount = retryCount + 1;
+ processedRecords.clear();
+ failedRecords.clear();
+ failedRecords.addAll(toExecuteRecords);
+ int retry = 3;
+ if (retryCount >= retry) {
processFailedDatas(index);
- throw new RuntimeException(ex);
+ throw new
RuntimeException(String.format("execute retry %s times failed", retryCount),
error);
+ } else {
+ try {
+ int retryWait = 3000;
+ int wait = retryCount * retryWait;
+ wait = Math.max(wait, retryWait);
+ Thread.sleep(wait);
+ } catch (InterruptedException ex) {
+ Thread.interrupted();
+ processFailedDatas(index);
+ throw new RuntimeException(ex);
+ }
}
+ } else {
+ processedRecords.clear();
+ failedRecords.clear();
+ failedRecords.addAll(toExecuteRecords);
+ processFailedDatas(index);
+ throw error;
}
- } else {
- processedRecords.clear();
- failedRecords.clear();
- failedRecords.addAll(splitDatas);
- processFailedDatas(index);
- throw error;
}
}
}
+
context.getFailedRecords().addAll(allFailedRecords);
context.getProcessedRecords().addAll(allProcessedRecords);
return null;
}
private void doPreparedStatement(PreparedStatement ps, DbDialect
dbDialect, LobCreator lobCreator,
- CanalConnectRecord record) throws
SQLException {
+ CanalConnectRecord record) throws SQLException {
EventType type = record.getEventType();
List<EventColumn> columns = new ArrayList<EventColumn>();
if (type.isInsert()) {
@@ -530,11 +704,7 @@ public class CanalSinkConnector implements Sink,
ConnectorCreateService<Sink> {
} else if (type.isUpdate()) {
boolean existOldKeys =
!CollectionUtils.isEmpty(record.getOldKeys());
columns.addAll(record.getUpdatedColumns());
- if (existOldKeys && dbDialect.isDRDS()) {
- columns.addAll(record.getUpdatedKeys());
- } else {
- columns.addAll(record.getKeys());
- }
+ columns.addAll(record.getKeys());
if (existOldKeys) {
columns.addAll(record.getOldKeys());
}
diff --git
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/EntryParser.java
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/EntryParser.java
index 8ef60ff04..708d5d120 100644
---
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/EntryParser.java
+++
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/EntryParser.java
@@ -60,9 +60,17 @@ public class EntryParser {
switch (entry.getEntryType()) {
case ROWDATA:
RowChange rowChange =
RowChange.parseFrom(entry.getStoreValue());
- needSync = checkNeedSync(sourceConfig,
rowChange.getRowDatas(0));
- if (needSync) {
- transactionDataBuffer.add(entry);
+ if (sourceConfig.getServerUUID() != null &&
sourceConfig.isGTIDMode()) {
+ String currentGtid =
entry.getHeader().getPropsList().get(0).getValue();
+ if
(currentGtid.contains(sourceConfig.getServerUUID())) {
+ transactionDataBuffer.add(entry);
+ }
+ } else {
+ // if not gtid mode, need to check whether the entry
is a loopback by the specified column value
+ needSync = checkNeedSync(sourceConfig,
rowChange.getRowDatas(0));
+ if (needSync) {
+ transactionDataBuffer.add(entry);
+ }
}
break;
case TRANSACTIONEND:
@@ -169,6 +177,14 @@ public class EntryParser {
canalConnectRecord.setExecuteTime(entry.getHeader().getExecuteTime());
canalConnectRecord.setJournalName(entry.getHeader().getLogfileName());
canalConnectRecord.setBinLogOffset(entry.getHeader().getLogfileOffset());
+ // if gtid mode is enabled, the gtid must not be null
+ if (canalSourceConfig.isGTIDMode()) {
+ String currentGtid =
entry.getHeader().getPropsList().get(0).getValue();
+ String gtidRange = replaceGtidRange(entry.getHeader().getGtid(),
currentGtid, canalSourceConfig.getServerUUID());
+ canalConnectRecord.setGtid(gtidRange);
+ canalConnectRecord.setCurrentGtid(currentGtid);
+ }
+
EventType eventType = canalConnectRecord.getEventType();
List<Column> beforeColumns = rowData.getBeforeColumnsList();
@@ -248,6 +264,17 @@ public class EntryParser {
return canalConnectRecord;
}
+ public static String replaceGtidRange(String gtid, String currentGtid,
String serverUUID) {
+ String[] gtidRangeArray = gtid.split(",");
+ for (int i = 0; i < gtidRangeArray.length; i++) {
+ String gtidRange = gtidRangeArray[i];
+ if (gtidRange.startsWith(serverUUID)) {
+ gtidRangeArray[i] = gtidRange.replaceFirst("\\d+$",
currentGtid.split(":")[1]);
+ }
+ }
+ return String.join(",", gtidRangeArray);
+ }
+
private static void checkUpdateKeyColumns(Map<String, EventColumn>
oldKeyColumns,
Map<String, EventColumn>
keyColumns) {
if (oldKeyColumns.isEmpty()) {
diff --git
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java
index 4b9617731..6cd575cb7 100644
---
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java
+++
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java
@@ -150,6 +150,8 @@ public class CanalSourceConnector implements Source,
ConnectorCreateService<Sour
return instance;
}
});
+ DatabaseConnection.sourceConfig =
sourceConfig.getSourceConnectorConfig();
+ DatabaseConnection.initSourceConnection();
tableMgr = new RdbTableMgr(sourceConfig.getSourceConnectorConfig(),
DatabaseConnection.sourceDataSource);
}
@@ -180,6 +182,9 @@ public class CanalSourceConnector implements Source,
ConnectorCreateService<Sour
parameter.setDbUsername(sourceConfig.getSourceConnectorConfig().getUserName());
parameter.setDbPassword(sourceConfig.getSourceConnectorConfig().getPassWord());
+ // set if enabled gtid mode
+ parameter.setGtidEnable(sourceConfig.isGTIDMode());
+
// check positions
// example:
Arrays.asList("{\"journalName\":\"mysql-bin.000001\",\"position\":6163L,\"timestamp\":1322803601000L}",
//
"{\"journalName\":\"mysql-bin.000001\",\"position\":6163L,\"timestamp\":1322803601000L}")
@@ -193,6 +198,14 @@ public class CanalSourceConnector implements Source,
ConnectorCreateService<Sour
recordPositionMap.put("journalName",
canalRecordPartition.getJournalName());
recordPositionMap.put("timestamp",
canalRecordPartition.getTimeStamp());
recordPositionMap.put("position",
canalRecordOffset.getOffset());
+ String gtidRange = canalRecordOffset.getGtid();
+ if (gtidRange != null) {
+ if (canalRecordOffset.getCurrentGtid() != null) {
+ gtidRange =
EntryParser.replaceGtidRange(canalRecordOffset.getGtid(),
canalRecordOffset.getCurrentGtid(),
+ sourceConfig.getServerUUID());
+ }
+ recordPositionMap.put("gtid", gtidRange);
+ }
positions.add(JsonUtils.toJSONString(recordPositionMap));
});
parameter.setPositions(positions);
@@ -237,7 +250,13 @@ public class CanalSourceConnector implements Source,
ConnectorCreateService<Sour
@Override
public void commit(ConnectRecord record) {
long batchId = Long.parseLong(record.getExtension("messageId"));
- canalServer.ack(clientIdentity, batchId);
+ int batchIndex = record.getExtension("batchIndex", Integer.class);
+ int totalBatches = record.getExtension("totalBatches", Integer.class);
+ if (batchIndex == totalBatches - 1) {
+ log.debug("ack records batchIndex:{}, totalBatches:{}, batchId:{}",
+ batchIndex, totalBatches, batchId);
+ canalServer.ack(clientIdentity, batchId);
+ }
}
@Override
@@ -301,21 +320,37 @@ public class CanalSourceConnector implements Source,
ConnectorCreateService<Sour
if (!connectorRecordMap.isEmpty()) {
Set<Map.Entry<Long, List<CanalConnectRecord>>> entrySet =
connectorRecordMap.entrySet();
for (Map.Entry<Long, List<CanalConnectRecord>> entry : entrySet) {
- // Xid offset
- Long binLogOffset = entry.getKey();
List<CanalConnectRecord> connectRecordList = entry.getValue();
CanalConnectRecord lastRecord =
entry.getValue().get(connectRecordList.size() - 1);
CanalRecordPartition canalRecordPartition = new
CanalRecordPartition();
+
canalRecordPartition.setServerUUID(sourceConfig.getServerUUID());
canalRecordPartition.setJournalName(lastRecord.getJournalName());
canalRecordPartition.setTimeStamp(lastRecord.getExecuteTime());
-
+ // Xid offset with gtid
+ Long binLogOffset = entry.getKey();
CanalRecordOffset canalRecordOffset = new CanalRecordOffset();
canalRecordOffset.setOffset(binLogOffset);
+ if (StringUtils.isNotEmpty(lastRecord.getGtid()) &&
StringUtils.isNotEmpty(lastRecord.getCurrentGtid())) {
+ canalRecordOffset.setGtid(lastRecord.getGtid());
+
canalRecordOffset.setCurrentGtid(lastRecord.getCurrentGtid());
+ }
- ConnectRecord connectRecord = new
ConnectRecord(canalRecordPartition, canalRecordOffset,
System.currentTimeMillis());
- connectRecord.addExtension("messageId",
String.valueOf(message.getId()));
- connectRecord.setData(connectRecordList);
- result.add(connectRecord);
+ // split record list
+ List<List<CanalConnectRecord>> splitLists = new ArrayList<>();
+ for (int i = 0; i < connectRecordList.size(); i +=
sourceConfig.getBatchSize()) {
+ int end = Math.min(i + sourceConfig.getBatchSize(),
connectRecordList.size());
+ List<CanalConnectRecord> subList =
connectRecordList.subList(i, end);
+ splitLists.add(subList);
+ }
+
+ for (int i = 0; i < splitLists.size(); i++) {
+ ConnectRecord connectRecord = new
ConnectRecord(canalRecordPartition, canalRecordOffset,
System.currentTimeMillis());
+ connectRecord.addExtension("messageId",
String.valueOf(message.getId()));
+ connectRecord.addExtension("batchIndex", i);
+ connectRecord.addExtension("totalBatches",
splitLists.size());
+ connectRecord.setData(splitLists.get(i));
+ result.add(connectRecord);
+ }
}
} else {
// for the message has been filtered need ack message
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]