This is an automated email from the ASF dual-hosted git repository.

mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/eventmesh.git


The following commit(s) were added to refs/heads/master by this push:
     new 691aab015 [ISSUE #5071] Enhancement for admin server and canal 
source/sink connector (#5072)
691aab015 is described below

commit 691aab0152022ec98ef74e6252775bbd336a2f50
Author: mike_xwm <[email protected]>
AuthorDate: Wed Aug 7 18:16:23 2024 +0800

    [ISSUE #5071] Enhancement for admin server and canal source/sink connector 
(#5072)
    
    * [ISSUE #5069] Enhancement for http source/sink connector
    
    * update http source connector & config
    
    * fix checkstyle error
    
    * [ISSUE #5071] Enhancement for admin server and canal source/sink connector
---
 eventmesh-admin-server/build.gradle                |  2 +-
 eventmesh-admin-server/conf/application.yaml       |  8 +++-
 eventmesh-admin-server/conf/eventmesh.sql          |  7 +--
 .../conf/mapper/EventMeshDataSourceMapper.xml      |  5 +-
 .../server/web/db/entity/EventMeshDataSource.java  |  2 +
 .../web/handler/impl/FetchJobRequestHandler.java   |  4 +-
 .../service/datasource/DataSourceBizService.java   |  2 +
 .../server/web/service/job/JobInfoBizService.java  | 16 +++++--
 .../server/web/service/task/TaskBizService.java    | 35 ++++++++++++--
 .../common/remote/datasource/DataSourceType.java   |  9 ++++
 .../request/CreateOrUpdateDataSourceReq.java       |  1 +
 .../common/remote/request/CreateTaskRequest.java   |  6 +--
 ...apache.eventmesh.common.remote.payload.IPayload |  1 +
 .../connector/canal/CanalConnectRecord.java        |  5 +-
 .../canal/sink/connector/CanalSinkConnector.java   | 24 +++++++---
 .../connector/canal/source/EntryParser.java        | 15 ++++--
 .../offsetmgmt/admin/AdminOffsetService.java       | 13 ++++-
 .../offsetmgmt/api/data/ConnectRecord.java         | 53 ++++----------------
 .../eventmesh/runtime/boot/RuntimeInstance.java    | 34 +++++++------
 .../runtime/connector/ConnectorRuntime.java        | 56 +++++++++++++++-------
 .../src/main/resources/connector.yaml              |  6 ++-
 .../src/main/resources/runtime.yaml                |  2 +-
 22 files changed, 195 insertions(+), 111 deletions(-)

diff --git a/eventmesh-admin-server/build.gradle 
b/eventmesh-admin-server/build.gradle
index bdb6406da..1fec2c7c5 100644
--- a/eventmesh-admin-server/build.gradle
+++ b/eventmesh-admin-server/build.gradle
@@ -20,7 +20,7 @@ dependencies {
     implementation project(":eventmesh-common")
     implementation project(":eventmesh-registry:eventmesh-registry-api")
     implementation project(":eventmesh-registry:eventmesh-registry-nacos")
-    implementation 
project(':eventmesh-openconnect:eventmesh-openconnect-offsetmgmt-plugin:eventmesh-openconnect-offsetmgmt-api')
+    implementation 
project(":eventmesh-openconnect:eventmesh-openconnect-offsetmgmt-plugin:eventmesh-openconnect-offsetmgmt-api")
     implementation "com.alibaba.nacos:nacos-client"
     implementation("org.springframework.boot:spring-boot-starter-web") {
         exclude group: "org.springframework.boot", module: 
"spring-boot-starter-tomcat"
diff --git a/eventmesh-admin-server/conf/application.yaml 
b/eventmesh-admin-server/conf/application.yaml
index afbcd4a43..274196db6 100644
--- a/eventmesh-admin-server/conf/application.yaml
+++ b/eventmesh-admin-server/conf/application.yaml
@@ -26,13 +26,17 @@ mybatis-plus:
   configuration:
     map-underscore-to-camel-case: false
     log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
+# http server port
+server:
+  port: 8082
 event-mesh:
   admin-server:
     serviceName: DEFAULT_GROUP@@em_adm_server
+    # grpc server port
     port: 8081
     adminServerList:
       region1:
-        - http://localhost:8081
-      region2:
         - http://localhost:8082
+      region2:
+        - http://localhost:8083
     region: region1
\ No newline at end of file
diff --git a/eventmesh-admin-server/conf/eventmesh.sql 
b/eventmesh-admin-server/conf/eventmesh.sql
index 94edbb6fa..bdad02a8d 100644
--- a/eventmesh-admin-server/conf/eventmesh.sql
+++ b/eventmesh-admin-server/conf/eventmesh.sql
@@ -33,6 +33,7 @@ CREATE TABLE IF NOT EXISTS `event_mesh_data_source` (
   `dataType` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
   `description` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci 
DEFAULT NULL,
   `configuration` text CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT 
NULL,
+  `configurationClass` varchar(200) CHARACTER SET utf8mb4 COLLATE 
utf8mb4_general_ci NOT NULL DEFAULT '',
   `region` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL,
   `createUid` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
   `updateUid` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
@@ -134,13 +135,13 @@ CREATE TABLE IF NOT EXISTS `event_mesh_task_info` (
 
 -- export table eventmesh.event_mesh_verify structure
 CREATE TABLE IF NOT EXISTS `event_mesh_verify` (
-  `id` int NOT NULL,
+  `id` int unsigned NOT NULL AUTO_INCREMENT,
   `taskID` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL,
   `recordID` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL,
   `recordSig` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL,
-  `connectorName` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL,
+  `connectorName` varchar(200) COLLATE utf8mb4_general_ci DEFAULT NULL,
   `connectorStage` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL,
-  `position` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL,
+  `position` text COLLATE utf8mb4_general_ci DEFAULT NULL,
   `createTime` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
   PRIMARY KEY (`id`)
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
diff --git a/eventmesh-admin-server/conf/mapper/EventMeshDataSourceMapper.xml 
b/eventmesh-admin-server/conf/mapper/EventMeshDataSourceMapper.xml
index d100e1903..50e6ad82c 100644
--- a/eventmesh-admin-server/conf/mapper/EventMeshDataSourceMapper.xml
+++ b/eventmesh-admin-server/conf/mapper/EventMeshDataSourceMapper.xml
@@ -28,6 +28,7 @@
             <result property="dataType" column="dataType" jdbcType="VARCHAR"/>
             <result property="description" column="description" 
jdbcType="VARCHAR"/>
             <result property="configuration" column="configuration" 
jdbcType="VARCHAR"/>
+            <result property="configurationClass" column="configurationClass" 
jdbcType="VARCHAR"/>
             <result property="region" column="region" jdbcType="VARCHAR"/>
             <result property="createUid" column="createUid" 
jdbcType="VARCHAR"/>
             <result property="updateUid" column="updateUid" 
jdbcType="VARCHAR"/>
@@ -37,7 +38,7 @@
 
     <sql id="Base_Column_List">
         id,dataType,description,
-        configuration,region,createUid,updateUid,
-        createTime,updateTime
+        configuration,configurationClass,region,
+        createUid,updateUid,createTime,updateTime
     </sql>
 </mapper>
diff --git 
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshDataSource.java
 
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshDataSource.java
index 9d81366aa..e6e328984 100644
--- 
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshDataSource.java
+++ 
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshDataSource.java
@@ -41,6 +41,8 @@ public class EventMeshDataSource implements Serializable {
 
     private String configuration;
 
+    private String configurationClass;
+
     private String region;
 
     private String createUid;
diff --git 
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/FetchJobRequestHandler.java
 
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/FetchJobRequestHandler.java
index 8f159fa45..b377bcddd 100644
--- 
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/FetchJobRequestHandler.java
+++ 
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/FetchJobRequestHandler.java
@@ -53,9 +53,9 @@ public class FetchJobRequestHandler extends 
BaseRequestHandler<FetchJobRequest,
         }
         response.setId(detail.getJobID());
         JobConnectorConfig config = new JobConnectorConfig();
-        
config.setSourceConnectorConfig(JsonUtils.objectToMap(detail.getSourceDataSource()));
+        
config.setSourceConnectorConfig(JsonUtils.objectToMap(detail.getSourceDataSource().getConf()));
         config.setSourceConnectorDesc(detail.getSourceConnectorDesc());
-        
config.setSinkConnectorConfig(JsonUtils.objectToMap(detail.getSinkDataSource()));
+        
config.setSinkConnectorConfig(JsonUtils.objectToMap(detail.getSinkDataSource().getConf()));
         config.setSourceConnectorDesc(detail.getSinkConnectorDesc());
         response.setConnectorConfig(config);
         response.setTransportType(detail.getTransportType());
diff --git 
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/datasource/DataSourceBizService.java
 
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/datasource/DataSourceBizService.java
index 433847a4c..4d2d67010 100644
--- 
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/datasource/DataSourceBizService.java
+++ 
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/datasource/DataSourceBizService.java
@@ -29,12 +29,14 @@ import org.springframework.stereotype.Service;
 
 @Service
 public class DataSourceBizService {
+
     @Autowired
     private EventMeshDataSourceService dataSourceService;
 
     public EventMeshDataSource createDataSource(CreateOrUpdateDataSourceReq 
dataSource) {
         EventMeshDataSource entity = new EventMeshDataSource();
         
entity.setConfiguration(JsonUtils.toJSONString(dataSource.getConfig()));
+        entity.setConfigurationClass(dataSource.getConfigClass());
         entity.setDataType(dataSource.getType().name());
         entity.setCreateUid(dataSource.getOperator());
         entity.setUpdateUid(dataSource.getOperator());
diff --git 
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/job/JobInfoBizService.java
 
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/job/JobInfoBizService.java
index ea0265848..0657383e2 100644
--- 
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/job/JobInfoBizService.java
+++ 
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/job/JobInfoBizService.java
@@ -27,6 +27,7 @@ import 
org.apache.eventmesh.admin.server.web.db.service.EventMeshJobInfoService;
 import org.apache.eventmesh.admin.server.web.pojo.JobDetail;
 import 
org.apache.eventmesh.admin.server.web.service.datasource.DataSourceBizService;
 import 
org.apache.eventmesh.admin.server.web.service.position.PositionBizService;
+import org.apache.eventmesh.common.config.connector.Config;
 import org.apache.eventmesh.common.remote.TaskState;
 import org.apache.eventmesh.common.remote.TransportType;
 import org.apache.eventmesh.common.remote.datasource.DataSource;
@@ -114,6 +115,7 @@ public class JobInfoBizService {
             source.setRegion(job.getSourceDataSource().getRegion());
             source.setDesc(job.getSourceConnectorDesc());
             source.setConfig(job.getSourceDataSource().getConf());
+            
source.setConfigClass(job.getSourceDataSource().getConfClazz().getName());
             EventMeshDataSource createdSource = 
dataSourceBizService.createDataSource(source);
             entity.setSourceData(createdSource.getId());
 
@@ -123,6 +125,7 @@ public class JobInfoBizService {
             sink.setRegion(job.getSinkDataSource().getRegion());
             sink.setDesc(job.getSinkConnectorDesc());
             sink.setConfig(job.getSinkDataSource().getConf());
+            
sink.setConfigClass(job.getSinkDataSource().getConfClazz().getName());
             EventMeshDataSource createdSink = 
dataSourceBizService.createDataSource(sink);
             entity.setTargetData(createdSink.getId());
 
@@ -141,18 +144,22 @@ public class JobInfoBizService {
         if (jobID == null) {
             return null;
         }
-        EventMeshJobInfo job = jobInfoService.getById(jobID);
+        EventMeshJobInfo job = 
jobInfoService.getOne(Wrappers.<EventMeshJobInfo>query().eq("jobID", jobID));
         if (job == null) {
             return null;
         }
         JobDetail detail = new JobDetail();
+        detail.setTaskID(job.getTaskID());
         detail.setJobID(job.getJobID());
         EventMeshDataSource source = 
dataSourceService.getById(job.getSourceData());
         EventMeshDataSource target = 
dataSourceService.getById(job.getTargetData());
         if (source != null) {
             if (!StringUtils.isBlank(source.getConfiguration())) {
                 try {
-                    
detail.setSourceDataSource(JsonUtils.parseObject(source.getConfiguration(), 
DataSource.class));
+                    DataSource sourceDataSource = new DataSource();
+                    Class<?> configClass = 
Class.forName(source.getConfigurationClass());
+                    sourceDataSource.setConf((Config) 
JsonUtils.parseObject(source.getConfiguration(), configClass));
+                    detail.setSourceDataSource(sourceDataSource);
                 } catch (Exception e) {
                     log.warn("parse source config id [{}] fail", 
job.getSourceData(), e);
                     throw new 
AdminServerRuntimeException(ErrorCode.BAD_DB_DATA, "illegal source data source 
config");
@@ -168,7 +175,10 @@ public class JobInfoBizService {
         if (target != null) {
             if (!StringUtils.isBlank(target.getConfiguration())) {
                 try {
-                    
detail.setSinkDataSource(JsonUtils.parseObject(target.getConfiguration(), 
DataSource.class));
+                    DataSource sinkDataSource = new DataSource();
+                    Class<?> configClass = 
Class.forName(target.getConfigurationClass());
+                    sinkDataSource.setConf((Config) 
JsonUtils.parseObject(target.getConfiguration(), configClass));
+                    detail.setSinkDataSource(sinkDataSource);
                 } catch (Exception e) {
                     log.warn("parse sink config id [{}] fail", 
job.getSourceData(), e);
                     throw new 
AdminServerRuntimeException(ErrorCode.BAD_DB_DATA, "illegal target data sink 
config");
diff --git 
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/task/TaskBizService.java
 
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/task/TaskBizService.java
index f68645613..7089f9cf7 100644
--- 
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/task/TaskBizService.java
+++ 
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/task/TaskBizService.java
@@ -22,12 +22,17 @@ import 
org.apache.eventmesh.admin.server.web.db.entity.EventMeshTaskInfo;
 import 
org.apache.eventmesh.admin.server.web.db.service.EventMeshTaskInfoService;
 import org.apache.eventmesh.admin.server.web.pojo.JobDetail;
 import org.apache.eventmesh.admin.server.web.service.job.JobInfoBizService;
+import org.apache.eventmesh.common.config.connector.Config;
 import org.apache.eventmesh.common.remote.TaskState;
+import org.apache.eventmesh.common.remote.datasource.DataSource;
+import org.apache.eventmesh.common.remote.datasource.DataSourceType;
 import org.apache.eventmesh.common.remote.request.CreateTaskRequest;
+import org.apache.eventmesh.common.utils.JsonUtils;
 
 import org.apache.commons.lang3.StringUtils;
 
 import java.util.List;
+import java.util.Map;
 import java.util.Random;
 import java.util.UUID;
 import java.util.stream.Collectors;
@@ -40,6 +45,7 @@ import org.springframework.web.client.RestTemplate;
 
 @Service
 public class TaskBizService {
+
     @Autowired
     private EventMeshTaskInfoService taskInfoService;
 
@@ -76,7 +82,12 @@ public class TaskBizService {
 
         String finalTaskID = taskID;
         List<JobDetail> jobs = req.getJobs().stream().map(x -> {
-            JobDetail job = parse(x);
+            JobDetail job = null;
+            try {
+                job = parse(x);
+            } catch (ClassNotFoundException e) {
+                throw new RuntimeException(e);
+            }
             job.setTaskID(finalTaskID);
             job.setCreateUid(req.getUid());
             job.setUpdateUid(req.getUid());
@@ -95,14 +106,30 @@ public class TaskBizService {
         return finalTaskID;
     }
 
-    private JobDetail parse(CreateTaskRequest.JobDetail src) {
+    private JobDetail parse(CreateTaskRequest.JobDetail src) throws 
ClassNotFoundException {
         JobDetail dst = new JobDetail();
         dst.setJobDesc(src.getJobDesc());
         dst.setTransportType(src.getTransportType());
         dst.setSourceConnectorDesc(src.getSourceConnectorDesc());
-        dst.setSourceDataSource(src.getSourceDataSource());
+        Map<String, Object> sourceDataMap = src.getSourceDataSource();
+        DataSource sourceDataSource = new DataSource();
+        
sourceDataSource.setType(DataSourceType.fromString(sourceDataMap.get("type").toString()));
+        sourceDataSource.setDesc((String) sourceDataMap.get("desc"));
+        sourceDataSource.setConfClazz((Class<? extends Config>) 
Class.forName(sourceDataMap.get("confClazz").toString()));
+        
sourceDataSource.setConf(JsonUtils.parseObject(JsonUtils.toJSONString(sourceDataMap.get("conf")),
 sourceDataSource.getConfClazz()));
+        sourceDataSource.setRegion((String) sourceDataMap.get("region"));
+        dst.setSourceDataSource(sourceDataSource);
+
         dst.setSinkConnectorDesc(src.getSinkConnectorDesc());
-        dst.setSinkDataSource(src.getSinkDataSource());
+        Map<String, Object> sinkDataMap = src.getSinkDataSource();
+        DataSource sinkDataSource = new DataSource();
+        
sinkDataSource.setType(DataSourceType.fromString(sinkDataMap.get("type").toString()));
+        sinkDataSource.setDesc((String) sinkDataMap.get("desc"));
+        sinkDataSource.setConfClazz((Class<? extends Config>) 
Class.forName(sinkDataMap.get("confClazz").toString()));
+        
sinkDataSource.setConf(JsonUtils.parseObject(JsonUtils.toJSONString(sinkDataMap.get("conf")),
 sinkDataSource.getConfClazz()));
+        sinkDataSource.setRegion((String) sinkDataMap.get("region"));
+        dst.setSinkDataSource(sinkDataSource);
+
         // full/increase/check
         dst.setJobType(src.getJobType());
         dst.setFromRegion(src.getFromRegion());
diff --git 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/datasource/DataSourceType.java
 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/datasource/DataSourceType.java
index 985f311b9..8c40971e7 100644
--- 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/datasource/DataSourceType.java
+++ 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/datasource/DataSourceType.java
@@ -61,4 +61,13 @@ public enum DataSourceType {
         }
         return TYPES[index];
     }
+
+    public static DataSourceType fromString(String type) {
+        for (DataSourceType dataSourceType : DataSourceType.values()) {
+            if (dataSourceType.name().equalsIgnoreCase(type)) {
+                return dataSourceType;
+            }
+        }
+        throw new IllegalArgumentException("No enum constant for type: " + 
type);
+    }
 }
diff --git 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/CreateOrUpdateDataSourceReq.java
 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/CreateOrUpdateDataSourceReq.java
index fadfa68e7..f78349703 100644
--- 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/CreateOrUpdateDataSourceReq.java
+++ 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/CreateOrUpdateDataSourceReq.java
@@ -34,6 +34,7 @@ public class CreateOrUpdateDataSourceReq extends 
BaseRemoteRequest {
     private DataSourceType type;
     private String desc;
     private Config config;
+    private String configClass;
     private String region;
     private String operator;
 }
diff --git 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/CreateTaskRequest.java
 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/CreateTaskRequest.java
index 47c45595a..c895b5c44 100644
--- 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/CreateTaskRequest.java
+++ 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/CreateTaskRequest.java
@@ -18,10 +18,10 @@
 package org.apache.eventmesh.common.remote.request;
 
 import org.apache.eventmesh.common.remote.TransportType;
-import org.apache.eventmesh.common.remote.datasource.DataSource;
 import org.apache.eventmesh.common.remote.job.JobType;
 
 import java.util.List;
+import java.util.Map;
 
 import lombok.Data;
 
@@ -61,11 +61,11 @@ public class CreateTaskRequest {
         // full/increase/check
         private JobType jobType;
 
-        private DataSource sourceDataSource;
+        private Map<String, Object> sourceDataSource;
 
         private String sourceConnectorDesc;
 
-        private DataSource sinkDataSource;
+        private Map<String, Object> sinkDataSource;
 
         private String sinkConnectorDesc;
 
diff --git 
a/eventmesh-common/src/main/resources/META-INF/services/org.apache.eventmesh.common.remote.payload.IPayload
 
b/eventmesh-common/src/main/resources/META-INF/services/org.apache.eventmesh.common.remote.payload.IPayload
index 2af95c751..82d5c94dd 100644
--- 
a/eventmesh-common/src/main/resources/META-INF/services/org.apache.eventmesh.common.remote.payload.IPayload
+++ 
b/eventmesh-common/src/main/resources/META-INF/services/org.apache.eventmesh.common.remote.payload.IPayload
@@ -16,6 +16,7 @@
 org.apache.eventmesh.common.remote.request.FetchJobRequest
 org.apache.eventmesh.common.remote.response.FetchJobResponse
 org.apache.eventmesh.common.remote.request.ReportPositionRequest
+org.apache.eventmesh.common.remote.request.ReportVerifyRequest
 org.apache.eventmesh.common.remote.request.ReportHeartBeatRequest
 org.apache.eventmesh.common.remote.request.FetchPositionRequest
 org.apache.eventmesh.common.remote.response.FetchPositionResponse
\ No newline at end of file
diff --git 
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/CanalConnectRecord.java
 
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/CanalConnectRecord.java
index 36ecd158f..6f112081e 100644
--- 
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/CanalConnectRecord.java
+++ 
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/CanalConnectRecord.java
@@ -22,13 +22,16 @@ import org.apache.eventmesh.common.remote.job.SyncMode;
 import org.apache.eventmesh.connector.canal.model.EventColumn;
 import org.apache.eventmesh.connector.canal.model.EventType;
 
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 
 import lombok.Data;
 
 @Data
-public class CanalConnectRecord {
+public class CanalConnectRecord implements Serializable {
+
+    private static final long serialVersionUID = 1L;
 
     private String schemaName;
 
diff --git 
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkConnector.java
 
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkConnector.java
index 2ecb2384a..49fb10dd3 100644
--- 
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkConnector.java
+++ 
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkConnector.java
@@ -44,6 +44,7 @@ import 
org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.commons.lang3.SerializationUtils;
 
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
@@ -163,7 +164,11 @@ public class CanalSinkConnector implements Sink, 
ConnectorCreateService<Sink> {
     public void put(List<ConnectRecord> sinkRecords) {
         DbLoadContext context = new DbLoadContext();
         for (ConnectRecord connectRecord : sinkRecords) {
-            List<CanalConnectRecord> canalConnectRecordList = 
(List<CanalConnectRecord>) connectRecord.getData();
+            List<CanalConnectRecord> canalConnectRecordList = new 
ArrayList<>();
+            // deep copy connectRecord data
+            for (CanalConnectRecord record : (List<CanalConnectRecord>) 
connectRecord.getData()) {
+                canalConnectRecordList.add(SerializationUtils.clone(record));
+            }
             canalConnectRecordList = filterRecord(canalConnectRecordList);
             if (isDdlDatas(canalConnectRecordList)) {
                 doDdl(context, canalConnectRecordList, connectRecord);
@@ -175,7 +180,7 @@ public class CanalSinkConnector implements Sink, 
ConnectorCreateService<Sink> {
                 DbLoadData loadData = new DbLoadData();
                 doBefore(canalConnectRecordList, loadData);
 
-                doLoad(context, sinkConfig, loadData);
+                doLoad(context, sinkConfig, loadData, connectRecord);
 
             }
 
@@ -259,7 +264,7 @@ public class CanalSinkConnector implements Sink, 
ConnectorCreateService<Sink> {
         }
     }
 
-    private void doLoad(DbLoadContext context, CanalSinkConfig sinkConfig, 
DbLoadData loadData) {
+    private void doLoad(DbLoadContext context, CanalSinkConfig sinkConfig, 
DbLoadData loadData, ConnectRecord connectRecord) {
         List<List<CanalConnectRecord>> batchDatas = new ArrayList<>();
         for (TableLoadData tableData : loadData.getTables()) {
             if (useBatch) {
@@ -271,7 +276,7 @@ public class CanalSinkConnector implements Sink, 
ConnectorCreateService<Sink> {
             }
         }
 
-        doTwoPhase(context, sinkConfig, batchDatas, true);
+        doTwoPhase(context, sinkConfig, batchDatas, true, connectRecord);
 
         batchDatas.clear();
 
@@ -289,7 +294,7 @@ public class CanalSinkConnector implements Sink, 
ConnectorCreateService<Sink> {
             }
         }
 
-        doTwoPhase(context, sinkConfig, batchDatas, true);
+        doTwoPhase(context, sinkConfig, batchDatas, true, connectRecord);
 
         batchDatas.clear();
     }
@@ -390,7 +395,8 @@ public class CanalSinkConnector implements Sink, 
ConnectorCreateService<Sink> {
             && StringUtils.equals(source.getSql(), target.getSql());
     }
 
-    private void doTwoPhase(DbLoadContext context, CanalSinkConfig sinkConfig, 
List<List<CanalConnectRecord>> totalRows, boolean canBatch) {
+    private void doTwoPhase(DbLoadContext context, CanalSinkConfig sinkConfig, 
List<List<CanalConnectRecord>> totalRows, boolean canBatch,
+        ConnectRecord connectRecord) {
         List<Future<Exception>> results = new ArrayList<>();
         for (List<CanalConnectRecord> rows : totalRows) {
             if (CollectionUtils.isEmpty(rows)) {
@@ -404,6 +410,9 @@ public class CanalSinkConnector implements Sink, 
ConnectorCreateService<Sink> {
             Exception ex = null;
             try {
                 ex = result.get();
+                if (ex == null) {
+                    
connectRecord.getCallback().onSuccess(convertToSendResult(connectRecord));
+                }
             } catch (Exception e) {
                 ex = e;
             }
@@ -433,12 +442,14 @@ public class CanalSinkConnector implements Sink, 
ConnectorCreateService<Sink> {
                             log.warn("skip exception for data : {} , caused by 
{}",
                                 retryRecord,
                                 ExceptionUtils.getFullStackTrace(ex));
+                            
connectRecord.getCallback().onSuccess(convertToSendResult(connectRecord));
                         }
                     } catch (Exception ex) {
                         // do skip
                         log.warn("skip exception for data : {} , caused by {}",
                             retryRecord,
                             ExceptionUtils.getFullStackTrace(ex));
+                        
connectRecord.getCallback().onSuccess(convertToSendResult(connectRecord));
                     }
                 }
             } else {
@@ -451,6 +462,7 @@ public class CanalSinkConnector implements Sink, 
ConnectorCreateService<Sink> {
                 } catch (Exception ex) {
                     log.error("##load phase two failed!", ex);
                     log.error("sink connector will shutdown by " + 
ex.getMessage(), ex);
+                    
connectRecord.getCallback().onException(buildSendExceptionContext(connectRecord,
 ex));
                     executor.shutdown();
                     System.exit(1);
                 }
diff --git 
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/EntryParser.java
 
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/EntryParser.java
index 5c4303588..75572a5fa 100644
--- 
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/EntryParser.java
+++ 
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/EntryParser.java
@@ -67,7 +67,7 @@ public class EntryParser {
                             }
                         } else {
                             // if not gtid mode, need check weather the entry 
is loopback by specified column value
-                            needSync = checkNeedSync(sourceConfig, 
rowChange.getRowDatas(0));
+                            needSync = checkNeedSync(sourceConfig, rowChange);
                             if (needSync) {
                                 transactionDataBuffer.add(entry);
                             }
@@ -115,9 +115,16 @@ public class EntryParser {
         }
     }
 
-    private static boolean checkNeedSync(CanalSourceConfig sourceConfig, 
RowData rowData) {
-        Column markedColumn = 
getColumnIgnoreCase(rowData.getAfterColumnsList(),
-            sourceConfig.getNeedSyncMarkTableColumnName());
+    private static boolean checkNeedSync(CanalSourceConfig sourceConfig, 
RowChange rowChange) {
+        Column markedColumn = null;
+        CanalEntry.EventType eventType = rowChange.getEventType();
+        if (eventType.equals(CanalEntry.EventType.DELETE) || 
eventType.equals(CanalEntry.EventType.UPDATE)) {
+            markedColumn = 
getColumnIgnoreCase(rowChange.getRowDatas(0).getBeforeColumnsList(),
+                sourceConfig.getNeedSyncMarkTableColumnName());
+        } else if (eventType.equals(CanalEntry.EventType.INSERT)) {
+            markedColumn = 
getColumnIgnoreCase(rowChange.getRowDatas(0).getAfterColumnsList(),
+                sourceConfig.getNeedSyncMarkTableColumnName());
+        }
         if (markedColumn != null) {
             return StringUtils.equalsIgnoreCase(markedColumn.getValue(),
                 sourceConfig.getNeedSyncMarkTableColumnValue());
diff --git 
a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-admin/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/admin/AdminOffsetService.java
 
b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-admin/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/admin/AdminOffsetService.java
index 08270fc02..977661b13 100644
--- 
a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-admin/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/admin/AdminOffsetService.java
+++ 
b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-admin/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/admin/AdminOffsetService.java
@@ -41,6 +41,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Random;
 
 import io.grpc.ManagedChannel;
 import io.grpc.ManagedChannelBuilder;
@@ -236,7 +237,7 @@ public class AdminOffsetService implements 
OffsetManagementService {
         this.dataSourceType = offsetStorageConfig.getDataSourceType();
         this.dataSinkType = offsetStorageConfig.getDataSinkType();
 
-        this.adminServerAddr = offsetStorageConfig.getOffsetStorageAddr();
+        this.adminServerAddr = 
getRandomAdminServerAddr(offsetStorageConfig.getOffsetStorageAddr());
         this.channel = ManagedChannelBuilder.forTarget(adminServerAddr)
             .usePlaintext()
             .build();
@@ -274,4 +275,14 @@ public class AdminOffsetService implements 
OffsetManagementService {
         this.jobState = TaskState.RUNNING;
         this.jobId = offsetStorageConfig.getExtensions().get("jobId");
     }
+
+    private String getRandomAdminServerAddr(String adminServerAddrList) {
+        String[] addresses = adminServerAddrList.split(";");
+        if (addresses.length == 0) {
+            throw new IllegalArgumentException("Admin server address list is 
empty");
+        }
+        Random random = new Random();
+        int randomIndex = random.nextInt(addresses.length);
+        return addresses[randomIndex];
+    }
 }
diff --git 
a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/ConnectRecord.java
 
b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/ConnectRecord.java
index b3fc4346c..0a41e18f7 100644
--- 
a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/ConnectRecord.java
+++ 
b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/ConnectRecord.java
@@ -26,21 +26,30 @@ import java.util.Objects;
 import java.util.Set;
 import java.util.UUID;
 
+import lombok.Getter;
+import lombok.Setter;
+
 /**
  * SourceDataEntries are generated by SourceTasks and passed to specific 
message queue to store.
  */
+@Getter
 public class ConnectRecord {
 
     private final String recordId = UUID.randomUUID().toString();
 
+    @Setter
     private Long timestamp;
 
+    @Setter
     private Object data;
 
+    @Setter
     private RecordPosition position;
 
+    @Setter
     private KeyValue extensions;
 
+    @Setter
     private SendMessageCallback callback;
 
     public ConnectRecord() {
@@ -63,42 +72,6 @@ public class ConnectRecord {
         this.data = data;
     }
 
-    public String getRecordId() {
-        return recordId;
-    }
-
-    public Long getTimestamp() {
-        return timestamp;
-    }
-
-    public void setTimestamp(Long timestamp) {
-        this.timestamp = timestamp;
-    }
-
-    public Object getData() {
-        return data;
-    }
-
-    public void setData(Object data) {
-        this.data = data;
-    }
-
-    public KeyValue getExtensions() {
-        return extensions;
-    }
-
-    public void setExtensions(KeyValue extensions) {
-        this.extensions = extensions;
-    }
-
-    public RecordPosition getPosition() {
-        return position;
-    }
-
-    public void setPosition(RecordPosition position) {
-        this.position = position;
-    }
-
     public void addExtension(KeyValue extensions) {
         if (this.extensions == null) {
             this.extensions = new DefaultKeyValue();
@@ -137,14 +110,6 @@ public class ConnectRecord {
         return this.extensions.getObject(key);
     }
 
-    public SendMessageCallback getCallback() {
-        return callback;
-    }
-
-    public void setCallback(SendMessageCallback callback) {
-        this.callback = callback;
-    }
-
     @Override
     public boolean equals(Object o) {
         if (this == o) {
diff --git 
a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/boot/RuntimeInstance.java
 
b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/boot/RuntimeInstance.java
index acea321e9..beb1d1eed 100644
--- 
a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/boot/RuntimeInstance.java
+++ 
b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/boot/RuntimeInstance.java
@@ -76,6 +76,8 @@ public class RuntimeInstance {
             }
             // use registry adminServiceAddr value replace config
             runtimeInstanceConfig.setAdminServiceAddr(adminServiceAddr);
+        } else {
+            adminServiceAddr = runtimeInstanceConfig.getAdminServiceAddr();
         }
 
         runtimeFactory = initRuntimeFactory(runtimeInstanceConfig);
@@ -84,23 +86,25 @@ public class RuntimeInstance {
     }
 
     public void start() throws Exception {
-        if (!StringUtils.isBlank(adminServiceAddr) && registryService != null) 
{
-            registryService.subscribe((event) -> {
-                log.info("runtime receive registry event: {}", event);
-                List<RegisterServerInfo> registerServerInfoList = 
event.getInstances();
-                Map<String, RegisterServerInfo> registerServerInfoMap = new 
HashMap<>();
-                for (RegisterServerInfo registerServerInfo : 
registerServerInfoList) {
-                    registerServerInfoMap.put(registerServerInfo.getAddress(), 
registerServerInfo);
-                }
-                if (!registerServerInfoMap.isEmpty()) {
-                    adminServerInfoMap = registerServerInfoMap;
-                    updateAdminServerAddr();
-                }
-            }, runtimeInstanceConfig.getAdminServiceName());
+        if (StringUtils.isBlank(adminServiceAddr)) {
+            throw new RuntimeException("admin server address is empty, please 
check");
+        } else {
+            if (registryService != null) {
+                registryService.subscribe((event) -> {
+                    log.info("runtime receive registry event: {}", event);
+                    List<RegisterServerInfo> registerServerInfoList = 
event.getInstances();
+                    Map<String, RegisterServerInfo> registerServerInfoMap = 
new HashMap<>();
+                    for (RegisterServerInfo registerServerInfo : 
registerServerInfoList) {
+                        
registerServerInfoMap.put(registerServerInfo.getAddress(), registerServerInfo);
+                    }
+                    if (!registerServerInfoMap.isEmpty()) {
+                        adminServerInfoMap = registerServerInfoMap;
+                        updateAdminServerAddr();
+                    }
+                }, runtimeInstanceConfig.getAdminServiceName());
+            }
             runtime.start();
             isStarted = true;
-        } else {
-            throw new RuntimeException("admin server address is empty, please 
check");
         }
     }
 
diff --git 
a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntime.java
 
b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntime.java
index 1e589ebd9..501f222fd 100644
--- 
a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntime.java
+++ 
b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntime.java
@@ -63,9 +63,12 @@ import org.apache.commons.lang3.StringUtils;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.Random;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -132,6 +135,8 @@ public class ConnectorRuntime implements Runtime {
 
     public static final String CALLBACK_EXTENSION = "callBackExtension";
 
+    private String adminServerAddr;
+
 
     public ConnectorRuntime(RuntimeInstanceConfig runtimeInstanceConfig) {
         this.runtimeInstanceConfig = runtimeInstanceConfig;
@@ -149,8 +154,9 @@ public class ConnectorRuntime implements Runtime {
     }
 
     private void initAdminService() {
+        adminServerAddr = 
getRandomAdminServerAddr(runtimeInstanceConfig.getAdminServiceAddr());
         // create gRPC channel
-        channel = 
ManagedChannelBuilder.forTarget(runtimeInstanceConfig.getAdminServiceAddr()).usePlaintext().build();
+        channel = 
ManagedChannelBuilder.forTarget(adminServerAddr).usePlaintext().build();
 
         adminServiceStub = 
AdminServiceGrpc.newStub(channel).withWaitForReady();
 
@@ -176,6 +182,16 @@ public class ConnectorRuntime implements Runtime {
         requestObserver = adminServiceStub.invokeBiStream(responseObserver);
     }
 
+    private String getRandomAdminServerAddr(String adminServerAddrList) {
+        String[] addresses = adminServerAddrList.split(";");
+        if (addresses.length == 0) {
+            throw new IllegalArgumentException("Admin server address list is 
empty");
+        }
+        Random random = new Random();
+        int randomIndex = random.nextInt(addresses.length);
+        return addresses[randomIndex];
+    }
+
     private void initStorageService() {
         // TODO: init producer & consumer
         producer = 
StoragePluginFactory.getMeshMQProducer(runtimeInstanceConfig.getStoragePluginType());
@@ -202,25 +218,18 @@ public class ConnectorRuntime implements Runtime {
         
connectorRuntimeConfig.setSinkConnectorDesc(jobResponse.getConnectorConfig().getSinkConnectorDesc());
         
connectorRuntimeConfig.setSinkConnectorConfig(jobResponse.getConnectorConfig().getSinkConnectorConfig());
 
-        ConnectorCreateService<?> sourceConnectorCreateService =
-            
ConnectorPluginFactory.createConnector(connectorRuntimeConfig.getSourceConnectorType()
 + "-Source");
-        sourceConnector = (Source) sourceConnectorCreateService.create();
-
-        SourceConfig sourceConfig = (SourceConfig) 
ConfigUtil.parse(connectorRuntimeConfig.getSourceConnectorConfig(), 
sourceConnector.configClass());
-        SourceConnectorContext sourceConnectorContext = new 
SourceConnectorContext();
-        sourceConnectorContext.setSourceConfig(sourceConfig);
-        
sourceConnectorContext.setRuntimeConfig(connectorRuntimeConfig.getRuntimeConfig());
-        sourceConnectorContext.setOffsetStorageReader(offsetStorageReader);
-        if (CollectionUtils.isNotEmpty(jobResponse.getPosition())) {
-            
sourceConnectorContext.setRecordPositionList(jobResponse.getPosition());
-        }
-
         // spi load offsetMgmtService
         this.offsetManagement = new RecordOffsetManagement();
         this.committableOffsets = 
RecordOffsetManagement.CommittableOffsets.EMPTY;
-        OffsetStorageConfig offsetStorageConfig = 
sourceConfig.getOffsetStorageConfig();
+        OffsetStorageConfig offsetStorageConfig = new OffsetStorageConfig();
+        
offsetStorageConfig.setOffsetStorageAddr(connectorRuntimeConfig.getRuntimeConfig().get("offsetStorageAddr").toString());
+        
offsetStorageConfig.setOffsetStorageType(connectorRuntimeConfig.getRuntimeConfig().get("offsetStoragePluginType").toString());
         
offsetStorageConfig.setDataSourceType(jobResponse.getTransportType().getSrc());
         
offsetStorageConfig.setDataSinkType(jobResponse.getTransportType().getDst());
+        Map<String, String> offsetStorageExtensions = new HashMap<>();
+        offsetStorageExtensions.put("jobId", 
connectorRuntimeConfig.getJobID());
+        offsetStorageConfig.setExtensions(offsetStorageExtensions);
+
         this.offsetManagementService = 
Optional.ofNullable(offsetStorageConfig).map(OffsetStorageConfig::getOffsetStorageType)
             .map(storageType -> 
EventMeshExtensionFactory.getExtension(OffsetManagementService.class, 
storageType))
             .orElse(new DefaultOffsetManagementServiceImpl());
@@ -228,6 +237,18 @@ public class ConnectorRuntime implements Runtime {
         this.offsetStorageWriter = new 
OffsetStorageWriterImpl(offsetManagementService);
         this.offsetStorageReader = new 
OffsetStorageReaderImpl(offsetManagementService);
 
+        ConnectorCreateService<?> sourceConnectorCreateService =
+            
ConnectorPluginFactory.createConnector(connectorRuntimeConfig.getSourceConnectorType()
 + "-Source");
+        sourceConnector = (Source) sourceConnectorCreateService.create();
+
+        SourceConfig sourceConfig = (SourceConfig) 
ConfigUtil.parse(connectorRuntimeConfig.getSourceConnectorConfig(), 
sourceConnector.configClass());
+        SourceConnectorContext sourceConnectorContext = new 
SourceConnectorContext();
+        sourceConnectorContext.setSourceConfig(sourceConfig);
+        
sourceConnectorContext.setRuntimeConfig(connectorRuntimeConfig.getRuntimeConfig());
+        sourceConnectorContext.setOffsetStorageReader(offsetStorageReader);
+        if (CollectionUtils.isNotEmpty(jobResponse.getPosition())) {
+            
sourceConnectorContext.setRecordPositionList(jobResponse.getPosition());
+        }
         sourceConnector.init(sourceConnectorContext);
 
         ConnectorCreateService<?> sinkConnectorCreateService =
@@ -330,6 +351,9 @@ public class ConnectorRuntime implements Runtime {
             // TODO: use producer pub record to storage replace below
             if (connectorRecordList != null && !connectorRecordList.isEmpty()) 
{
                 for (ConnectRecord record : connectorRecordList) {
+
+                    queue.put(record);
+
                     // if enabled incremental data reporting consistency check
                     if 
(connectorRuntimeConfig.enableIncrementalDataConsistencyCheck) {
                         reportVerifyRequest(record, connectorRuntimeConfig, 
ConnectorStage.SOURCE);
@@ -363,8 +387,6 @@ public class ConnectorRuntime implements Runtime {
                         }
                     });
 
-                    queue.put(record);
-
                     offsetManagement.awaitAllMessages(5000, 
TimeUnit.MILLISECONDS);
                     // update & commit offset
                     updateCommittableOffsets();
diff --git a/eventmesh-runtime-v2/src/main/resources/connector.yaml 
b/eventmesh-runtime-v2/src/main/resources/connector.yaml
index 2e79e5ced..3e407fa3e 100644
--- a/eventmesh-runtime-v2/src/main/resources/connector.yaml
+++ b/eventmesh-runtime-v2/src/main/resources/connector.yaml
@@ -15,7 +15,9 @@
 # limitations under the License.
 #
 
-taskID: 1
-jobID: 1
+taskID: 9c18a0d2-7a61-482c-8275-34f8c2786cea
+jobID: a01fd5e1-d295-4b89-99bc-0ae23eb85acf
 region: region1
 runtimeConfig: # this used for connector runtime config
+  offsetStoragePluginType: admin
+  offsetStorageAddr: "127.0.0.1:8081;127.0.0.1:8081"
\ No newline at end of file
diff --git a/eventmesh-runtime-v2/src/main/resources/runtime.yaml 
b/eventmesh-runtime-v2/src/main/resources/runtime.yaml
index c5ffac9d9..9ac36f27b 100644
--- a/eventmesh-runtime-v2/src/main/resources/runtime.yaml
+++ b/eventmesh-runtime-v2/src/main/resources/runtime.yaml
@@ -21,4 +21,4 @@ registryServerAddr: 127.0.0.1:8085
 registryPluginType: nacos
 storagePluginType: memory
 adminServiceName: eventmesh-admin
-adminServiceAddr: "127.0.0.1:8085;127.0.0.1:8086"
+adminServiceAddr: "127.0.0.1:8081;127.0.0.1:8081"


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to