This is an automated email from the ASF dual-hosted git repository. mikexue pushed a commit to branch eventmesh-function in repository https://gitbox.apache.org/repos/asf/eventmesh.git
commit 7513508c122e9db91b00aab58d6733311e738816 Author: xwm1992 <[email protected]> AuthorDate: Wed Jun 5 20:23:14 2024 +0800 [ISSUE #4979]Canal Connector supports bidirectional data synchronization --- .../connector/rdb/canal/CanalSinkConfig.java | 15 ++- .../connector/rdb/canal/CanalSourceConfig.java | 24 ++--- .../connector/canal/dialect/MysqlDialect.java | 18 +--- .../interceptor/SqlBuilderLoadInterceptor.java | 13 +-- .../connector/canal/source/EntryParser.java | 106 ++++++++++----------- .../canal/template/AbstractSqlTemplate.java | 10 +- .../connector/canal/template/MysqlSqlTemplate.java | 2 +- .../eventmesh/runtime/boot/RuntimeInstance.java | 50 +++++----- 8 files changed, 109 insertions(+), 129 deletions(-) diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSinkConfig.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSinkConfig.java index f7a697625..85484b2ce 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSinkConfig.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSinkConfig.java @@ -27,15 +27,20 @@ import lombok.EqualsAndHashCode; @EqualsAndHashCode(callSuper = true) public class CanalSinkConfig extends SinkConfig { - private Integer batchSize = 50; // batchSize + // batchSize + private Integer batchSize = 50; - private Boolean useBatch = true; // enable batch + // enable batch + private Boolean useBatch = true; - private Integer poolSize = 5; // sink thread size for single channel + // sink thread size for single channel + private Integer poolSize = 5; - private SyncMode syncMode; // sync mode: field/row + // sync mode: field/row + private SyncMode syncMode; - private Boolean skipException = false; // skip sink process exception + // skip sink process exception + private Boolean skipException = false; public SinkConnectorConfig 
sinkConnectorConfig; diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceConfig.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceConfig.java index e5edc5a78..d6e6a7790 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceConfig.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceConfig.java @@ -54,27 +54,23 @@ public class CanalSourceConfig extends SourceConfig { // ================================= channel parameter // ================================ - private Boolean enableRemedy = false; // enable remedy + // enable remedy + private Boolean enableRemedy = false; - private SyncMode syncMode; // sync mode: field/row + // sync mode: field/row + private SyncMode syncMode; - private SyncConsistency syncConsistency; // sync consistency + // sync consistency + private SyncConsistency syncConsistency; // ================================= system parameter // ================================ - private String systemSchema; // Default is retl + // Column name of the bidirectional synchronization mark + private String needSyncMarkTableColumnName = "needSync"; - private String systemMarkTable; // Bidirectional synchronization mark table - - private String systemMarkTableColumn; // Column name of the bidirectional synchronization mark - - private String systemMarkTableInfo; - // nfo information of the bidirectional synchronization mark, similar to BI_SYNC - - private String systemBufferTable; // sync buffer table - - private String systemDualTable; // sync heartbeat table + // Column value of the bidirectional synchronization mark + private String needSyncMarkTableColumnValue = "needSync"; private SourceConnectorConfig sourceConnectorConfig; } diff --git 
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/dialect/MysqlDialect.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/dialect/MysqlDialect.java index 32bb79b54..1a47a0521 100644 --- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/dialect/MysqlDialect.java +++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/dialect/MysqlDialect.java @@ -29,19 +29,11 @@ import org.springframework.jdbc.support.lob.LobHandler; public class MysqlDialect extends AbstractDbDialect { - private Map<List<String>, String> shardColumns; - public MysqlDialect(JdbcTemplate jdbcTemplate, LobHandler lobHandler) { super(jdbcTemplate, lobHandler); sqlTemplate = new MysqlSqlTemplate(); } - public MysqlDialect(JdbcTemplate jdbcTemplate, LobHandler lobHandler, String name, String databaseVersion, - int majorVersion, int minorVersion) { - super(jdbcTemplate, lobHandler, name, majorVersion, minorVersion); - sqlTemplate = new MysqlSqlTemplate(); - } - public boolean isCharSpacePadded() { return false; } @@ -66,16 +58,8 @@ public class MysqlDialect extends AbstractDbDialect { return false; } - public String getShardColumns(String schema, String table) { - if (isDRDS()) { - return shardColumns.get(Arrays.asList(schema, table)); - } else { - return null; - } - } - public String getDefaultCatalog() { - return (String) jdbcTemplate.queryForObject("select database()", String.class); + return jdbcTemplate.queryForObject("select database()", String.class); } } diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/interceptor/SqlBuilderLoadInterceptor.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/interceptor/SqlBuilderLoadInterceptor.java index ab0776c17..24d6b42f8 100644 --- 
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/interceptor/SqlBuilderLoadInterceptor.java +++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/interceptor/SqlBuilderLoadInterceptor.java @@ -28,11 +28,16 @@ import java.util.List; import org.springframework.util.CollectionUtils; +import lombok.Getter; +import lombok.Setter; + /** * compute latest sql */ public class SqlBuilderLoadInterceptor { + @Getter + @Setter private DbDialect dbDialect; public boolean before(CanalSinkConfig sinkConfig, CanalConnectRecord record) { @@ -128,12 +133,4 @@ public class SqlBuilderLoadInterceptor { } return result; } - - public DbDialect getDbDialect() { - return dbDialect; - } - - public void setDbDialect(DbDialect dbDialect) { - this.dbDialect = dbDialect; - } } diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/EntryParser.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/EntryParser.java index 3031a15df..c54462374 100644 --- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/EntryParser.java +++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/EntryParser.java @@ -23,7 +23,7 @@ import org.apache.eventmesh.connector.canal.model.EventColumn; import org.apache.eventmesh.connector.canal.model.EventColumnIndexComparable; import org.apache.eventmesh.connector.canal.model.EventType; -import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang3.StringUtils; import java.util.ArrayList; import java.util.LinkedHashMap; @@ -49,57 +49,67 @@ public class EntryParser { public List<CanalConnectRecord> parse(CanalSourceConfig sourceConfig, List<Entry> datas) { List<CanalConnectRecord> recordList = new ArrayList<>(); List<Entry> 
transactionDataBuffer = new ArrayList<>(); + // need to check whether the entry is loopback + boolean needSync; try { for (Entry entry : datas) { switch (entry.getEntryType()) { - case TRANSACTIONBEGIN: - break; case ROWDATA: - transactionDataBuffer.add(entry); + RowChange rowChange = RowChange.parseFrom(entry.getStoreValue()); + needSync = checkNeedSync(sourceConfig, rowChange.getRowDatas(0)); + if (needSync) { + transactionDataBuffer.add(entry); + } break; case TRANSACTIONEND: - for (Entry bufferEntry : transactionDataBuffer) { - List<CanalConnectRecord> recordParsedList = internParse(sourceConfig, bufferEntry); - if (CollectionUtils.isEmpty(recordParsedList)) { - continue; - } - long totalSize = bufferEntry.getHeader().getEventLength(); - long eachSize = totalSize / recordParsedList.size(); - for (CanalConnectRecord record : recordParsedList) { - if (record == null) { - continue; - } - record.setSize(eachSize); - recordList.add(record); - } - } + parseRecordListWithEntryBuffer(sourceConfig, recordList, transactionDataBuffer); transactionDataBuffer.clear(); break; default: break; } } + parseRecordListWithEntryBuffer(sourceConfig, recordList, transactionDataBuffer); + } catch (Exception e) { + throw new RuntimeException(e); + } + return recordList; + } - for (Entry bufferEntry : transactionDataBuffer) { - List<CanalConnectRecord> recordParsedList = internParse(sourceConfig, bufferEntry); - if (CollectionUtils.isEmpty(recordParsedList)) { + private void parseRecordListWithEntryBuffer(CanalSourceConfig sourceConfig, List<CanalConnectRecord> recordList, + List<Entry> transactionDataBuffer) { + for (Entry bufferEntry : transactionDataBuffer) { + List<CanalConnectRecord> recordParsedList = internParse(sourceConfig, bufferEntry); + if (CollectionUtils.isEmpty(recordParsedList)) { + continue; + } + long totalSize = bufferEntry.getHeader().getEventLength(); + long eachSize = totalSize / recordParsedList.size(); + for (CanalConnectRecord record : recordParsedList) { + if 
(record == null) { continue; } + record.setSize(eachSize); + recordList.add(record); + } + } + } - long totalSize = bufferEntry.getHeader().getEventLength(); - long eachSize = totalSize / recordParsedList.size(); - for (CanalConnectRecord record : recordParsedList) { - if (record == null) { - continue; - } - record.setSize(eachSize); - recordList.add(record); - } + private boolean checkNeedSync(CanalSourceConfig sourceConfig, RowData rowData) { + Column markedColumn = getColumnIgnoreCase(rowData.getAfterColumnsList(), sourceConfig.getNeedSyncMarkTableColumnName()); + if (markedColumn != null) { + return StringUtils.equalsIgnoreCase(markedColumn.getValue(), sourceConfig.getNeedSyncMarkTableColumnValue()); + } + return false; + } + + private Column getColumnIgnoreCase(List<Column> columns, String columName) { + for (Column column : columns) { + if (column.getName().equalsIgnoreCase(columName)) { + return column; } - } catch (Exception e) { - throw new RuntimeException(e); } - return recordList; + return null; } private List<CanalConnectRecord> internParse(CanalSourceConfig sourceConfig, Entry entry) { @@ -127,20 +137,9 @@ public class EntryParser { return null; } - if (StringUtils.equalsIgnoreCase(sourceConfig.getSystemSchema(), schemaName)) { - // do noting - if (eventType.isDdl()) { - return null; - } - - if (StringUtils.equalsIgnoreCase(sourceConfig.getSystemDualTable(), tableName)) { - return null; - } - } else { - if (eventType.isDdl()) { - log.warn("unsupported ddl event type: {}", eventType); - return null; - } + if (eventType.isDdl()) { + log.warn("unsupported ddl event type: {}", eventType); + return null; } List<CanalConnectRecord> recordList = new ArrayList<>(); @@ -164,13 +163,12 @@ public class EntryParser { List<Column> beforeColumns = rowData.getBeforeColumnsList(); List<Column> afterColumns = rowData.getAfterColumnsList(); - String tableName = canalConnectRecord.getSchemaName() + "." 
+ canalConnectRecord.getTableName(); boolean isRowMode = canalSourceConfig.getSyncMode().isRow(); - Map<String, EventColumn> keyColumns = new LinkedHashMap<String, EventColumn>(); - Map<String, EventColumn> oldKeyColumns = new LinkedHashMap<String, EventColumn>(); - Map<String, EventColumn> notKeyColumns = new LinkedHashMap<String, EventColumn>(); + Map<String, EventColumn> keyColumns = new LinkedHashMap<>(); + Map<String, EventColumn> oldKeyColumns = new LinkedHashMap<>(); + Map<String, EventColumn> notKeyColumns = new LinkedHashMap<>(); if (eventType.isInsert()) { for (Column column : afterColumns) { @@ -195,7 +193,7 @@ public class EntryParser { keyColumns.put(column.getName(), copyEventColumn(column, true)); } else { if (isRowMode && entry.getHeader().getSourceType() == CanalEntry.Type.ORACLE) { - notKeyColumns.put(column.getName(), copyEventColumn(column, isRowMode)); + notKeyColumns.put(column.getName(), copyEventColumn(column, true)); } } } @@ -233,7 +231,7 @@ public class EntryParser { } canalConnectRecord.setColumns(columns); } else { - throw new RuntimeException("this row data has no pks , entry: " + entry.toString() + " and rowData: " + throw new RuntimeException("this row data has no pks , entry: " + entry + " and rowData: " + rowData); } diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/template/AbstractSqlTemplate.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/template/AbstractSqlTemplate.java index 10c647c8f..ceb509ef7 100644 --- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/template/AbstractSqlTemplate.java +++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/template/AbstractSqlTemplate.java @@ -32,7 +32,7 @@ public abstract class AbstractSqlTemplate implements SqlTemplate { } sql.append(" from ").append(getFullName(schemaName, 
tableName)).append(" where ( "); - appendColumnEquals(sql, pkNames, "and"); + appendColumnEquals(sql, pkNames); sql.append(" ) "); return sql.toString().intern(); } @@ -41,7 +41,7 @@ public abstract class AbstractSqlTemplate implements SqlTemplate { StringBuilder sql = new StringBuilder("update " + getFullName(schemaName, tableName) + " set "); appendExcludeSingleShardColumnEquals(sql, columnNames, ",", updatePks, shardColumn); sql.append(" where ("); - appendColumnEquals(sql, pkNames, "and"); + appendColumnEquals(sql, pkNames); sql.append(")"); return sql.toString().intern(); } @@ -65,7 +65,7 @@ public abstract class AbstractSqlTemplate implements SqlTemplate { public String getDeleteSql(String schemaName, String tableName, String[] pkNames) { StringBuilder sql = new StringBuilder("delete from " + getFullName(schemaName, tableName) + " where "); - appendColumnEquals(sql, pkNames, "and"); + appendColumnEquals(sql, pkNames); return sql.toString().intern(); } @@ -91,12 +91,12 @@ public abstract class AbstractSqlTemplate implements SqlTemplate { } } - protected void appendColumnEquals(StringBuilder sql, String[] columns, String separator) { + protected void appendColumnEquals(StringBuilder sql, String[] columns) { int size = columns.length; for (int i = 0; i < size; i++) { sql.append(" ").append(appendEscape(columns[i])).append(" = ").append("? 
"); if (i != size - 1) { - sql.append(separator); + sql.append("and"); } } } diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/template/MysqlSqlTemplate.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/template/MysqlSqlTemplate.java index a169ed20f..37b45c746 100644 --- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/template/MysqlSqlTemplate.java +++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/template/MysqlSqlTemplate.java @@ -47,7 +47,7 @@ public class MysqlSqlTemplate extends AbstractSqlTemplate { size = columnNames.length; for (int i = 0; i < size; i++) { - if (!includePks && shardColumn != null && columnNames[i].equals(shardColumn)) { + if (!includePks && columnNames[i].equals(shardColumn)) { continue; } diff --git a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/boot/RuntimeInstance.java b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/boot/RuntimeInstance.java index 0fade897f..56b3a5967 100644 --- a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/boot/RuntimeInstance.java +++ b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/boot/RuntimeInstance.java @@ -45,7 +45,7 @@ public class RuntimeInstance { private Map<String, RegisterServerInfo> adminServerInfoMap = new HashMap<>(); - private final RegistryService registryService; +// private final RegistryService registryService; private Runtime runtime; @@ -57,20 +57,20 @@ public class RuntimeInstance { public RuntimeInstance(RuntimeInstanceConfig runtimeInstanceConfig) { this.runtimeInstanceConfig = runtimeInstanceConfig; - this.registryService = RegistryFactory.getInstance(runtimeInstanceConfig.getRegistryPluginType()); +// this.registryService = RegistryFactory.getInstance(runtimeInstanceConfig.getRegistryPluginType()); } 
public void init() throws Exception { - registryService.init(); - QueryInstances queryInstances = new QueryInstances(); - queryInstances.setServiceName(runtimeInstanceConfig.getAdminServiceName()); - queryInstances.setHealth(true); - List<RegisterServerInfo> adminServerRegisterInfoList = registryService.selectInstances(queryInstances); - if (!adminServerRegisterInfoList.isEmpty()) { - adminServerAddr = getRandomAdminServerAddr(adminServerRegisterInfoList); - } else { - throw new RuntimeException("admin server address is empty, please check"); - } +// registryService.init(); +// QueryInstances queryInstances = new QueryInstances(); +// queryInstances.setServiceName(runtimeInstanceConfig.getAdminServiceName()); +// queryInstances.setHealth(true); +// List<RegisterServerInfo> adminServerRegisterInfoList = registryService.selectInstances(queryInstances); +// if (!adminServerRegisterInfoList.isEmpty()) { +// adminServerAddr = getRandomAdminServerAddr(adminServerRegisterInfoList); +// } else { +// throw new RuntimeException("admin server address is empty, please check"); +// } runtimeInstanceConfig.setAdminServerAddr(adminServerAddr); runtimeFactory = initRuntimeFactory(runtimeInstanceConfig); runtime = runtimeFactory.createRuntime(runtimeInstanceConfig); @@ -80,19 +80,19 @@ public class RuntimeInstance { public void start() throws Exception { if (!StringUtils.isBlank(adminServerAddr)) { - registryService.subscribe((event) -> { - log.info("runtime receive registry event: {}", event); - List<RegisterServerInfo> registerServerInfoList = event.getInstances(); - Map<String, RegisterServerInfo> registerServerInfoMap = new HashMap<>(); - for (RegisterServerInfo registerServerInfo : registerServerInfoList) { - registerServerInfoMap.put(registerServerInfo.getAddress(), registerServerInfo); - } - if (!registerServerInfoMap.isEmpty()) { - adminServerInfoMap = registerServerInfoMap; - updateAdminServerAddr(); - } - - }, runtimeInstanceConfig.getAdminServiceName()); +// 
registryService.subscribe((event) -> { +// log.info("runtime receive registry event: {}", event); +// List<RegisterServerInfo> registerServerInfoList = event.getInstances(); +// Map<String, RegisterServerInfo> registerServerInfoMap = new HashMap<>(); +// for (RegisterServerInfo registerServerInfo : registerServerInfoList) { +// registerServerInfoMap.put(registerServerInfo.getAddress(), registerServerInfo); +// } +// if (!registerServerInfoMap.isEmpty()) { +// adminServerInfoMap = registerServerInfoMap; +// updateAdminServerAddr(); +// } +// +// }, runtimeInstanceConfig.getAdminServiceName()); runtime.start(); isStarted = true; } else { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
