This is an automated email from the ASF dual-hosted git repository.

mikexue pushed a commit to branch eventmesh-function
in repository https://gitbox.apache.org/repos/asf/eventmesh.git

commit 7513508c122e9db91b00aab58d6733311e738816
Author: xwm1992 <[email protected]>
AuthorDate: Wed Jun 5 20:23:14 2024 +0800

    [ISSUE #4979]Canal Connector supports bidirectional data synchronization
---
 .../connector/rdb/canal/CanalSinkConfig.java       |  15 ++-
 .../connector/rdb/canal/CanalSourceConfig.java     |  24 ++---
 .../connector/canal/dialect/MysqlDialect.java      |  18 +---
 .../interceptor/SqlBuilderLoadInterceptor.java     |  13 +--
 .../connector/canal/source/EntryParser.java        | 106 ++++++++++-----------
 .../canal/template/AbstractSqlTemplate.java        |  10 +-
 .../connector/canal/template/MysqlSqlTemplate.java |   2 +-
 .../eventmesh/runtime/boot/RuntimeInstance.java    |  50 +++++-----
 8 files changed, 109 insertions(+), 129 deletions(-)

diff --git 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSinkConfig.java
 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSinkConfig.java
index f7a697625..85484b2ce 100644
--- 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSinkConfig.java
+++ 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSinkConfig.java
@@ -27,15 +27,20 @@ import lombok.EqualsAndHashCode;
 @EqualsAndHashCode(callSuper = true)
 public class CanalSinkConfig extends SinkConfig {
 
-    private Integer batchSize = 50;                          // batchSize
+    // batchSize
+    private Integer batchSize = 50;
 
-    private Boolean useBatch = true;                        // enable batch
+    // enable batch
+    private Boolean useBatch = true;
 
-    private Integer poolSize = 5;                           // sink thread 
size for single channel
+    // sink thread size for single channel
+    private Integer poolSize = 5;
 
-    private SyncMode syncMode;                              // sync mode: 
field/row
+    // sync mode: field/row
+    private SyncMode syncMode;
 
-    private Boolean skipException = false;                  // skip sink 
process exception
+    // skip sink process exception
+    private Boolean skipException = false;
 
     public SinkConnectorConfig sinkConnectorConfig;
 
diff --git 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceConfig.java
 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceConfig.java
index e5edc5a78..d6e6a7790 100644
--- 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceConfig.java
+++ 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceConfig.java
@@ -54,27 +54,23 @@ public class CanalSourceConfig extends SourceConfig {
     // ================================= channel parameter
     // ================================
 
-    private Boolean enableRemedy = false;                                      
       // enable remedy
+    // enable remedy
+    private Boolean enableRemedy = false;
 
-    private SyncMode syncMode;                                                 
// sync mode: field/row
+    // sync mode: field/row
+    private SyncMode syncMode;
 
-    private SyncConsistency syncConsistency;                                   
       // sync consistency
+    // sync consistency
+    private SyncConsistency syncConsistency;
 
     // ================================= system parameter
     // ================================
 
-    private String systemSchema;                                             
// Default is retl
+    // Column name of the bidirectional synchronization mark
+    private String needSyncMarkTableColumnName = "needSync";
 
-    private String systemMarkTable;                                          
// Bidirectional synchronization mark table
-
-    private String systemMarkTableColumn;                                    
// Column name of the bidirectional synchronization mark
-
-    private String systemMarkTableInfo;
-    // nfo information of the bidirectional synchronization mark, similar to 
BI_SYNC
-
-    private String systemBufferTable;                                        
// sync buffer table
-
-    private String systemDualTable;                                          
// sync heartbeat table
+    // Column value of the bidirectional synchronization mark
+    private String needSyncMarkTableColumnValue = "needSync";
 
     private SourceConnectorConfig sourceConnectorConfig;
 }
diff --git 
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/dialect/MysqlDialect.java
 
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/dialect/MysqlDialect.java
index 32bb79b54..1a47a0521 100644
--- 
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/dialect/MysqlDialect.java
+++ 
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/dialect/MysqlDialect.java
@@ -29,19 +29,11 @@ import org.springframework.jdbc.support.lob.LobHandler;
 
 public class MysqlDialect extends AbstractDbDialect {
 
-    private Map<List<String>, String> shardColumns;
-
     public MysqlDialect(JdbcTemplate jdbcTemplate, LobHandler lobHandler) {
         super(jdbcTemplate, lobHandler);
         sqlTemplate = new MysqlSqlTemplate();
     }
 
-    public MysqlDialect(JdbcTemplate jdbcTemplate, LobHandler lobHandler, 
String name, String databaseVersion,
-                        int majorVersion, int minorVersion) {
-        super(jdbcTemplate, lobHandler, name, majorVersion, minorVersion);
-        sqlTemplate = new MysqlSqlTemplate();
-    }
-
     public boolean isCharSpacePadded() {
         return false;
     }
@@ -66,16 +58,8 @@ public class MysqlDialect extends AbstractDbDialect {
         return false;
     }
 
-    public String getShardColumns(String schema, String table) {
-        if (isDRDS()) {
-            return shardColumns.get(Arrays.asList(schema, table));
-        } else {
-            return null;
-        }
-    }
-
     public String getDefaultCatalog() {
-        return (String) jdbcTemplate.queryForObject("select database()", 
String.class);
+        return jdbcTemplate.queryForObject("select database()", String.class);
     }
 
 }
diff --git 
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/interceptor/SqlBuilderLoadInterceptor.java
 
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/interceptor/SqlBuilderLoadInterceptor.java
index ab0776c17..24d6b42f8 100644
--- 
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/interceptor/SqlBuilderLoadInterceptor.java
+++ 
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/interceptor/SqlBuilderLoadInterceptor.java
@@ -28,11 +28,16 @@ import java.util.List;
 
 import org.springframework.util.CollectionUtils;
 
+import lombok.Getter;
+import lombok.Setter;
+
 /**
  * compute latest sql
  */
 public class SqlBuilderLoadInterceptor {
 
+    @Getter
+    @Setter
     private DbDialect dbDialect;
 
     public boolean before(CanalSinkConfig sinkConfig, CanalConnectRecord 
record) {
@@ -128,12 +133,4 @@ public class SqlBuilderLoadInterceptor {
         }
         return result;
     }
-
-    public DbDialect getDbDialect() {
-        return dbDialect;
-    }
-
-    public void setDbDialect(DbDialect dbDialect) {
-        this.dbDialect = dbDialect;
-    }
 }
diff --git 
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/EntryParser.java
 
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/EntryParser.java
index 3031a15df..c54462374 100644
--- 
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/EntryParser.java
+++ 
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/EntryParser.java
@@ -23,7 +23,7 @@ import org.apache.eventmesh.connector.canal.model.EventColumn;
 import org.apache.eventmesh.connector.canal.model.EventColumnIndexComparable;
 import org.apache.eventmesh.connector.canal.model.EventType;
 
-import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang3.StringUtils;
 
 import java.util.ArrayList;
 import java.util.LinkedHashMap;
@@ -49,57 +49,67 @@ public class EntryParser {
     public List<CanalConnectRecord> parse(CanalSourceConfig sourceConfig, 
List<Entry> datas) {
         List<CanalConnectRecord> recordList = new ArrayList<>();
         List<Entry> transactionDataBuffer = new ArrayList<>();
+        // need to check whether the entry is loopback
+        boolean needSync;
         try {
             for (Entry entry : datas) {
                 switch (entry.getEntryType()) {
-                    case TRANSACTIONBEGIN:
-                        break;
                     case ROWDATA:
-                        transactionDataBuffer.add(entry);
+                        RowChange rowChange = 
RowChange.parseFrom(entry.getStoreValue());
+                        needSync = checkNeedSync(sourceConfig, 
rowChange.getRowDatas(0));
+                        if (needSync) {
+                            transactionDataBuffer.add(entry);
+                        }
                         break;
                     case TRANSACTIONEND:
-                        for (Entry bufferEntry : transactionDataBuffer) {
-                            List<CanalConnectRecord> recordParsedList = 
internParse(sourceConfig, bufferEntry);
-                            if (CollectionUtils.isEmpty(recordParsedList)) {
-                                continue;
-                            }
-                            long totalSize = 
bufferEntry.getHeader().getEventLength();
-                            long eachSize = totalSize / 
recordParsedList.size();
-                            for (CanalConnectRecord record : recordParsedList) 
{
-                                if (record == null) {
-                                    continue;
-                                }
-                                record.setSize(eachSize);
-                                recordList.add(record);
-                            }
-                        }
+                        parseRecordListWithEntryBuffer(sourceConfig, 
recordList, transactionDataBuffer);
                         transactionDataBuffer.clear();
                         break;
                     default:
                         break;
                 }
             }
+            parseRecordListWithEntryBuffer(sourceConfig, recordList, 
transactionDataBuffer);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+        return recordList;
+    }
 
-            for (Entry bufferEntry : transactionDataBuffer) {
-                List<CanalConnectRecord> recordParsedList = 
internParse(sourceConfig, bufferEntry);
-                if (CollectionUtils.isEmpty(recordParsedList)) {
+    private void parseRecordListWithEntryBuffer(CanalSourceConfig 
sourceConfig, List<CanalConnectRecord> recordList,
+        List<Entry> transactionDataBuffer) {
+        for (Entry bufferEntry : transactionDataBuffer) {
+            List<CanalConnectRecord> recordParsedList = 
internParse(sourceConfig, bufferEntry);
+            if (CollectionUtils.isEmpty(recordParsedList)) {
+                continue;
+            }
+            long totalSize = bufferEntry.getHeader().getEventLength();
+            long eachSize = totalSize / recordParsedList.size();
+            for (CanalConnectRecord record : recordParsedList) {
+                if (record == null) {
                     continue;
                 }
+                record.setSize(eachSize);
+                recordList.add(record);
+            }
+        }
+    }
 
-                long totalSize = bufferEntry.getHeader().getEventLength();
-                long eachSize = totalSize / recordParsedList.size();
-                for (CanalConnectRecord record : recordParsedList) {
-                    if (record == null) {
-                        continue;
-                    }
-                    record.setSize(eachSize);
-                    recordList.add(record);
-                }
+    private boolean checkNeedSync(CanalSourceConfig sourceConfig, RowData 
rowData) {
+        Column markedColumn = 
getColumnIgnoreCase(rowData.getAfterColumnsList(), 
sourceConfig.getNeedSyncMarkTableColumnName());
+        if (markedColumn != null) {
+            return StringUtils.equalsIgnoreCase(markedColumn.getValue(), 
sourceConfig.getNeedSyncMarkTableColumnValue());
+        }
+        return false;
+    }
+
+    private Column getColumnIgnoreCase(List<Column> columns, String columName) 
{
+        for (Column column : columns) {
+            if (column.getName().equalsIgnoreCase(columName)) {
+                return column;
             }
-        } catch (Exception e) {
-            throw new RuntimeException(e);
         }
-        return recordList;
+        return null;
     }
 
     private List<CanalConnectRecord> internParse(CanalSourceConfig 
sourceConfig, Entry entry) {
@@ -127,20 +137,9 @@ public class EntryParser {
             return null;
         }
 
-        if (StringUtils.equalsIgnoreCase(sourceConfig.getSystemSchema(), 
schemaName)) {
-            // do noting
-            if (eventType.isDdl()) {
-                return null;
-            }
-
-            if 
(StringUtils.equalsIgnoreCase(sourceConfig.getSystemDualTable(), tableName)) {
-                return null;
-            }
-        } else {
-            if (eventType.isDdl()) {
-                log.warn("unsupported ddl event type: {}", eventType);
-                return null;
-            }
+        if (eventType.isDdl()) {
+            log.warn("unsupported ddl event type: {}", eventType);
+            return null;
         }
 
         List<CanalConnectRecord> recordList = new ArrayList<>();
@@ -164,13 +163,12 @@ public class EntryParser {
 
         List<Column> beforeColumns = rowData.getBeforeColumnsList();
         List<Column> afterColumns = rowData.getAfterColumnsList();
-        String tableName = canalConnectRecord.getSchemaName() + "." + 
canalConnectRecord.getTableName();
 
         boolean isRowMode = canalSourceConfig.getSyncMode().isRow();
 
-        Map<String, EventColumn> keyColumns = new LinkedHashMap<String, 
EventColumn>();
-        Map<String, EventColumn> oldKeyColumns = new LinkedHashMap<String, 
EventColumn>();
-        Map<String, EventColumn> notKeyColumns = new LinkedHashMap<String, 
EventColumn>();
+        Map<String, EventColumn> keyColumns = new LinkedHashMap<>();
+        Map<String, EventColumn> oldKeyColumns = new LinkedHashMap<>();
+        Map<String, EventColumn> notKeyColumns = new LinkedHashMap<>();
 
         if (eventType.isInsert()) {
             for (Column column : afterColumns) {
@@ -195,7 +193,7 @@ public class EntryParser {
                     keyColumns.put(column.getName(), copyEventColumn(column, 
true));
                 } else {
                     if (isRowMode && entry.getHeader().getSourceType() == 
CanalEntry.Type.ORACLE) {
-                        notKeyColumns.put(column.getName(), 
copyEventColumn(column, isRowMode));
+                        notKeyColumns.put(column.getName(), 
copyEventColumn(column, true));
                     }
                 }
             }
@@ -233,7 +231,7 @@ public class EntryParser {
             }
             canalConnectRecord.setColumns(columns);
         } else {
-            throw new RuntimeException("this row data has no pks , entry: " + 
entry.toString() + " and rowData: "
+            throw new RuntimeException("this row data has no pks , entry: " + 
entry + " and rowData: "
                 + rowData);
         }
 
diff --git 
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/template/AbstractSqlTemplate.java
 
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/template/AbstractSqlTemplate.java
index 10c647c8f..ceb509ef7 100644
--- 
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/template/AbstractSqlTemplate.java
+++ 
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/template/AbstractSqlTemplate.java
@@ -32,7 +32,7 @@ public abstract class AbstractSqlTemplate implements 
SqlTemplate {
         }
 
         sql.append(" from ").append(getFullName(schemaName, 
tableName)).append(" where ( ");
-        appendColumnEquals(sql, pkNames, "and");
+        appendColumnEquals(sql, pkNames);
         sql.append(" ) ");
         return sql.toString().intern();
     }
@@ -41,7 +41,7 @@ public abstract class AbstractSqlTemplate implements 
SqlTemplate {
         StringBuilder sql = new StringBuilder("update " + 
getFullName(schemaName, tableName) + " set ");
         appendExcludeSingleShardColumnEquals(sql, columnNames, ",", updatePks, 
shardColumn);
         sql.append(" where (");
-        appendColumnEquals(sql, pkNames, "and");
+        appendColumnEquals(sql, pkNames);
         sql.append(")");
         return sql.toString().intern();
     }
@@ -65,7 +65,7 @@ public abstract class AbstractSqlTemplate implements 
SqlTemplate {
 
     public String getDeleteSql(String schemaName, String tableName, String[] 
pkNames) {
         StringBuilder sql = new StringBuilder("delete from " + 
getFullName(schemaName, tableName) + " where ");
-        appendColumnEquals(sql, pkNames, "and");
+        appendColumnEquals(sql, pkNames);
         return sql.toString().intern();
     }
 
@@ -91,12 +91,12 @@ public abstract class AbstractSqlTemplate implements 
SqlTemplate {
         }
     }
 
-    protected void appendColumnEquals(StringBuilder sql, String[] columns, 
String separator) {
+    protected void appendColumnEquals(StringBuilder sql, String[] columns) {
         int size = columns.length;
         for (int i = 0; i < size; i++) {
             sql.append(" ").append(appendEscape(columns[i])).append(" = 
").append("? ");
             if (i != size - 1) {
-                sql.append(separator);
+                sql.append("and");
             }
         }
     }
diff --git 
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/template/MysqlSqlTemplate.java
 
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/template/MysqlSqlTemplate.java
index a169ed20f..37b45c746 100644
--- 
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/template/MysqlSqlTemplate.java
+++ 
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/template/MysqlSqlTemplate.java
@@ -47,7 +47,7 @@ public class MysqlSqlTemplate extends AbstractSqlTemplate {
 
         size = columnNames.length;
         for (int i = 0; i < size; i++) {
-            if (!includePks && shardColumn != null && 
columnNames[i].equals(shardColumn)) {
+            if (!includePks && columnNames[i].equals(shardColumn)) {
                 continue;
             }
 
diff --git 
a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/boot/RuntimeInstance.java
 
b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/boot/RuntimeInstance.java
index 0fade897f..56b3a5967 100644
--- 
a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/boot/RuntimeInstance.java
+++ 
b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/boot/RuntimeInstance.java
@@ -45,7 +45,7 @@ public class RuntimeInstance {
 
     private Map<String, RegisterServerInfo> adminServerInfoMap = new 
HashMap<>();
 
-    private final RegistryService registryService;
+//    private final RegistryService registryService;
 
     private Runtime runtime;
 
@@ -57,20 +57,20 @@ public class RuntimeInstance {
 
     public RuntimeInstance(RuntimeInstanceConfig runtimeInstanceConfig) {
         this.runtimeInstanceConfig = runtimeInstanceConfig;
-        this.registryService = 
RegistryFactory.getInstance(runtimeInstanceConfig.getRegistryPluginType());
+//        this.registryService = 
RegistryFactory.getInstance(runtimeInstanceConfig.getRegistryPluginType());
     }
 
     public void init() throws Exception {
-        registryService.init();
-        QueryInstances queryInstances = new QueryInstances();
-        
queryInstances.setServiceName(runtimeInstanceConfig.getAdminServiceName());
-        queryInstances.setHealth(true);
-        List<RegisterServerInfo> adminServerRegisterInfoList = 
registryService.selectInstances(queryInstances);
-        if (!adminServerRegisterInfoList.isEmpty()) {
-            adminServerAddr = 
getRandomAdminServerAddr(adminServerRegisterInfoList);
-        } else {
-            throw new RuntimeException("admin server address is empty, please 
check");
-        }
+//        registryService.init();
+//        QueryInstances queryInstances = new QueryInstances();
+//        
queryInstances.setServiceName(runtimeInstanceConfig.getAdminServiceName());
+//        queryInstances.setHealth(true);
+//        List<RegisterServerInfo> adminServerRegisterInfoList = 
registryService.selectInstances(queryInstances);
+//        if (!adminServerRegisterInfoList.isEmpty()) {
+//            adminServerAddr = 
getRandomAdminServerAddr(adminServerRegisterInfoList);
+//        } else {
+//            throw new RuntimeException("admin server address is empty, 
please check");
+//        }
         runtimeInstanceConfig.setAdminServerAddr(adminServerAddr);
         runtimeFactory = initRuntimeFactory(runtimeInstanceConfig);
         runtime = runtimeFactory.createRuntime(runtimeInstanceConfig);
@@ -80,19 +80,19 @@ public class RuntimeInstance {
     public void start() throws Exception {
         if (!StringUtils.isBlank(adminServerAddr)) {
 
-            registryService.subscribe((event) -> {
-                log.info("runtime receive registry event: {}", event);
-                List<RegisterServerInfo> registerServerInfoList = 
event.getInstances();
-                Map<String, RegisterServerInfo> registerServerInfoMap = new 
HashMap<>();
-                for (RegisterServerInfo registerServerInfo : 
registerServerInfoList) {
-                    registerServerInfoMap.put(registerServerInfo.getAddress(), 
registerServerInfo);
-                }
-                if (!registerServerInfoMap.isEmpty()) {
-                    adminServerInfoMap = registerServerInfoMap;
-                    updateAdminServerAddr();
-                }
-
-            }, runtimeInstanceConfig.getAdminServiceName());
+//            registryService.subscribe((event) -> {
+//                log.info("runtime receive registry event: {}", event);
+//                List<RegisterServerInfo> registerServerInfoList = 
event.getInstances();
+//                Map<String, RegisterServerInfo> registerServerInfoMap = new 
HashMap<>();
+//                for (RegisterServerInfo registerServerInfo : 
registerServerInfoList) {
+//                    
registerServerInfoMap.put(registerServerInfo.getAddress(), registerServerInfo);
+//                }
+//                if (!registerServerInfoMap.isEmpty()) {
+//                    adminServerInfoMap = registerServerInfoMap;
+//                    updateAdminServerAddr();
+//                }
+//
+//            }, runtimeInstanceConfig.getAdminServiceName());
             runtime.start();
             isStarted = true;
         } else {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to